about summary refs log tree commit diff
diff options
context:
space:
mode:
author Chris Lu <chris.lu@gmail.com> 2018-12-03 20:25:57 -0800
committer Chris Lu <chris.lu@gmail.com> 2018-12-03 20:25:57 -0800
commit 4119c61df84305886e6116705f1ad08d3da69328 (patch)
tree 8afbb29a6d361c15996610394c1943d83065b477
parent 5b5018265857bdc83ae7673042223dc3f7020bbd (diff)
download seaweedfs-4119c61df84305886e6116705f1ad08d3da69328.tar.xz
download seaweedfs-4119c61df84305886e6116705f1ad08d3da69328.zip
HCFS can read files
-rw-r--r--  .gitignore                                                     1
-rw-r--r--  other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java       17
-rw-r--r--  other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java  31
-rw-r--r--  other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java       6
-rw-r--r--  other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java             37
5 files changed, 77 insertions(+), 15 deletions(-)
diff --git a/.gitignore b/.gitignore
index 98a401933..a56dfb8a3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -79,3 +79,4 @@ test_data
build
target
*.class
+other/java/hdfs/dependency-reduced-pom.xml
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 79c2641ec..f2dd01385 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -13,9 +13,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
@@ -23,6 +26,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
+ private static int BUFFER_SIZE = 16 * 1024 * 1024;
private URI uri;
private Path workingDirectory = new Path("/");
@@ -53,6 +57,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
conf.setInt(FS_SEAWEED_FILER_PORT, port);
+ conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
+
setConf(conf);
this.uri = uri;
@@ -65,7 +71,12 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
path = qualify(path);
- return null;
+ try {
+ InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
+ return new FSDataInputStream(inputStream);
+ } catch (Exception ex) {
+ return null;
+ }
}
public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
@@ -77,7 +88,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
try {
String replicaPlacement = String.format("%03d", replication - 1);
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, replicaPlacement);
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
return null;
@@ -90,7 +101,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
path = qualify(path);
try {
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, "");
+ OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
return new FSDataOutputStream(outputStream, statistics);
} catch (Exception ex) {
return null;
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 7cc12424b..dd68e53f1 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -1,6 +1,7 @@
package seaweed.hdfs;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
@@ -9,7 +10,9 @@ import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
@@ -35,7 +38,6 @@ public class SeaweedFileSystemStore {
if (isDirectory) {
p = p | 1 << 31;
}
- System.out.println(permission + " = " + p);
return p;
}
@@ -126,7 +128,7 @@ public class SeaweedFileSystemStore {
private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
FilerProto.FuseAttributes attributes = entry.getAttributes();
- long length = attributes.getFileSize();
+ long length = SeaweedRead.totalSize(entry.getChunksList());
boolean isDir = entry.getIsDirectory();
int block_replication = 1;
int blocksize = 512;
@@ -206,6 +208,7 @@ public class SeaweedFileSystemStore {
public OutputStream createFile(final Path path,
final boolean overwrite,
FsPermission permission,
+ int bufferSize,
String replication) throws IOException {
permission = permission == null ? FsPermission.getFileDefault() : permission;
@@ -226,7 +229,7 @@ public class SeaweedFileSystemStore {
entry.mergeFrom(existingEntry);
entry.getAttributesBuilder().setMtime(now);
}
- writePosition = existingEntry.getAttributes().getFileSize();
+ writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
replication = existingEntry.getAttributes().getReplication();
}
if (entry == null) {
@@ -243,7 +246,27 @@ public class SeaweedFileSystemStore {
);
}
- return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, 16 * 1024 * 1024, replication);
+ return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
}
+
+ public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
+ int bufferSize) throws IOException {
+
+ LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
+
+ int readAheadQueueDepth = 2;
+ FilerProto.Entry entry = lookupEntry(path);
+
+ if (entry == null) {
+ throw new FileNotFoundException("read non-exist file " + path);
+ }
+
+ return new SeaweedInputStream(filerGrpcClient,
+ statistics,
+ path.toUri().getPath(),
+ entry,
+ bufferSize,
+ readAheadQueueDepth);
+ }
}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 0cd118f22..b31cae166 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -6,6 +6,8 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
@@ -15,6 +17,8 @@ import java.util.List;
public class SeaweedInputStream extends FSInputStream {
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
+
private final FilerGrpcClient filerGrpcClient;
private final Statistics statistics;
private final String path;
@@ -45,7 +49,7 @@ public class SeaweedInputStream extends FSInputStream {
this.statistics = statistics;
this.path = path;
this.entry = entry;
- this.contentLength = entry.getAttributes().getFileSize();
+ this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
this.bufferSize = bufferSize;
this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
this.readAheadEnabled = true;
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
index bf3669d59..edc279adc 100644
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
+++ b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedRead.java
@@ -5,6 +5,8 @@ import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import seaweedfs.client.FilerGrpcClient;
import seaweedfs.client.FilerProto;
@@ -17,6 +19,8 @@ import java.util.Map;
public class SeaweedRead {
+ private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
+
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
final long position, final byte[] buffer, final int bufferOffset,
@@ -54,9 +58,11 @@ public class SeaweedRead {
HttpResponse response = client.execute(request);
HttpEntity entity = response.getEntity();
- readCount += entity.getContent().read(buffer,
- (int) (chunkView.logicOffset - position),
- (int) (chunkView.logicOffset - position + chunkView.size));
+ int len = (int) (chunkView.logicOffset - position + chunkView.size);
+ entity.getContent().read(buffer, bufferOffset, len);
+
+ LOG.debug("* read chunkView:{} length:{} position:{} bufferLength:{}", chunkView, len, position, bufferLength);
+ readCount += len;
} catch (IOException e) {
e.printStackTrace();
@@ -93,11 +99,17 @@ public class SeaweedRead {
List<VisibleInterval> newVisibles = new ArrayList<>();
List<VisibleInterval> visibles = new ArrayList<>();
for (FilerProto.FileChunk chunk : chunks) {
+ List<VisibleInterval> t = newVisibles;
newVisibles = mergeIntoVisibles(visibles, newVisibles, chunk);
- visibles.clear();
- List<VisibleInterval> t = visibles;
- visibles = newVisibles;
- newVisibles = t;
+ if (t != newVisibles) {
+ // visibles are changed in place
+ } else {
+ // newVisibles are modified
+ visibles.clear();
+ t = visibles;
+ visibles = newVisibles;
+ newVisibles = t;
+ }
}
return visibles;
@@ -168,6 +180,17 @@ public class SeaweedRead {
return fileId;
}
+ public static long totalSize(List<FilerProto.FileChunk> chunksList) {
+ long size = 0;
+ for (FilerProto.FileChunk chunk : chunksList) {
+ long t = chunk.getOffset() + chunk.getSize();
+ if (size < t) {
+ size = t;
+ }
+ }
+ return size;
+ }
+
public static class VisibleInterval {
long start;
long stop;