aboutsummaryrefslogtreecommitdiff
path: root/other/java
diff options
context:
space:
mode:
Diffstat (limited to 'other/java')
-rw-r--r--other/java/client/pom.xml2
-rw-r--r--other/java/client/pom.xml.deploy2
-rw-r--r--other/java/client/pom_debug.xml2
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java26
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerClient.java24
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java46
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java (renamed from other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java)119
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java (renamed from other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java)90
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java20
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java24
-rw-r--r--other/java/examples/pom.xml4
-rw-r--r--other/java/examples/src/main/java/com/seaweedfs/examples/ExampleReadFile.java (renamed from other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java)14
-rw-r--r--other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWatchFileChanges.java (renamed from other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java)2
-rw-r--r--other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java47
-rw-r--r--other/java/examples/src/main/java/com/seaweedfs/examples/HdfsCopyFile.java25
-rw-r--r--other/java/hdfs2/dependency-reduced-pom.xml2
-rw-r--r--other/java/hdfs2/pom.xml2
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java1
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java22
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java150
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java16
-rw-r--r--other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java259
-rw-r--r--other/java/hdfs3/dependency-reduced-pom.xml2
-rw-r--r--other/java/hdfs3/pom.xml2
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java1
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java22
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java150
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java64
-rw-r--r--other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java337
29 files changed, 679 insertions, 798 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index 4bfc5ab8f..056904ebe 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.5.6</version>
+ <version>1.6.1</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
index c3c960a28..69b900017 100644
--- a/other/java/client/pom.xml.deploy
+++ b/other/java/client/pom.xml.deploy
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.5.6</version>
+ <version>1.6.1</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index 078778b6c..1447401b7 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -5,7 +5,7 @@
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.5.6</version>
+ <version>1.6.1</version>
<parent>
<groupId>org.sonatype.oss</groupId>
diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
index 3293db2ca..9b6ba5dfc 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
@@ -23,7 +23,7 @@ public class FileChunkManifest {
}
public static List<FilerProto.FileChunk> resolveChunkManifest(
- final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunks) throws IOException {
+ final FilerClient filerClient, List<FilerProto.FileChunk> chunks) throws IOException {
List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
@@ -35,30 +35,30 @@ public class FileChunkManifest {
// IsChunkManifest
LOG.debug("fetching chunk manifest:{}", chunk);
- byte[] data = fetchChunk(filerGrpcClient, chunk);
+ byte[] data = fetchChunk(filerClient, chunk);
FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build();
List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>();
for (FilerProto.FileChunk t : m.getChunksList()) {
// avoid deprecated chunk.getFileId()
resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build());
}
- dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks));
+ dataChunks.addAll(resolveChunkManifest(filerClient, resolvedChunks));
}
return dataChunks;
}
- private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException {
+ private static byte[] fetchChunk(final FilerClient filerClient, FilerProto.FileChunk chunk) throws IOException {
String vid = "" + chunk.getFid().getVolumeId();
- FilerProto.Locations locations = filerGrpcClient.vidLocations.get(vid);
+ FilerProto.Locations locations = filerClient.vidLocations.get(vid);
if (locations == null) {
FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
lookupRequest.addVolumeIds(vid);
- FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
+ FilerProto.LookupVolumeResponse lookupResponse = filerClient
.getBlockingStub().lookupVolume(lookupRequest.build());
locations = lookupResponse.getLocationsMapMap().get(vid);
- filerGrpcClient.vidLocations.put(vid, locations);
+ filerClient.vidLocations.put(vid, locations);
LOG.debug("fetchChunk vid:{} locations:{}", vid, locations);
}
@@ -74,7 +74,7 @@ public class FileChunkManifest {
byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
if (chunkData == null) {
LOG.debug("doFetchFullChunkData:{}", chunkView);
- chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
+ chunkData = SeaweedRead.doFetchFullChunkData(filerClient, chunkView, locations);
}
if (chunk.getIsChunkManifest()){
LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
@@ -86,7 +86,7 @@ public class FileChunkManifest {
}
public static List<FilerProto.FileChunk> maybeManifestize(
- final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> inputChunks, String parentDirectory) throws IOException {
+ final FilerClient filerClient, List<FilerProto.FileChunk> inputChunks, String parentDirectory) throws IOException {
// the return variable
List<FilerProto.FileChunk> chunks = new ArrayList<>();
@@ -101,7 +101,7 @@ public class FileChunkManifest {
int remaining = dataChunks.size();
for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) {
- FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor), parentDirectory);
+ FilerProto.FileChunk chunk = mergeIntoManifest(filerClient, dataChunks.subList(i, i + mergeFactor), parentDirectory);
chunks.add(chunk);
remaining -= mergeFactor;
}
@@ -113,7 +113,7 @@ public class FileChunkManifest {
return chunks;
}
- private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> dataChunks, String parentDirectory) throws IOException {
+ private static FilerProto.FileChunk mergeIntoManifest(final FilerClient filerClient, List<FilerProto.FileChunk> dataChunks, String parentDirectory) throws IOException {
// create and serialize the manifest
dataChunks = FilerClient.beforeEntrySerialization(dataChunks);
FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks);
@@ -127,8 +127,8 @@ public class FileChunkManifest {
}
FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk(
- filerGrpcClient.getReplication(),
- filerGrpcClient,
+ filerClient.getReplication(),
+ filerClient,
minOffset,
data, 0, data.length, parentDirectory);
manifestChunk.setIsChunkManifest(true);
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index 7338d5bee..58269d41f 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -11,18 +11,12 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-public class FilerClient {
+public class FilerClient extends FilerGrpcClient {
private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class);
- private final FilerGrpcClient filerGrpcClient;
-
public FilerClient(String host, int grpcPort) {
- filerGrpcClient = new FilerGrpcClient(host, grpcPort);
- }
-
- public FilerClient(FilerGrpcClient filerGrpcClient) {
- this.filerGrpcClient = filerGrpcClient;
+ super(host, grpcPort);
}
public static String toFileId(FilerProto.FileId fid) {
@@ -236,7 +230,7 @@ public class FilerClient {
}
public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) {
- Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
+ Iterator<FilerProto.ListEntriesResponse> iter = this.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
.setDirectory(path)
.setPrefix(entryPrefix)
.setStartFromFileName(lastEntryName)
@@ -253,7 +247,7 @@ public class FilerClient {
public FilerProto.Entry lookupEntry(String directory, String entryName) {
try {
- FilerProto.Entry entry = filerGrpcClient.getBlockingStub().lookupDirectoryEntry(
+ FilerProto.Entry entry = this.getBlockingStub().lookupDirectoryEntry(
FilerProto.LookupDirectoryEntryRequest.newBuilder()
.setDirectory(directory)
.setName(entryName)
@@ -274,7 +268,7 @@ public class FilerClient {
public boolean createEntry(String parent, FilerProto.Entry entry) {
try {
FilerProto.CreateEntryResponse createEntryResponse =
- filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
+ this.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parent)
.setEntry(entry)
.build());
@@ -291,7 +285,7 @@ public class FilerClient {
public boolean updateEntry(String parent, FilerProto.Entry entry) {
try {
- filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
+ this.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder()
.setDirectory(parent)
.setEntry(entry)
.build());
@@ -304,7 +298,7 @@ public class FilerClient {
public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive, boolean ignoreRecusiveError) {
try {
- filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
+ this.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder()
.setDirectory(parent)
.setName(entryName)
.setIsDeleteData(isDeleteFileChunk)
@@ -320,7 +314,7 @@ public class FilerClient {
public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) {
try {
- filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder()
+ this.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder()
.setOldDirectory(oldParent)
.setOldName(oldName)
.setNewDirectory(newParent)
@@ -334,7 +328,7 @@ public class FilerClient {
}
public Iterator<FilerProto.SubscribeMetadataResponse> watch(String prefix, String clientName, long sinceNs) {
- return filerGrpcClient.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder()
+ return this.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder()
.setPathPrefix(prefix)
.setClientName(clientName)
.setSinceNs(sinceNs)
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 1a719f3c0..6c57e2e0d 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -9,8 +9,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
-import java.util.Map;
import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
public class FilerGrpcClient {
@@ -26,6 +26,9 @@ public class FilerGrpcClient {
}
}
+ public final int VOLUME_SERVER_ACCESS_DIRECT = 0;
+ public final int VOLUME_SERVER_ACCESS_PUBLIC_URL = 1;
+ public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2;
public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>();
private final ManagedChannel channel;
private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
@@ -34,6 +37,8 @@ public class FilerGrpcClient {
private boolean cipher = false;
private String collection = "";
private String replication = "";
+ private int volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT;
+ private String filerAddress;
public FilerGrpcClient(String host, int grpcPort) {
this(host, grpcPort, sslContext);
@@ -49,6 +54,8 @@ public class FilerGrpcClient {
.negotiationType(NegotiationType.TLS)
.sslContext(sslContext));
+ filerAddress = String.format("%s:%d", host, grpcPort - 10000);
+
FilerProto.GetFilerConfigurationResponse filerConfigurationResponse =
this.getBlockingStub().getFilerConfiguration(
FilerProto.GetFilerConfigurationRequest.newBuilder().build());
@@ -58,7 +65,7 @@ public class FilerGrpcClient {
}
- public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
+ private FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
channel = channelBuilder.build();
blockingStub = SeaweedFilerGrpc.newBlockingStub(channel);
asyncStub = SeaweedFilerGrpc.newStub(channel);
@@ -93,4 +100,39 @@ public class FilerGrpcClient {
return futureStub;
}
+ public void setAccessVolumeServerDirectly() {
+ this.volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT;
+ }
+
+ public boolean isAccessVolumeServerDirectly() {
+ return this.volumeServerAccess == VOLUME_SERVER_ACCESS_DIRECT;
+ }
+
+ public void setAccessVolumeServerByPublicUrl() {
+ this.volumeServerAccess = VOLUME_SERVER_ACCESS_PUBLIC_URL;
+ }
+
+ public boolean isAccessVolumeServerByPublicUrl() {
+ return this.volumeServerAccess == VOLUME_SERVER_ACCESS_PUBLIC_URL;
+ }
+
+ public void setAccessVolumeServerByFilerProxy() {
+ this.volumeServerAccess = VOLUME_SERVER_ACCESS_FILER_PROXY;
+ }
+
+ public boolean isAccessVolumeServerByFilerProxy() {
+ return this.volumeServerAccess == VOLUME_SERVER_ACCESS_FILER_PROXY;
+ }
+
+ public String getChunkUrl(String chunkId, String url, String publicUrl) {
+ switch (this.volumeServerAccess) {
+ case VOLUME_SERVER_ACCESS_PUBLIC_URL:
+ return String.format("http://%s/%s", publicUrl, chunkId);
+ case VOLUME_SERVER_ACCESS_FILER_PROXY:
+ return String.format("http://%s/?proxyChunkId=%s", this.filerAddress, chunkId);
+ default:
+ return String.format("http://%s/%s", url, chunkId);
+ }
+ }
+
}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
index 690366849..4e40ce1b6 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
@@ -1,28 +1,22 @@
-package seaweed.hdfs;
+package seaweedfs.client;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem.Statistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedRead;
import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;
-public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable {
+public class SeaweedInputStream extends InputStream {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
+ private static final IOException EXCEPTION_STREAM_IS_CLOSED = new IOException("Stream is closed!");
- private final FilerGrpcClient filerGrpcClient;
- private final Statistics statistics;
+ private final FilerClient filerClient;
private final String path;
private final FilerProto.Entry entry;
private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
@@ -33,17 +27,31 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
private boolean closed = false;
public SeaweedInputStream(
- final FilerGrpcClient filerGrpcClient,
- final Statistics statistics,
+ final FilerClient filerClient,
+ final String fullpath) throws IOException {
+ this.path = fullpath;
+ this.filerClient = filerClient;
+ this.entry = filerClient.lookupEntry(
+ SeaweedOutputStream.getParentDirectory(fullpath),
+ SeaweedOutputStream.getFileName(fullpath));
+ this.contentLength = SeaweedRead.fileSize(entry);
+
+ this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
+
+ LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
+
+ }
+
+ public SeaweedInputStream(
+ final FilerClient filerClient,
final String path,
final FilerProto.Entry entry) throws IOException {
- this.filerGrpcClient = filerGrpcClient;
- this.statistics = statistics;
+ this.filerClient = filerClient;
this.path = path;
this.entry = entry;
this.contentLength = SeaweedRead.fileSize(entry);
- this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
+ this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
@@ -86,7 +94,6 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
}
// implement ByteBufferReadable
- @Override
public synchronized int read(ByteBuffer buf) throws IOException {
if (position < 0) {
@@ -102,7 +109,7 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
if (start+len <= entry.getContent().size()) {
entry.getContent().substring(start, start+len).copyTo(buf);
} else {
- bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
+ bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
}
if (bytesRead > Integer.MAX_VALUE) {
@@ -111,45 +118,32 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
if (bytesRead > 0) {
this.position += bytesRead;
- if (statistics != null) {
- statistics.incrementBytesRead(bytesRead);
- }
}
return (int) bytesRead;
}
- /**
- * Seek to given position in stream.
- *
- * @param n position to seek to
- * @throws IOException if there is an error
- * @throws EOFException if attempting to seek past end of file
- */
- @Override
public synchronized void seek(long n) throws IOException {
if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ throw EXCEPTION_STREAM_IS_CLOSED;
}
if (n < 0) {
- throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+ throw new EOFException("Cannot seek to a negative offset");
}
if (n > contentLength) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ throw new EOFException("Attempted to seek or read past the end of the file");
}
-
this.position = n;
-
}
@Override
public synchronized long skip(long n) throws IOException {
if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ throw EXCEPTION_STREAM_IS_CLOSED;
}
if (this.position == contentLength) {
if (n > 0) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ throw new EOFException("Attempted to seek or read past the end of the file");
}
}
long newPos = this.position + n;
@@ -177,10 +171,9 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
@Override
public synchronized int available() throws IOException {
if (closed) {
- throw new IOException(
- FSExceptionMessages.STREAM_IS_CLOSED);
+ throw EXCEPTION_STREAM_IS_CLOSED;
}
- final long remaining = this.contentLength - this.getPos();
+ final long remaining = this.contentLength - this.position;
return remaining <= Integer.MAX_VALUE
? (int) remaining : Integer.MAX_VALUE;
}
@@ -195,65 +188,21 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada
*/
public long length() throws IOException {
if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ throw EXCEPTION_STREAM_IS_CLOSED;
}
return contentLength;
}
- /**
- * Return the current offset from the start of the file
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
public synchronized long getPos() throws IOException {
if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ throw EXCEPTION_STREAM_IS_CLOSED;
}
return position;
}
- /**
- * Seeks a different copy of the data. Returns true if
- * found a new source, false otherwise.
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public boolean seekToNewSource(long l) throws IOException {
- return false;
- }
-
@Override
public synchronized void close() throws IOException {
closed = true;
}
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- *
- * @param readlimit ignored
- */
- @Override
- public synchronized void mark(int readlimit) {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- */
- @Override
- public synchronized void reset() throws IOException {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false.
- *
- * @return always {@code false}
- */
- @Override
- public boolean markSupported() {
- return false;
- }
}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index 26290c46c..b73e99e69 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -1,16 +1,9 @@
-package seaweed.hdfs;
+package seaweedfs.client;
// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import seaweedfs.client.ByteBufferPool;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedWrite;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -18,21 +11,19 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.*;
-import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
-
public class SeaweedOutputStream extends OutputStream {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
-
- private final FilerGrpcClient filerGrpcClient;
- private final Path path;
+ protected final boolean supportFlush = true;
+ private final FilerClient filerClient;
+ private final String path;
private final int bufferSize;
private final int maxConcurrentRequestCount;
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
- private final FilerProto.Entry.Builder entry;
- private final boolean supportFlush = false; // true;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
+ private final boolean shouldSaveMetadata = false;
+ private FilerProto.Entry.Builder entry;
private long position;
private boolean closed;
private volatile IOException lastError;
@@ -42,9 +33,17 @@ public class SeaweedOutputStream extends OutputStream {
private long outputIndex;
private String replication = "000";
- public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
+ public SeaweedOutputStream(FilerClient filerClient, final String fullpath) {
+ this(filerClient, fullpath, "000");
+ }
+
+ public SeaweedOutputStream(FilerClient filerClient, final String fullpath, final String replication) {
+ this(filerClient, fullpath, null, 0, 8 * 1024 * 1024, "000");
+ }
+
+ public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
final long position, final int bufferSize, final String replication) {
- this.filerGrpcClient = filerGrpcClient;
+ this.filerClient = filerClient;
this.replication = replication;
this.path = path;
this.position = position;
@@ -67,12 +66,50 @@ public class SeaweedOutputStream extends OutputStream {
this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
this.entry = entry;
+ if (this.entry == null) {
+ long now = System.currentTimeMillis() / 1000L;
+
+ this.entry = FilerProto.Entry.newBuilder()
+ .setName(getFileName(path))
+ .setIsDirectory(false)
+ .setAttributes(FilerProto.FuseAttributes.newBuilder()
+ .setFileMode(0755)
+ .setReplication(replication)
+ .setCrtime(now)
+ .setMtime(now)
+ .clearGroupName()
+ );
+ }
}
+ public static String getParentDirectory(String path) {
+ int protoIndex = path.indexOf("://");
+ if (protoIndex >= 0) {
+ int pathStart = path.indexOf("/", protoIndex+3);
+ path = path.substring(pathStart);
+ }
+ if (path.equals("/")) {
+ return path;
+ }
+ int lastSlashIndex = path.lastIndexOf("/");
+ if (lastSlashIndex == 0) {
+ return "/";
+ }
+ return path.substring(0, lastSlashIndex);
+ }
+
+ public static String getFileName(String path) {
+ if (path.indexOf("/") < 0) {
+ return path;
+ }
+ int lastSlashIndex = path.lastIndexOf("/");
+ return path.substring(lastSlashIndex + 1);
+ }
+
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
try {
- SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
+ SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
} catch (Exception ex) {
throw new IOException(ex);
}
@@ -89,7 +126,9 @@ public class SeaweedOutputStream extends OutputStream {
throws IOException {
maybeThrowLastError();
- Preconditions.checkArgument(data != null, "null data");
+ if (data == null) {
+ return;
+ }
if (off < 0 || length < 0 || length > data.length - off) {
throw new IndexOutOfBoundsException();
@@ -152,7 +191,7 @@ public class SeaweedOutputStream extends OutputStream {
flushInternal();
threadExecutor.shutdown();
} finally {
- lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ lastError = new IOException("Stream is closed!");
ByteBufferPool.release(buffer);
buffer = null;
outputIndex = 0;
@@ -162,6 +201,7 @@ public class SeaweedOutputStream extends OutputStream {
threadExecutor.shutdownNow();
}
}
+
}
private synchronized void writeCurrentBufferToService() throws IOException {
@@ -185,7 +225,7 @@ public class SeaweedOutputStream extends OutputStream {
}
final Future<Void> job = completionService.submit(() -> {
// System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
- SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path.toUri().getPath());
+ SeaweedWrite.writeData(entry, replication, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path);
// System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
ByteBufferPool.release(bufferToWrite);
return null;
@@ -239,13 +279,13 @@ public class SeaweedOutputStream extends OutputStream {
}
}
- private synchronized void flushInternal() throws IOException {
+ protected synchronized void flushInternal() throws IOException {
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToService();
}
- private synchronized void flushInternalAsync() throws IOException {
+ protected synchronized void flushInternalAsync() throws IOException {
maybeThrowLastError();
writeCurrentBufferToService();
flushWrittenBytesToServiceAsync();
@@ -278,10 +318,6 @@ public class SeaweedOutputStream extends OutputStream {
private final long length;
WriteOperation(final Future<Void> task, final long startOffset, final long length) {
- Preconditions.checkNotNull(task, "task");
- Preconditions.checkArgument(startOffset >= 0, "startOffset");
- Preconditions.checkArgument(length >= 0, "length");
-
this.task = task;
this.startOffset = startOffset;
this.length = length;
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index c45987bed..384636601 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -23,7 +23,7 @@ public class SeaweedRead {
static VolumeIdCache volumeIdCache = new VolumeIdCache(4 * 1024);
// returns bytesRead
- public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
+ public static long read(FilerClient filerClient, List<VisibleInterval> visibleIntervals,
final long position, final ByteBuffer buf, final long fileSize) throws IOException {
List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining());
@@ -42,7 +42,7 @@ public class SeaweedRead {
}
if (lookupRequest.getVolumeIdsCount() > 0) {
- FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
+ FilerProto.LookupVolumeResponse lookupResponse = filerClient
.getBlockingStub().lookupVolume(lookupRequest.build());
Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap();
for (Map.Entry<String, FilerProto.Locations> entry : vid2Locations.entrySet()) {
@@ -71,7 +71,7 @@ public class SeaweedRead {
return 0;
}
- int len = readChunkView(startOffset, buf, chunkView, locations);
+ int len = readChunkView(filerClient, startOffset, buf, chunkView, locations);
LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
@@ -93,12 +93,12 @@ public class SeaweedRead {
return readCount;
}
- private static int readChunkView(long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
if (chunkData == null) {
- chunkData = doFetchFullChunkData(chunkView, locations);
+ chunkData = doFetchFullChunkData(filerClient, chunkView, locations);
chunkCache.setChunk(chunkView.fileId, chunkData);
}
@@ -110,13 +110,13 @@ public class SeaweedRead {
return len;
}
- public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
byte[] data = null;
IOException lastException = null;
for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
for (FilerProto.Location location : locations.getLocationsList()) {
- String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId);
+ String url = filerClient.getChunkUrl(chunkView.fileId, location.getUrl(), location.getPublicUrl());
try {
data = doFetchOneFullChunkData(chunkView, url);
lastException = null;
@@ -145,7 +145,7 @@ public class SeaweedRead {
}
- public static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException {
+ private static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException {
HttpGet request = new HttpGet(url);
@@ -221,9 +221,9 @@ public class SeaweedRead {
}
public static List<VisibleInterval> nonOverlappingVisibleIntervals(
- final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunkList) throws IOException {
+ final FilerClient filerClient, List<FilerProto.FileChunk> chunkList) throws IOException {
- chunkList = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunkList);
+ chunkList = FileChunkManifest.resolveChunkManifest(filerClient, chunkList);
FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index b8fd3e299..f8c0c76b6 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -23,41 +23,41 @@ public class SeaweedWrite {
public static void writeData(FilerProto.Entry.Builder entry,
final String replication,
- final FilerGrpcClient filerGrpcClient,
+ final FilerClient filerClient,
final long offset,
final byte[] bytes,
final long bytesOffset, final long bytesLength,
final String path) throws IOException {
FilerProto.FileChunk.Builder chunkBuilder = writeChunk(
- replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength, path);
+ replication, filerClient, offset, bytes, bytesOffset, bytesLength, path);
synchronized (entry) {
entry.addChunks(chunkBuilder);
}
}
public static FilerProto.FileChunk.Builder writeChunk(final String replication,
- final FilerGrpcClient filerGrpcClient,
+ final FilerClient filerClient,
final long offset,
final byte[] bytes,
final long bytesOffset,
final long bytesLength,
final String path) throws IOException {
- FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
+ FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume(
FilerProto.AssignVolumeRequest.newBuilder()
- .setCollection(filerGrpcClient.getCollection())
- .setReplication(replication == null ? filerGrpcClient.getReplication() : replication)
+ .setCollection(filerClient.getCollection())
+ .setReplication(replication == null ? filerClient.getReplication() : replication)
.setDataCenter("")
.setTtlSec(0)
.setPath(path)
.build());
String fileId = response.getFileId();
- String url = response.getUrl();
String auth = response.getAuth();
- String targetUrl = String.format("http://%s/%s", url, fileId);
+
+ String targetUrl = filerClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl());
ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
byte[] cipherKey = null;
- if (filerGrpcClient.isCipher()) {
+ if (filerClient.isCipher()) {
cipherKey = genCipherKey();
cipherKeyString = ByteString.copyFrom(cipherKey);
}
@@ -75,15 +75,15 @@ public class SeaweedWrite {
.setCipherKey(cipherKeyString);
}
- public static void writeMeta(final FilerGrpcClient filerGrpcClient,
+ public static void writeMeta(final FilerClient filerClient,
final String parentDirectory,
final FilerProto.Entry.Builder entry) throws IOException {
synchronized (entry) {
- List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList(), parentDirectory);
+ List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory);
entry.clearChunks();
entry.addAllChunks(chunks);
- filerGrpcClient.getBlockingStub().createEntry(
+ filerClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory)
.setEntry(entry)
diff --git a/other/java/examples/pom.xml b/other/java/examples/pom.xml
index f7c48d0ab..2456113d0 100644
--- a/other/java/examples/pom.xml
+++ b/other/java/examples/pom.xml
@@ -11,13 +11,13 @@
<dependency>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-client</artifactId>
- <version>1.5.6</version>
+ <version>1.6.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.github.chrislusf</groupId>
<artifactId>seaweedfs-hadoop2-client</artifactId>
- <version>1.5.6</version>
+ <version>1.6.1</version>
<scope>compile</scope>
</dependency>
<dependency>
diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleReadFile.java
index 0529a5c73..d2eb94135 100644
--- a/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java
+++ b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleReadFile.java
@@ -1,8 +1,7 @@
package com.seaweedfs.examples;
-import seaweed.hdfs.SeaweedInputStream;
import seaweedfs.client.FilerClient;
-import seaweedfs.client.FilerGrpcClient;
+import seaweedfs.client.SeaweedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
@@ -10,12 +9,11 @@ import java.io.InputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
-public class UnzipFile {
+public class ExampleReadFile {
public static void main(String[] args) throws IOException {
- FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888);
- FilerClient filerClient = new FilerClient(filerGrpcClient);
+ FilerClient filerClient = new FilerClient("localhost", 18888);
long startTime = System.currentTimeMillis();
parseZip("/Users/chris/tmp/test.zip");
@@ -25,11 +23,7 @@ public class UnzipFile {
long localProcessTime = startTime2 - startTime;
SeaweedInputStream seaweedInputStream = new SeaweedInputStream(
- filerGrpcClient,
- new org.apache.hadoop.fs.FileSystem.Statistics(""),
- "/",
- filerClient.lookupEntry("/", "test.zip")
- );
+ filerClient, "/test.zip");
parseZip(seaweedInputStream);
long swProcessTime = System.currentTimeMillis() - startTime2;
diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWatchFileChanges.java
index e489cb3b1..72c572d31 100644
--- a/other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java
+++ b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWatchFileChanges.java
@@ -7,7 +7,7 @@ import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
-public class WatchFiles {
+public class ExampleWatchFileChanges {
public static void main(String[] args) throws IOException {
diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java
new file mode 100644
index 000000000..26b74028f
--- /dev/null
+++ b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java
@@ -0,0 +1,47 @@
+package com.seaweedfs.examples;
+
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.SeaweedInputStream;
+import seaweedfs.client.SeaweedOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+public class ExampleWriteFile {
+
+ public static void main(String[] args) throws IOException {
+
+ FilerClient filerClient = new FilerClient("localhost", 18888);
+
+ SeaweedInputStream seaweedInputStream = new SeaweedInputStream(filerClient, "/test.zip");
+ unZipFiles(filerClient, seaweedInputStream);
+
+ }
+
+ public static void unZipFiles(FilerClient filerClient, InputStream is) throws IOException {
+ ZipInputStream zin = new ZipInputStream(is);
+ ZipEntry ze;
+ while ((ze = zin.getNextEntry()) != null) {
+
+ String filename = ze.getName();
+ if (filename.indexOf("/") >= 0) {
+ filename = filename.substring(filename.lastIndexOf("/") + 1);
+ }
+ if (filename.length()==0) {
+ continue;
+ }
+
+ SeaweedOutputStream seaweedOutputStream = new SeaweedOutputStream(filerClient, "/test/"+filename);
+ byte[] bytesIn = new byte[16 * 1024];
+ int read = 0;
+ while ((read = zin.read(bytesIn))!=-1) {
+ seaweedOutputStream.write(bytesIn,0,read);
+ }
+ seaweedOutputStream.close();
+
+ System.out.println(ze.getName());
+ }
+ }
+}
diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/HdfsCopyFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/HdfsCopyFile.java
new file mode 100644
index 000000000..006c581c9
--- /dev/null
+++ b/other/java/examples/src/main/java/com/seaweedfs/examples/HdfsCopyFile.java
@@ -0,0 +1,25 @@
+package com.seaweedfs.examples;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.*;
+
+public class HdfsCopyFile {
+ public static void main(String[] args) throws IOException {
+ Configuration configuration = new Configuration();
+
+ configuration.set("fs.defaultFS", "seaweedfs://localhost:8888");
+ configuration.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
+
+ FileSystem fs = FileSystem.get(configuration);
+ String source = "/Users/chris/tmp/test.zip";
+ String destination = "/buckets/spark/test01.zip";
+ InputStream in = new BufferedInputStream(new FileInputStream(source));
+
+ OutputStream out = fs.create(new Path(destination));
+ IOUtils.copyBytes(in, out, 4096, true);
+ }
+}
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
index f7873a435..0680d86bb 100644
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -301,7 +301,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.5.6</seaweedfs.client.version>
+ <seaweedfs.client.version>1.6.1</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index bda0eba56..897477066 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.5.6</seaweedfs.client.version>
+ <seaweedfs.client.version>1.6.1</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 84f11e846..25395db7a 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -26,6 +26,7 @@ public class SeaweedFileSystem extends FileSystem {
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication";
+ public static final String FS_SEAWEED_VOLUME_SERVER_ACCESS = "fs.seaweed.volume.server.access";
public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 14b32528e..f4e8c9349 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -18,27 +18,31 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE;
-import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE;
+import static seaweed.hdfs.SeaweedFileSystem.*;
public class SeaweedFileSystemStore {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
- private FilerGrpcClient filerGrpcClient;
private FilerClient filerClient;
private Configuration conf;
public SeaweedFileSystemStore(String host, int port, Configuration conf) {
int grpcPort = 10000 + port;
- filerGrpcClient = new FilerGrpcClient(host, grpcPort);
- filerClient = new FilerClient(filerGrpcClient);
+ filerClient = new FilerClient(host, grpcPort);
this.conf = conf;
+ String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct");
+ if (volumeServerAccessMode.equals("publicUrl")) {
+ filerClient.setAccessVolumeServerByPublicUrl();
+ } else if (volumeServerAccessMode.equals("filerProxy")) {
+ filerClient.setAccessVolumeServerByFilerProxy();
+ }
+
}
public void close() {
try {
- this.filerGrpcClient.shutdown();
+ this.filerClient.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -213,10 +217,10 @@ public class SeaweedFileSystemStore {
.clearGroupName()
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
);
- SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
+ SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
}
- return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
+ return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication);
}
@@ -230,7 +234,7 @@ public class SeaweedFileSystemStore {
throw new FileNotFoundException("read non-exist file " + path);
}
- return new SeaweedInputStream(filerGrpcClient,
+ return new SeaweedHadoopInputStream(filerClient,
statistics,
path.toUri().getPath(),
entry);
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
new file mode 100644
index 000000000..f26eae597
--- /dev/null
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
@@ -0,0 +1,150 @@
+package seaweed.hdfs;
+
+// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedInputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable {
+
+ private final SeaweedInputStream seaweedInputStream;
+ private final Statistics statistics;
+
+ public SeaweedHadoopInputStream(
+ final FilerClient filerClient,
+ final Statistics statistics,
+ final String path,
+ final FilerProto.Entry entry) throws IOException {
+ this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry);
+ this.statistics = statistics;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return seaweedInputStream.read();
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+ return seaweedInputStream.read(b, off, len);
+ }
+
+ // implement ByteBufferReadable
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ int bytesRead = seaweedInputStream.read(buf);
+
+ if (bytesRead > 0) {
+ if (statistics != null) {
+ statistics.incrementBytesRead(bytesRead);
+ }
+ }
+
+ return bytesRead;
+ }
+
+ /**
+ * Seek to given position in stream.
+ *
+ * @param n position to seek to
+ * @throws IOException if there is an error
+ * @throws EOFException if attempting to seek past end of file
+ */
+ @Override
+ public synchronized void seek(long n) throws IOException {
+ seaweedInputStream.seek(n);
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ return seaweedInputStream.skip(n);
+ }
+
+ /**
+ * Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
+ * <p>
+ * This is to match the behavior of DFSInputStream.available(),
+ * which some clients may rely on (HBase write-ahead log reading in
+ * particular).
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ return seaweedInputStream.available();
+ }
+
+ /**
+ * Returns the length of the file that this stream refers to. Note that the length returned is the length
+ * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
+ * they wont be reflected in the returned length.
+ *
+ * @return length of the file.
+ * @throws IOException if the stream is closed
+ */
+ public long length() throws IOException {
+ return seaweedInputStream.length();
+ }
+
+ /**
+ * Return the current offset from the start of the file
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public synchronized long getPos() throws IOException {
+ return seaweedInputStream.getPos();
+ }
+
+ /**
+ * Seeks a different copy of the data. Returns true if
+ * found a new source, false otherwise.
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ seaweedInputStream.close();
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ *
+ * @param readlimit ignored
+ */
+ @Override
+ public synchronized void mark(int readlimit) {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ */
+ @Override
+ public synchronized void reset() throws IOException {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false.
+ *
+ * @return always {@code false}
+ */
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
new file mode 100644
index 000000000..da5b56bbc
--- /dev/null
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
@@ -0,0 +1,16 @@
+package seaweed.hdfs;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedOutputStream;
+
+public class SeaweedHadoopOutputStream extends SeaweedOutputStream {
+
+ public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
+ final long position, final int bufferSize, final String replication) {
+ super(filerClient, path, entry, position, bufferSize, replication);
+ }
+
+}
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
deleted file mode 100644
index 690366849..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ /dev/null
@@ -1,259 +0,0 @@
-package seaweed.hdfs;
-
-// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-
-import org.apache.hadoop.fs.ByteBufferReadable;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedRead;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable {
-
- private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
-
- private final FilerGrpcClient filerGrpcClient;
- private final Statistics statistics;
- private final String path;
- private final FilerProto.Entry entry;
- private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
- private final long contentLength;
-
- private long position = 0; // cursor of the file
-
- private boolean closed = false;
-
- public SeaweedInputStream(
- final FilerGrpcClient filerGrpcClient,
- final Statistics statistics,
- final String path,
- final FilerProto.Entry entry) throws IOException {
- this.filerGrpcClient = filerGrpcClient;
- this.statistics = statistics;
- this.path = path;
- this.entry = entry;
- this.contentLength = SeaweedRead.fileSize(entry);
-
- this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList());
-
- LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
-
- }
-
- public String getPath() {
- return path;
- }
-
- @Override
- public int read() throws IOException {
- byte[] b = new byte[1];
- int numberOfBytesRead = read(b, 0, 1);
- if (numberOfBytesRead < 0) {
- return -1;
- } else {
- return (b[0] & 0xFF);
- }
- }
-
- @Override
- public int read(final byte[] b, final int off, final int len) throws IOException {
-
- if (b == null) {
- throw new IllegalArgumentException("null byte array passed in to read() method");
- }
- if (off >= b.length) {
- throw new IllegalArgumentException("offset greater than length of array");
- }
- if (len < 0) {
- throw new IllegalArgumentException("requested read length is less than zero");
- }
- if (len > (b.length - off)) {
- throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
- }
-
- ByteBuffer buf = ByteBuffer.wrap(b, off, len);
- return read(buf);
-
- }
-
- // implement ByteBufferReadable
- @Override
- public synchronized int read(ByteBuffer buf) throws IOException {
-
- if (position < 0) {
- throw new IllegalArgumentException("attempting to read from negative offset");
- }
- if (position >= contentLength) {
- return -1; // Hadoop prefers -1 to EOFException
- }
-
- long bytesRead = 0;
- int len = buf.remaining();
- int start = (int) this.position;
- if (start+len <= entry.getContent().size()) {
- entry.getContent().substring(start, start+len).copyTo(buf);
- } else {
- bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
- }
-
- if (bytesRead > Integer.MAX_VALUE) {
- throw new IOException("Unexpected Content-Length");
- }
-
- if (bytesRead > 0) {
- this.position += bytesRead;
- if (statistics != null) {
- statistics.incrementBytesRead(bytesRead);
- }
- }
-
- return (int) bytesRead;
- }
-
- /**
- * Seek to given position in stream.
- *
- * @param n position to seek to
- * @throws IOException if there is an error
- * @throws EOFException if attempting to seek past end of file
- */
- @Override
- public synchronized void seek(long n) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- if (n < 0) {
- throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
- }
- if (n > contentLength) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
- }
-
- this.position = n;
-
- }
-
- @Override
- public synchronized long skip(long n) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- if (this.position == contentLength) {
- if (n > 0) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
- }
- }
- long newPos = this.position + n;
- if (newPos < 0) {
- newPos = 0;
- n = newPos - this.position;
- }
- if (newPos > contentLength) {
- newPos = contentLength;
- n = newPos - this.position;
- }
- seek(newPos);
- return n;
- }
-
- /**
- * Return the size of the remaining available bytes
- * if the size is less than or equal to {@link Integer#MAX_VALUE},
- * otherwise, return {@link Integer#MAX_VALUE}.
- * <p>
- * This is to match the behavior of DFSInputStream.available(),
- * which some clients may rely on (HBase write-ahead log reading in
- * particular).
- */
- @Override
- public synchronized int available() throws IOException {
- if (closed) {
- throw new IOException(
- FSExceptionMessages.STREAM_IS_CLOSED);
- }
- final long remaining = this.contentLength - this.getPos();
- return remaining <= Integer.MAX_VALUE
- ? (int) remaining : Integer.MAX_VALUE;
- }
-
- /**
- * Returns the length of the file that this stream refers to. Note that the length returned is the length
- * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
- * they wont be reflected in the returned length.
- *
- * @return length of the file.
- * @throws IOException if the stream is closed
- */
- public long length() throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- return contentLength;
- }
-
- /**
- * Return the current offset from the start of the file
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public synchronized long getPos() throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- return position;
- }
-
- /**
- * Seeks a different copy of the data. Returns true if
- * found a new source, false otherwise.
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public boolean seekToNewSource(long l) throws IOException {
- return false;
- }
-
- @Override
- public synchronized void close() throws IOException {
- closed = true;
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- *
- * @param readlimit ignored
- */
- @Override
- public synchronized void mark(int readlimit) {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- */
- @Override
- public synchronized void reset() throws IOException {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false.
- *
- * @return always {@code false}
- */
- @Override
- public boolean markSupported() {
- return false;
- }
-}
diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml
index 20b52e20f..2b4a1a494 100644
--- a/other/java/hdfs3/dependency-reduced-pom.xml
+++ b/other/java/hdfs3/dependency-reduced-pom.xml
@@ -309,7 +309,7 @@
</snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.5.6</seaweedfs.client.version>
+ <seaweedfs.client.version>1.6.1</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index 85d8db859..49ff8f926 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<properties>
- <seaweedfs.client.version>1.5.6</seaweedfs.client.version>
+ <seaweedfs.client.version>1.6.1</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 84f11e846..25395db7a 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -26,6 +26,7 @@ public class SeaweedFileSystem extends FileSystem {
public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication";
+ public static final String FS_SEAWEED_VOLUME_SERVER_ACCESS = "fs.seaweed.volume.server.access";
public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 14b32528e..f4e8c9349 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -18,27 +18,31 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE;
-import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE;
+import static seaweed.hdfs.SeaweedFileSystem.*;
public class SeaweedFileSystemStore {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
- private FilerGrpcClient filerGrpcClient;
private FilerClient filerClient;
private Configuration conf;
public SeaweedFileSystemStore(String host, int port, Configuration conf) {
int grpcPort = 10000 + port;
- filerGrpcClient = new FilerGrpcClient(host, grpcPort);
- filerClient = new FilerClient(filerGrpcClient);
+ filerClient = new FilerClient(host, grpcPort);
this.conf = conf;
+ String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct");
+ if (volumeServerAccessMode.equals("publicUrl")) {
+ filerClient.setAccessVolumeServerByPublicUrl();
+ } else if (volumeServerAccessMode.equals("filerProxy")) {
+ filerClient.setAccessVolumeServerByFilerProxy();
+ }
+
}
public void close() {
try {
- this.filerGrpcClient.shutdown();
+ this.filerClient.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -213,10 +217,10 @@ public class SeaweedFileSystemStore {
.clearGroupName()
.addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
);
- SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
+ SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
}
- return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
+ return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication);
}
@@ -230,7 +234,7 @@ public class SeaweedFileSystemStore {
throw new FileNotFoundException("read non-exist file " + path);
}
- return new SeaweedInputStream(filerGrpcClient,
+ return new SeaweedHadoopInputStream(filerClient,
statistics,
path.toUri().getPath(),
entry);
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
new file mode 100644
index 000000000..f26eae597
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
@@ -0,0 +1,150 @@
+package seaweed.hdfs;
+
+// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedInputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable {
+
+ private final SeaweedInputStream seaweedInputStream;
+ private final Statistics statistics;
+
+ public SeaweedHadoopInputStream(
+ final FilerClient filerClient,
+ final Statistics statistics,
+ final String path,
+ final FilerProto.Entry entry) throws IOException {
+ this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry);
+ this.statistics = statistics;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return seaweedInputStream.read();
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException {
+ return seaweedInputStream.read(b, off, len);
+ }
+
+ // implement ByteBufferReadable
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+ int bytesRead = seaweedInputStream.read(buf);
+
+ if (bytesRead > 0) {
+ if (statistics != null) {
+ statistics.incrementBytesRead(bytesRead);
+ }
+ }
+
+ return bytesRead;
+ }
+
+ /**
+ * Seek to given position in stream.
+ *
+ * @param n position to seek to
+ * @throws IOException if there is an error
+ * @throws EOFException if attempting to seek past end of file
+ */
+ @Override
+ public synchronized void seek(long n) throws IOException {
+ seaweedInputStream.seek(n);
+ }
+
+ @Override
+ public synchronized long skip(long n) throws IOException {
+ return seaweedInputStream.skip(n);
+ }
+
+ /**
+ * Return the size of the remaining available bytes
+ * if the size is less than or equal to {@link Integer#MAX_VALUE},
+ * otherwise, return {@link Integer#MAX_VALUE}.
+ * <p>
+ * This is to match the behavior of DFSInputStream.available(),
+ * which some clients may rely on (HBase write-ahead log reading in
+ * particular).
+ */
+ @Override
+ public synchronized int available() throws IOException {
+ return seaweedInputStream.available();
+ }
+
+ /**
+ * Returns the length of the file that this stream refers to. Note that the length returned is the length
+ * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
+ * they wont be reflected in the returned length.
+ *
+ * @return length of the file.
+ * @throws IOException if the stream is closed
+ */
+ public long length() throws IOException {
+ return seaweedInputStream.length();
+ }
+
+ /**
+ * Return the current offset from the start of the file
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public synchronized long getPos() throws IOException {
+ return seaweedInputStream.getPos();
+ }
+
+ /**
+ * Seeks a different copy of the data. Returns true if
+ * found a new source, false otherwise.
+ *
+ * @throws IOException throws {@link IOException} if there is an error
+ */
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ seaweedInputStream.close();
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ *
+ * @param readlimit ignored
+ */
+ @Override
+ public synchronized void mark(int readlimit) {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * Not supported by this stream. Throws {@link UnsupportedOperationException}
+ */
+ @Override
+ public synchronized void reset() throws IOException {
+ throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+ }
+
+ /**
+ * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false.
+ *
+ * @return always {@code false}
+ */
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
new file mode 100644
index 000000000..1740312fe
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
@@ -0,0 +1,64 @@
+package seaweed.hdfs;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedOutputStream;
+
+import java.io.IOException;
+import java.util.Locale;
+
+public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities {
+
+ public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
+ final long position, final int bufferSize, final String replication) {
+ super(filerClient, path, entry, position, bufferSize, replication);
+ }
+
+ /**
+ * Similar to posix fsync, flush out the data in client's user buffer
+ * all the way to the disk device (but the disk may have it in its cache).
+ *
+ * @throws IOException if error occurs
+ */
+ @Override
+ public void hsync() throws IOException {
+ if (supportFlush) {
+ flushInternal();
+ }
+ }
+
+ /**
+ * Flush out the data in client's user buffer. After the return of
+ * this call, new readers will see the data.
+ *
+ * @throws IOException if any error occurs
+ */
+ @Override
+ public void hflush() throws IOException {
+ if (supportFlush) {
+ flushInternal();
+ }
+ }
+
+ /**
+ * Query the stream for a specific capability.
+ *
+ * @param capability string to query the stream support for.
+ * @return true for hsync and hflush.
+ */
+ @Override
+ public boolean hasCapability(String capability) {
+ switch (capability.toLowerCase(Locale.ENGLISH)) {
+ case StreamCapabilities.HSYNC:
+ case StreamCapabilities.HFLUSH:
+ return supportFlush;
+ default:
+ return false;
+ }
+ }
+
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
deleted file mode 100644
index d4c967a06..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ /dev/null
@@ -1,337 +0,0 @@
-package seaweed.hdfs;
-
-// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StreamCapabilities;
-import org.apache.hadoop.fs.Syncable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.ByteBufferPool;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedWrite;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Locale;
-import java.util.concurrent.*;
-
-import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
-
-public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities {
-
- private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
-
- private final FilerGrpcClient filerGrpcClient;
- private final Path path;
- private final int bufferSize;
- private final int maxConcurrentRequestCount;
- private final ThreadPoolExecutor threadExecutor;
- private final ExecutorCompletionService<Void> completionService;
- private final FilerProto.Entry.Builder entry;
- private final boolean supportFlush = false; // true;
- private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
- private long position;
- private boolean closed;
- private volatile IOException lastError;
- private long lastFlushOffset;
- private long lastTotalAppendOffset = 0;
- private ByteBuffer buffer;
- private long outputIndex;
- private String replication = "000";
-
- public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
- final long position, final int bufferSize, final String replication) {
- this.filerGrpcClient = filerGrpcClient;
- this.replication = replication;
- this.path = path;
- this.position = position;
- this.closed = false;
- this.lastError = null;
- this.lastFlushOffset = 0;
- this.bufferSize = bufferSize;
- this.buffer = ByteBufferPool.request(bufferSize);
- this.outputIndex = 0;
- this.writeOperations = new ConcurrentLinkedDeque<>();
-
- this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
-
- this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
- maxConcurrentRequestCount,
- 120L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
- this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
-
- this.entry = entry;
-
- }
-
- private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
- try {
- SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
- } catch (Exception ex) {
- throw new IOException(ex);
- }
- this.lastFlushOffset = offset;
- }
-
- @Override
- public void write(final int byteVal) throws IOException {
- write(new byte[]{(byte) (byteVal & 0xFF)});
- }
-
- @Override
- public synchronized void write(final byte[] data, final int off, final int length)
- throws IOException {
- maybeThrowLastError();
-
- Preconditions.checkArgument(data != null, "null data");
-
- if (off < 0 || length < 0 || length > data.length - off) {
- throw new IndexOutOfBoundsException();
- }
-
- // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
-
- int currentOffset = off;
- int writableBytes = bufferSize - buffer.position();
- int numberOfBytesToWrite = length;
-
- while (numberOfBytesToWrite > 0) {
-
- if (numberOfBytesToWrite < writableBytes) {
- buffer.put(data, currentOffset, numberOfBytesToWrite);
- outputIndex += numberOfBytesToWrite;
- break;
- }
-
- // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
- buffer.put(data, currentOffset, writableBytes);
- outputIndex += writableBytes;
- currentOffset += writableBytes;
- writeCurrentBufferToService();
- numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
- writableBytes = bufferSize - buffer.position();
- }
-
- }
-
- /**
- * Flushes this output stream and forces any buffered output bytes to be
- * written out. If any data remains in the payload it is committed to the
- * service. Data is queued for writing and forced out to the service
- * before the call returns.
- */
- @Override
- public void flush() throws IOException {
- if (supportFlush) {
- flushInternalAsync();
- }
- }
-
- /**
- * Similar to posix fsync, flush out the data in client's user buffer
- * all the way to the disk device (but the disk may have it in its cache).
- *
- * @throws IOException if error occurs
- */
- @Override
- public void hsync() throws IOException {
- if (supportFlush) {
- flushInternal();
- }
- }
-
- /**
- * Flush out the data in client's user buffer. After the return of
- * this call, new readers will see the data.
- *
- * @throws IOException if any error occurs
- */
- @Override
- public void hflush() throws IOException {
- if (supportFlush) {
- flushInternal();
- }
- }
-
- /**
- * Query the stream for a specific capability.
- *
- * @param capability string to query the stream support for.
- * @return true for hsync and hflush.
- */
- @Override
- public boolean hasCapability(String capability) {
- switch (capability.toLowerCase(Locale.ENGLISH)) {
- case StreamCapabilities.HSYNC:
- case StreamCapabilities.HFLUSH:
- return supportFlush;
- default:
- return false;
- }
- }
-
- /**
- * Force all data in the output stream to be written to Azure storage.
- * Wait to return until this is complete. Close the access to the stream and
- * shutdown the upload thread pool.
- * If the blob was created, its lease will be released.
- * Any error encountered caught in threads and stored will be rethrown here
- * after cleanup.
- */
- @Override
- public synchronized void close() throws IOException {
- if (closed) {
- return;
- }
-
- LOG.debug("close path: {}", path);
- try {
- flushInternal();
- threadExecutor.shutdown();
- } finally {
- lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- ByteBufferPool.release(buffer);
- buffer = null;
- outputIndex = 0;
- closed = true;
- writeOperations.clear();
- if (!threadExecutor.isShutdown()) {
- threadExecutor.shutdownNow();
- }
- }
- }
-
- private synchronized void writeCurrentBufferToService() throws IOException {
- if (buffer.position() == 0) {
- return;
- }
-
- position += submitWriteBufferToService(buffer, position);
-
- buffer = ByteBufferPool.request(bufferSize);
-
- }
-
- private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException {
-
- bufferToWrite.flip();
- int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
-
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
- waitForTaskToComplete();
- }
- final Future<Void> job = completionService.submit(() -> {
- // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
- SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path.toUri().getPath());
- // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
- ByteBufferPool.release(bufferToWrite);
- return null;
- });
-
- writeOperations.add(new WriteOperation(job, writePosition, bytesLength));
-
- // Try to shrink the queue
- shrinkWriteOperationQueue();
-
- return bytesLength;
-
- }
-
- private void waitForTaskToComplete() throws IOException {
- boolean completed;
- for (completed = false; completionService.poll() != null; completed = true) {
- // keep polling until there is no data
- }
-
- if (!completed) {
- try {
- completionService.take();
- } catch (InterruptedException e) {
- lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
- throw lastError;
- }
- }
- }
-
- private void maybeThrowLastError() throws IOException {
- if (lastError != null) {
- throw lastError;
- }
- }
-
- /**
- * Try to remove the completed write operations from the beginning of write
- * operation FIFO queue.
- */
- private synchronized void shrinkWriteOperationQueue() throws IOException {
- try {
- while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
- writeOperations.peek().task.get();
- lastTotalAppendOffset += writeOperations.peek().length;
- writeOperations.remove();
- }
- } catch (Exception e) {
- lastError = new IOException(e);
- throw lastError;
- }
- }
-
- private synchronized void flushInternal() throws IOException {
- maybeThrowLastError();
- writeCurrentBufferToService();
- flushWrittenBytesToService();
- }
-
- private synchronized void flushInternalAsync() throws IOException {
- maybeThrowLastError();
- writeCurrentBufferToService();
- flushWrittenBytesToServiceAsync();
- }
-
- private synchronized void flushWrittenBytesToService() throws IOException {
- for (WriteOperation writeOperation : writeOperations) {
- try {
- writeOperation.task.get();
- } catch (Exception ex) {
- lastError = new IOException(ex);
- throw lastError;
- }
- }
- LOG.debug("flushWrittenBytesToService: {} position:{}", path, position);
- flushWrittenBytesToServiceInternal(position);
- }
-
- private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
- shrinkWriteOperationQueue();
-
- if (this.lastTotalAppendOffset > this.lastFlushOffset) {
- this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset);
- }
- }
-
- private static class WriteOperation {
- private final Future<Void> task;
- private final long startOffset;
- private final long length;
-
- WriteOperation(final Future<Void> task, final long startOffset, final long length) {
- Preconditions.checkNotNull(task, "task");
- Preconditions.checkArgument(startOffset >= 0, "startOffset");
- Preconditions.checkArgument(length >= 0, "length");
-
- this.task = task;
- this.startOffset = startOffset;
- this.length = length;
- }
- }
-
-}