aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-12-03 01:37:29 -0800
committerChris Lu <chris.lu@gmail.com>2018-12-03 01:37:29 -0800
commit5b5018265857bdc83ae7673042223dc3f7020bbd (patch)
tree293faecad3eb96089834098a19de1569a074bf1b
parent7ace0efd653518fd6ebebba0a4c5576fed94d6ae (diff)
downloadseaweedfs-5b5018265857bdc83ae7673042223dc3f7020bbd.tar.xz
seaweedfs-5b5018265857bdc83ae7673042223dc3f7020bbd.zip
put file is working
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java53
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java48
-rw-r--r--other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java9
3 files changed, 84 insertions, 26 deletions
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 752758a4e..79c2641ec 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -9,8 +9,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
@@ -21,6 +22,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
+
private URI uri;
private Path workingDirectory = new Path("/");
private SeaweedFileSystemStore seaweedFileSystemStore;
@@ -57,6 +60,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+
+ LOG.debug("open path: {} bufferSize:{}", path, bufferSize);
+
path = qualify(path);
return null;
@@ -64,6 +70,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
final short replication, final long blockSize, final Progressable progress) throws IOException {
+
+ LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize);
+
path = qualify(path);
try {
@@ -76,6 +85,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException {
+
+ LOG.debug("append path: {} bufferSize:{}", path, bufferSize);
+
path = qualify(path);
try {
OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, "");
@@ -85,7 +97,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
}
- public boolean rename(Path src, Path dst) throws IOException {
+ public boolean rename(Path src, Path dst) {
+
+ LOG.debug("rename path: {} => {}", src, dst);
if (src.isRoot()) {
return false;
@@ -110,14 +124,17 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
Path qualifiedDstPath = qualify(adjustedDst);
seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath);
- return false;
+ return true;
}
- public boolean delete(Path path, boolean recursive) throws IOException {
+ public boolean delete(Path path, boolean recursive) {
+
+ LOG.debug("delete path: {} recursive:{}", path, recursive);
path = qualify(path);
FileStatus fileStatus = getFileStatus(path);
+
if (fileStatus == null) {
return true;
}
@@ -126,7 +143,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
}
- public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException {
+ public FileStatus[] listStatus(Path path) throws IOException {
+
+ LOG.debug("listStatus path: {}", path);
path = qualify(path);
@@ -147,25 +166,31 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
+ LOG.debug("mkdirs path: {}", path);
+
path = qualify(path);
- try {
- FileStatus fileStatus = getFileStatus(path);
+ FileStatus fileStatus = getFileStatus(path);
+
+ if (fileStatus == null) {
- if (fileStatus.isDirectory()) {
- return true;
- } else {
- throw new FileAlreadyExistsException("Path is a file: " + path);
- }
- } catch (FileNotFoundException e) {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
return seaweedFileSystemStore.createDirectory(path, currentUser,
fsPermission == null ? FsPermission.getDirDefault() : fsPermission,
FsPermission.getUMask(getConf()));
+
+ }
+
+ if (fileStatus.isDirectory()) {
+ return true;
+ } else {
+ throw new FileAlreadyExistsException("Path is a file: " + path);
}
}
- public FileStatus getFileStatus(Path path) throws IOException {
+ public FileStatus getFileStatus(Path path) {
+
+ LOG.debug("getFileStatus path: {}", path);
path = qualify(path);
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index d58d07ea8..7cc12424b 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -9,7 +9,6 @@ import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -27,7 +26,7 @@ public class SeaweedFileSystemStore {
filerGrpcClient = new FilerGrpcClient(host, grpcPort);
}
- static String getParentDirectory(Path path) {
+ public static String getParentDirectory(Path path) {
return path.isRoot() ? "/" : path.getParent().toUri().getPath();
}
@@ -76,7 +75,7 @@ public class SeaweedFileSystemStore {
for (FilerProto.Entry entry : entries) {
- FileStatus fileStatus = getFileStatus(new Path(path, entry.getName()), entry);
+ FileStatus fileStatus = doGetFileStatus(new Path(path, entry.getName()), entry);
fileStatuses.add(fileStatus);
}
@@ -84,17 +83,23 @@ public class SeaweedFileSystemStore {
}
private List<FilerProto.Entry> lookupEntries(Path path) {
+
+ LOG.debug("listEntries path: {}", path);
+
return filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path.toUri().getPath())
.setLimit(100000)
.build()).getEntriesList();
}
- public FileStatus getFileStatus(final Path path) throws FileNotFoundException {
- LOG.debug("getFileStatus path: {}", path);
+ public FileStatus getFileStatus(final Path path) {
+ LOG.debug("doGetFileStatus path: {}", path);
FilerProto.Entry entry = lookupEntry(path);
- FileStatus fileStatus = getFileStatus(path, entry);
+ if (entry == null) {
+ return null;
+ }
+ FileStatus fileStatus = doGetFileStatus(path, entry);
return fileStatus;
}
@@ -119,7 +124,7 @@ public class SeaweedFileSystemStore {
return true;
}
- private FileStatus getFileStatus(Path path, FilerProto.Entry entry) {
+ private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
FilerProto.FuseAttributes attributes = entry.getAttributes();
long length = attributes.getFileSize();
boolean isDir = entry.getIsDirectory();
@@ -134,7 +139,7 @@ public class SeaweedFileSystemStore {
modification_time, access_time, permission, owner, group, null, path);
}
- private FilerProto.Entry lookupEntry(Path path) throws FileNotFoundException {
+ private FilerProto.Entry lookupEntry(Path path) {
String directory = getParentDirectory(path);
@@ -146,19 +151,31 @@ public class SeaweedFileSystemStore {
.build());
return response.getEntry();
} catch (io.grpc.StatusRuntimeException e) {
- throw new FileNotFoundException(e.getMessage());
+ return null;
}
}
- public void rename(Path source, Path destination) throws FileNotFoundException {
+ public void rename(Path source, Path destination) {
+
+ LOG.debug("rename source: {} destination:{}", source, destination);
+
if (source.isRoot()) {
return;
}
+ LOG.warn("rename lookupEntry source: {}", source);
FilerProto.Entry entry = lookupEntry(source);
+ if (entry == null) {
+ LOG.warn("rename non-existing source: {}", source);
+ return;
+ }
+ LOG.warn("rename moveEntry source: {}", source);
moveEntry(source.getParent(), entry, destination);
}
private boolean moveEntry(Path oldParent, FilerProto.Entry entry, Path destination) {
+
+ LOG.debug("moveEntry: {}/{} => {}", oldParent, entry.getName(), destination);
+
if (entry.getIsDirectory()) {
Path entryPath = new Path(oldParent, entry.getName());
List<FilerProto.Entry> entries = lookupEntries(entryPath);
@@ -170,8 +187,10 @@ public class SeaweedFileSystemStore {
}
}
+ FilerProto.Entry.Builder newEntry = entry.toBuilder().setName(destination.getName());
filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(getParentDirectory(destination))
+ .setEntry(newEntry)
.build());
filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
@@ -197,23 +216,28 @@ public class SeaweedFileSystemStore {
permission.toString());
UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
+ long now = System.currentTimeMillis() / 1000L;
- FilerProto.Entry.Builder entry = FilerProto.Entry.newBuilder();
+ FilerProto.Entry.Builder entry = null;
long writePosition = 0;
if (!overwrite) {
FilerProto.Entry existingEntry = lookupEntry(path);
if (existingEntry != null) {
entry.mergeFrom(existingEntry);
+ entry.getAttributesBuilder().setMtime(now);
}
writePosition = existingEntry.getAttributes().getFileSize();
replication = existingEntry.getAttributes().getReplication();
}
if (entry == null) {
entry = FilerProto.Entry.newBuilder()
+ .setName(path.getName())
+ .setIsDirectory(false)
.setAttributes(FilerProto.FuseAttributes.newBuilder()
.setFileMode(permissionToMode(permission, false))
.setReplication(replication)
- .setCrtime(System.currentTimeMillis() / 1000L)
+ .setCrtime(now)
+ .setMtime(now)
.setUserName(userGroupInformation.getUserName())
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
);
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
index 5da77c3a3..19894956a 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
@@ -7,6 +7,8 @@ import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
@@ -24,6 +26,8 @@ import java.util.concurrent.TimeUnit;
public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities {
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
+
private final FilerGrpcClient filerGrpcClient;
private final Path path;
private final int bufferSize;
@@ -71,6 +75,9 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
}
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
+
+ LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry);
+
try {
SeaweedWrite.writeMeta(filerGrpcClient, path, entry);
} catch (Exception ex) {
@@ -186,6 +193,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
return;
}
+ LOG.debug("close path: {}", path);
try {
flushInternal();
threadExecutor.shutdown();
@@ -293,6 +301,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea
throw lastError;
}
}
+ LOG.debug("flushWrittenBytesToService: {} position:{}", path, position);
flushWrittenBytesToServiceInternal(position);
}