| author | Chris Lu <chris.lu@gmail.com> | 2021-02-09 11:37:07 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-02-09 11:37:07 -0800 |
| commit | 821c46edf10097200b986bd17dc01d3991cf57ff (patch) | |
| tree | ca181a9ef3c2f7e45cf0dbb40373b87717a9a636 /other/java | |
| parent | 15da5834e1a33d060924740ba195f6bcd79f2af2 (diff) | |
| parent | a6e8d606b47e5f3e8cd8a57d2769d6f1404fbc8f (diff) | |
Merge branch 'master' into support_ssd_volume
Diffstat (limited to 'other/java')
29 files changed, 679 insertions(+), 798 deletions(-)
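Before the file-by-file diff, a note on what this merge pulls into the Java client: FilerClient now extends FilerGrpcClient instead of wrapping it, the generic SeaweedInputStream and SeaweedOutputStream move out of the HDFS modules into seaweedfs-client, and the client gains three volume-server access modes (direct, publicUrl, filerProxy). A minimal read/write sketch against the new 1.6.1 API — the filer address and the /test/hello.txt path are placeholders for illustration, not part of this commit:

import seaweedfs.client.FilerClient;
import seaweedfs.client.SeaweedInputStream;
import seaweedfs.client.SeaweedOutputStream;

public class QuickStart {
    public static void main(String[] args) throws Exception {
        // One object now carries both the gRPC stubs and the volume-server
        // access policy, since FilerClient extends FilerGrpcClient.
        FilerClient filerClient = new FilerClient("localhost", 18888);

        // New path-only constructor: builds the entry metadata itself.
        SeaweedOutputStream out = new SeaweedOutputStream(filerClient, "/test/hello.txt");
        out.write("hello world".getBytes());
        out.close(); // flushes data chunks and writes the entry metadata

        // New path-only constructor: looks up the entry via the filer.
        SeaweedInputStream in = new SeaweedInputStream(filerClient, "/test/hello.txt");
        byte[] buf = new byte[(int) in.length()];
        in.read(buf, 0, buf.length);
        in.close();

        filerClient.shutdown();
    }
}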
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 4bfc5ab8f..056904ebe 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.5.6</version> + <version>1.6.1</version> <parent> <groupId>org.sonatype.oss</groupId> diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index c3c960a28..69b900017 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.5.6</version> + <version>1.6.1</version> <parent> <groupId>org.sonatype.oss</groupId> diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index 078778b6c..1447401b7 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.5.6</version> + <version>1.6.1</version> <parent> <groupId>org.sonatype.oss</groupId> diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java index 3293db2ca..9b6ba5dfc 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -23,7 +23,7 @@ public class FileChunkManifest { } public static List<FilerProto.FileChunk> resolveChunkManifest( - final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunks) throws IOException { + final FilerClient filerClient, List<FilerProto.FileChunk> chunks) throws IOException { List<FilerProto.FileChunk> dataChunks = new ArrayList<>(); @@ -35,30 +35,30 @@ public class FileChunkManifest { // IsChunkManifest LOG.debug("fetching chunk manifest:{}", chunk); - byte[] data = fetchChunk(filerGrpcClient, chunk); + byte[] data = fetchChunk(filerClient, chunk); FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build(); List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>(); for (FilerProto.FileChunk t : m.getChunksList()) { // avoid deprecated chunk.getFileId() resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build()); } - dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks)); + dataChunks.addAll(resolveChunkManifest(filerClient, resolvedChunks)); } return dataChunks; } - private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException { + private static byte[] fetchChunk(final FilerClient filerClient, FilerProto.FileChunk chunk) throws IOException { String vid = "" + chunk.getFid().getVolumeId(); - FilerProto.Locations locations = filerGrpcClient.vidLocations.get(vid); + FilerProto.Locations locations = filerClient.vidLocations.get(vid); if (locations == null) { FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); lookupRequest.addVolumeIds(vid); - FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient + FilerProto.LookupVolumeResponse lookupResponse = filerClient .getBlockingStub().lookupVolume(lookupRequest.build()); locations = lookupResponse.getLocationsMapMap().get(vid); - filerGrpcClient.vidLocations.put(vid, locations); + filerClient.vidLocations.put(vid, locations); LOG.debug("fetchChunk vid:{} 
locations:{}", vid, locations); } @@ -74,7 +74,7 @@ public class FileChunkManifest { byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId); if (chunkData == null) { LOG.debug("doFetchFullChunkData:{}", chunkView); - chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations); + chunkData = SeaweedRead.doFetchFullChunkData(filerClient, chunkView, locations); } if (chunk.getIsChunkManifest()){ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); @@ -86,7 +86,7 @@ public class FileChunkManifest { } public static List<FilerProto.FileChunk> maybeManifestize( - final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> inputChunks, String parentDirectory) throws IOException { + final FilerClient filerClient, List<FilerProto.FileChunk> inputChunks, String parentDirectory) throws IOException { // the return variable List<FilerProto.FileChunk> chunks = new ArrayList<>(); @@ -101,7 +101,7 @@ public class FileChunkManifest { int remaining = dataChunks.size(); for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) { - FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor), parentDirectory); + FilerProto.FileChunk chunk = mergeIntoManifest(filerClient, dataChunks.subList(i, i + mergeFactor), parentDirectory); chunks.add(chunk); remaining -= mergeFactor; } @@ -113,7 +113,7 @@ public class FileChunkManifest { return chunks; } - private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> dataChunks, String parentDirectory) throws IOException { + private static FilerProto.FileChunk mergeIntoManifest(final FilerClient filerClient, List<FilerProto.FileChunk> dataChunks, String parentDirectory) throws IOException { // create and serialize the manifest dataChunks = FilerClient.beforeEntrySerialization(dataChunks); FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks); @@ -127,8 +127,8 @@ public class FileChunkManifest { } FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk( - filerGrpcClient.getReplication(), - filerGrpcClient, + filerClient.getReplication(), + filerClient, minOffset, data, 0, data.length, parentDirectory); manifestChunk.setIsChunkManifest(true); diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 7338d5bee..58269d41f 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -11,18 +11,12 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -public class FilerClient { +public class FilerClient extends FilerGrpcClient { private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class); - private final FilerGrpcClient filerGrpcClient; - public FilerClient(String host, int grpcPort) { - filerGrpcClient = new FilerGrpcClient(host, grpcPort); - } - - public FilerClient(FilerGrpcClient filerGrpcClient) { - this.filerGrpcClient = filerGrpcClient; + super(host, grpcPort); } public static String toFileId(FilerProto.FileId fid) { @@ -236,7 +230,7 @@ public class FilerClient { } public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) { - Iterator<FilerProto.ListEntriesResponse> iter = 
filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() + Iterator<FilerProto.ListEntriesResponse> iter = this.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() .setDirectory(path) .setPrefix(entryPrefix) .setStartFromFileName(lastEntryName) @@ -253,7 +247,7 @@ public class FilerClient { public FilerProto.Entry lookupEntry(String directory, String entryName) { try { - FilerProto.Entry entry = filerGrpcClient.getBlockingStub().lookupDirectoryEntry( + FilerProto.Entry entry = this.getBlockingStub().lookupDirectoryEntry( FilerProto.LookupDirectoryEntryRequest.newBuilder() .setDirectory(directory) .setName(entryName) @@ -274,7 +268,7 @@ public class FilerClient { public boolean createEntry(String parent, FilerProto.Entry entry) { try { FilerProto.CreateEntryResponse createEntryResponse = - filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() + this.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() .setDirectory(parent) .setEntry(entry) .build()); @@ -291,7 +285,7 @@ public class FilerClient { public boolean updateEntry(String parent, FilerProto.Entry entry) { try { - filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() + this.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() .setDirectory(parent) .setEntry(entry) .build()); @@ -304,7 +298,7 @@ public class FilerClient { public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive, boolean ignoreRecusiveError) { try { - filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() + this.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() .setDirectory(parent) .setName(entryName) .setIsDeleteData(isDeleteFileChunk) @@ -320,7 +314,7 @@ public class FilerClient { public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) { try { - filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder() + this.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder() .setOldDirectory(oldParent) .setOldName(oldName) .setNewDirectory(newParent) @@ -334,7 +328,7 @@ public class FilerClient { } public Iterator<FilerProto.SubscribeMetadataResponse> watch(String prefix, String clientName, long sinceNs) { - return filerGrpcClient.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder() + return this.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder() .setPathPrefix(prefix) .setClientName(clientName) .setSinceNs(sinceNs) diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 1a719f3c0..6c57e2e0d 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -9,8 +9,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLException; -import java.util.Map; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; public class FilerGrpcClient { @@ -26,6 +26,9 @@ public class FilerGrpcClient { } } + public final int VOLUME_SERVER_ACCESS_DIRECT = 0; + public final int VOLUME_SERVER_ACCESS_PUBLIC_URL = 1; + public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2; public final Map<String, 
FilerProto.Locations> vidLocations = new HashMap<>(); private final ManagedChannel channel; private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; @@ -34,6 +37,8 @@ public class FilerGrpcClient { private boolean cipher = false; private String collection = ""; private String replication = ""; + private int volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT; + private String filerAddress; public FilerGrpcClient(String host, int grpcPort) { this(host, grpcPort, sslContext); @@ -49,6 +54,8 @@ public class FilerGrpcClient { .negotiationType(NegotiationType.TLS) .sslContext(sslContext)); + filerAddress = String.format("%s:%d", host, grpcPort - 10000); + FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = this.getBlockingStub().getFilerConfiguration( FilerProto.GetFilerConfigurationRequest.newBuilder().build()); @@ -58,7 +65,7 @@ public class FilerGrpcClient { } - public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) { + private FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) { channel = channelBuilder.build(); blockingStub = SeaweedFilerGrpc.newBlockingStub(channel); asyncStub = SeaweedFilerGrpc.newStub(channel); @@ -93,4 +100,39 @@ public class FilerGrpcClient { return futureStub; } + public void setAccessVolumeServerDirectly() { + this.volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT; + } + + public boolean isAccessVolumeServerDirectly() { + return this.volumeServerAccess == VOLUME_SERVER_ACCESS_DIRECT; + } + + public void setAccessVolumeServerByPublicUrl() { + this.volumeServerAccess = VOLUME_SERVER_ACCESS_PUBLIC_URL; + } + + public boolean isAccessVolumeServerByPublicUrl() { + return this.volumeServerAccess == VOLUME_SERVER_ACCESS_PUBLIC_URL; + } + + public void setAccessVolumeServerByFilerProxy() { + this.volumeServerAccess = VOLUME_SERVER_ACCESS_FILER_PROXY; + } + + public boolean isAccessVolumeServerByFilerProxy() { + return this.volumeServerAccess == VOLUME_SERVER_ACCESS_FILER_PROXY; + } + + public String getChunkUrl(String chunkId, String url, String publicUrl) { + switch (this.volumeServerAccess) { + case VOLUME_SERVER_ACCESS_PUBLIC_URL: + return String.format("http://%s/%s", publicUrl, chunkId); + case VOLUME_SERVER_ACCESS_FILER_PROXY: + return String.format("http://%s/?proxyChunkId=%s", this.filerAddress, chunkId); + default: + return String.format("http://%s/%s", url, chunkId); + } + } + } diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java index 690366849..4e40ce1b6 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java @@ -1,28 +1,22 @@ -package seaweed.hdfs; +package seaweedfs.client; // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream -import org.apache.hadoop.fs.ByteBufferReadable; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.FSInputStream; -import org.apache.hadoop.fs.FileSystem.Statistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import seaweedfs.client.FilerGrpcClient; -import seaweedfs.client.FilerProto; -import seaweedfs.client.SeaweedRead; import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.List; -public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable { +public class SeaweedInputStream extends InputStream { private static final Logger LOG = 
LoggerFactory.getLogger(SeaweedInputStream.class); + private static final IOException EXCEPTION_STREAM_IS_CLOSED = new IOException("Stream is closed!"); - private final FilerGrpcClient filerGrpcClient; - private final Statistics statistics; + private final FilerClient filerClient; private final String path; private final FilerProto.Entry entry; private final List<SeaweedRead.VisibleInterval> visibleIntervalList; @@ -33,17 +27,31 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada private boolean closed = false; public SeaweedInputStream( - final FilerGrpcClient filerGrpcClient, - final Statistics statistics, + final FilerClient filerClient, + final String fullpath) throws IOException { + this.path = fullpath; + this.filerClient = filerClient; + this.entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(fullpath), + SeaweedOutputStream.getFileName(fullpath)); + this.contentLength = SeaweedRead.fileSize(entry); + + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); + + LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); + + } + + public SeaweedInputStream( + final FilerClient filerClient, final String path, final FilerProto.Entry entry) throws IOException { - this.filerGrpcClient = filerGrpcClient; - this.statistics = statistics; + this.filerClient = filerClient; this.path = path; this.entry = entry; this.contentLength = SeaweedRead.fileSize(entry); - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); @@ -86,7 +94,6 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada } // implement ByteBufferReadable - @Override public synchronized int read(ByteBuffer buf) throws IOException { if (position < 0) { @@ -102,7 +109,7 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada if (start+len <= entry.getContent().size()) { entry.getContent().substring(start, start+len).copyTo(buf); } else { - bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); + bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); } if (bytesRead > Integer.MAX_VALUE) { @@ -111,45 +118,32 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada if (bytesRead > 0) { this.position += bytesRead; - if (statistics != null) { - statistics.incrementBytesRead(bytesRead); - } } return (int) bytesRead; } - /** - * Seek to given position in stream. 
- * - * @param n position to seek to - * @throws IOException if there is an error - * @throws EOFException if attempting to seek past end of file - */ - @Override public synchronized void seek(long n) throws IOException { if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + throw EXCEPTION_STREAM_IS_CLOSED; } if (n < 0) { - throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); + throw new EOFException("Cannot seek to a negative offset"); } if (n > contentLength) { - throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); + throw new EOFException("Attempted to seek or read past the end of the file"); } - this.position = n; - } @Override public synchronized long skip(long n) throws IOException { if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + throw EXCEPTION_STREAM_IS_CLOSED; } if (this.position == contentLength) { if (n > 0) { - throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); + throw new EOFException("Attempted to seek or read past the end of the file"); } } long newPos = this.position + n; @@ -177,10 +171,9 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada @Override public synchronized int available() throws IOException { if (closed) { - throw new IOException( - FSExceptionMessages.STREAM_IS_CLOSED); + throw EXCEPTION_STREAM_IS_CLOSED; } - final long remaining = this.contentLength - this.getPos(); + final long remaining = this.contentLength - this.position; return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE; } @@ -195,65 +188,21 @@ public class SeaweedInputStream extends FSInputStream implements ByteBufferReada */ public long length() throws IOException { if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + throw EXCEPTION_STREAM_IS_CLOSED; } return contentLength; } - /** - * Return the current offset from the start of the file - * - * @throws IOException throws {@link IOException} if there is an error - */ - @Override public synchronized long getPos() throws IOException { if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + throw EXCEPTION_STREAM_IS_CLOSED; } return position; } - /** - * Seeks a different copy of the data. Returns true if - * found a new source, false otherwise. - * - * @throws IOException throws {@link IOException} if there is an error - */ - @Override - public boolean seekToNewSource(long l) throws IOException { - return false; - } - @Override public synchronized void close() throws IOException { closed = true; } - /** - * Not supported by this stream. Throws {@link UnsupportedOperationException} - * - * @param readlimit ignored - */ - @Override - public synchronized void mark(int readlimit) { - throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); - } - - /** - * Not supported by this stream. Throws {@link UnsupportedOperationException} - */ - @Override - public synchronized void reset() throws IOException { - throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); - } - - /** - * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false. 
- * - * @return always {@code false} - */ - @Override - public boolean markSupported() { - return false; - } } diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 26290c46c..b73e99e69 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -1,16 +1,9 @@ -package seaweed.hdfs; +package seaweedfs.client; // adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream -import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import seaweedfs.client.ByteBufferPool; -import seaweedfs.client.FilerGrpcClient; -import seaweedfs.client.FilerProto; -import seaweedfs.client.SeaweedWrite; import java.io.IOException; import java.io.InterruptedIOException; @@ -18,21 +11,19 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.concurrent.*; -import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; - public class SeaweedOutputStream extends OutputStream { private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); - - private final FilerGrpcClient filerGrpcClient; - private final Path path; + protected final boolean supportFlush = true; + private final FilerClient filerClient; + private final String path; private final int bufferSize; private final int maxConcurrentRequestCount; private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService<Void> completionService; - private final FilerProto.Entry.Builder entry; - private final boolean supportFlush = false; // true; private final ConcurrentLinkedDeque<WriteOperation> writeOperations; + private final boolean shouldSaveMetadata = false; + private FilerProto.Entry.Builder entry; private long position; private boolean closed; private volatile IOException lastError; @@ -42,9 +33,17 @@ public class SeaweedOutputStream extends OutputStream { private long outputIndex; private String replication = "000"; - public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, + public SeaweedOutputStream(FilerClient filerClient, final String fullpath) { + this(filerClient, fullpath, "000"); + } + + public SeaweedOutputStream(FilerClient filerClient, final String fullpath, final String replication) { + this(filerClient, fullpath, null, 0, 8 * 1024 * 1024, "000"); + } + + public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, final long position, final int bufferSize, final String replication) { - this.filerGrpcClient = filerGrpcClient; + this.filerClient = filerClient; this.replication = replication; this.path = path; this.position = position; @@ -67,12 +66,50 @@ public class SeaweedOutputStream extends OutputStream { this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.entry = entry; + if (this.entry == null) { + long now = System.currentTimeMillis() / 1000L; + + this.entry = FilerProto.Entry.newBuilder() + .setName(getFileName(path)) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setFileMode(0755) + .setReplication(replication) + .setCrtime(now) + .setMtime(now) + .clearGroupName() + ); + } } + public static String getParentDirectory(String path) { + int protoIndex = 
path.indexOf("://"); + if (protoIndex >= 0) { + int pathStart = path.indexOf("/", protoIndex+3); + path = path.substring(pathStart); + } + if (path.equals("/")) { + return path; + } + int lastSlashIndex = path.lastIndexOf("/"); + if (lastSlashIndex == 0) { + return "/"; + } + return path.substring(0, lastSlashIndex); + } + + public static String getFileName(String path) { + if (path.indexOf("/") < 0) { + return path; + } + int lastSlashIndex = path.lastIndexOf("/"); + return path.substring(lastSlashIndex + 1); + } + private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { try { - SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); + SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } catch (Exception ex) { throw new IOException(ex); } @@ -89,7 +126,9 @@ public class SeaweedOutputStream extends OutputStream { throws IOException { maybeThrowLastError(); - Preconditions.checkArgument(data != null, "null data"); + if (data == null) { + return; + } if (off < 0 || length < 0 || length > data.length - off) { throw new IndexOutOfBoundsException(); @@ -152,7 +191,7 @@ public class SeaweedOutputStream extends OutputStream { flushInternal(); threadExecutor.shutdown(); } finally { - lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + lastError = new IOException("Stream is closed!"); ByteBufferPool.release(buffer); buffer = null; outputIndex = 0; @@ -162,6 +201,7 @@ public class SeaweedOutputStream extends OutputStream { threadExecutor.shutdownNow(); } } + } private synchronized void writeCurrentBufferToService() throws IOException { @@ -185,7 +225,7 @@ public class SeaweedOutputStream extends OutputStream { } final Future<Void> job = completionService.submit(() -> { // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); - SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path.toUri().getPath()); + SeaweedWrite.writeData(entry, replication, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); ByteBufferPool.release(bufferToWrite); return null; @@ -239,13 +279,13 @@ public class SeaweedOutputStream extends OutputStream { } } - private synchronized void flushInternal() throws IOException { + protected synchronized void flushInternal() throws IOException { maybeThrowLastError(); writeCurrentBufferToService(); flushWrittenBytesToService(); } - private synchronized void flushInternalAsync() throws IOException { + protected synchronized void flushInternalAsync() throws IOException { maybeThrowLastError(); writeCurrentBufferToService(); flushWrittenBytesToServiceAsync(); @@ -278,10 +318,6 @@ public class SeaweedOutputStream extends OutputStream { private final long length; WriteOperation(final Future<Void> task, final long startOffset, final long length) { - Preconditions.checkNotNull(task, "task"); - Preconditions.checkArgument(startOffset >= 0, "startOffset"); - Preconditions.checkArgument(length >= 0, "length"); - this.task = task; this.startOffset = startOffset; this.length = length; diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index c45987bed..384636601 100644 --- 
a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -23,7 +23,7 @@ public class SeaweedRead { static VolumeIdCache volumeIdCache = new VolumeIdCache(4 * 1024); // returns bytesRead - public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, + public static long read(FilerClient filerClient, List<VisibleInterval> visibleIntervals, final long position, final ByteBuffer buf, final long fileSize) throws IOException { List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining()); @@ -42,7 +42,7 @@ public class SeaweedRead { } if (lookupRequest.getVolumeIdsCount() > 0) { - FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient + FilerProto.LookupVolumeResponse lookupResponse = filerClient .getBlockingStub().lookupVolume(lookupRequest.build()); Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); for (Map.Entry<String, FilerProto.Locations> entry : vid2Locations.entrySet()) { @@ -71,7 +71,7 @@ public class SeaweedRead { return 0; } - int len = readChunkView(startOffset, buf, chunkView, locations); + int len = readChunkView(filerClient, startOffset, buf, chunkView, locations); LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); @@ -93,12 +93,12 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] chunkData = chunkCache.getChunk(chunkView.fileId); if (chunkData == null) { - chunkData = doFetchFullChunkData(chunkView, locations); + chunkData = doFetchFullChunkData(filerClient, chunkView, locations); chunkCache.setChunk(chunkView.fileId, chunkData); } @@ -110,13 +110,13 @@ public class SeaweedRead { return len; } - public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { + public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] data = null; IOException lastException = null; for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { for (FilerProto.Location location : locations.getLocationsList()) { - String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId); + String url = filerClient.getChunkUrl(chunkView.fileId, location.getUrl(), location.getPublicUrl()); try { data = doFetchOneFullChunkData(chunkView, url); lastException = null; @@ -145,7 +145,7 @@ public class SeaweedRead { } - public static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException { + private static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException { HttpGet request = new HttpGet(url); @@ -221,9 +221,9 @@ public class SeaweedRead { } public static List<VisibleInterval> nonOverlappingVisibleIntervals( - final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunkList) throws IOException { + final FilerClient filerClient, List<FilerProto.FileChunk> chunkList) throws IOException { - chunkList = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunkList); + chunkList = 
FileChunkManifest.resolveChunkManifest(filerClient, chunkList); FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]); Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() { diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index b8fd3e299..f8c0c76b6 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -23,41 +23,41 @@ public class SeaweedWrite { public static void writeData(FilerProto.Entry.Builder entry, final String replication, - final FilerGrpcClient filerGrpcClient, + final FilerClient filerClient, final long offset, final byte[] bytes, final long bytesOffset, final long bytesLength, final String path) throws IOException { FilerProto.FileChunk.Builder chunkBuilder = writeChunk( - replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength, path); + replication, filerClient, offset, bytes, bytesOffset, bytesLength, path); synchronized (entry) { entry.addChunks(chunkBuilder); } } public static FilerProto.FileChunk.Builder writeChunk(final String replication, - final FilerGrpcClient filerGrpcClient, + final FilerClient filerClient, final long offset, final byte[] bytes, final long bytesOffset, final long bytesLength, final String path) throws IOException { - FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( + FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume( FilerProto.AssignVolumeRequest.newBuilder() - .setCollection(filerGrpcClient.getCollection()) - .setReplication(replication == null ? filerGrpcClient.getReplication() : replication) + .setCollection(filerClient.getCollection()) + .setReplication(replication == null ? 
filerClient.getReplication() : replication) .setDataCenter("") .setTtlSec(0) .setPath(path) .build()); String fileId = response.getFileId(); - String url = response.getUrl(); String auth = response.getAuth(); - String targetUrl = String.format("http://%s/%s", url, fileId); + + String targetUrl = filerClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl()); ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; - if (filerGrpcClient.isCipher()) { + if (filerClient.isCipher()) { cipherKey = genCipherKey(); cipherKeyString = ByteString.copyFrom(cipherKey); } @@ -75,15 +75,15 @@ public class SeaweedWrite { .setCipherKey(cipherKeyString); } - public static void writeMeta(final FilerGrpcClient filerGrpcClient, + public static void writeMeta(final FilerClient filerClient, final String parentDirectory, final FilerProto.Entry.Builder entry) throws IOException { synchronized (entry) { - List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList(), parentDirectory); + List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory); entry.clearChunks(); entry.addAllChunks(chunks); - filerGrpcClient.getBlockingStub().createEntry( + filerClient.getBlockingStub().createEntry( FilerProto.CreateEntryRequest.newBuilder() .setDirectory(parentDirectory) .setEntry(entry) diff --git a/other/java/examples/pom.xml b/other/java/examples/pom.xml index f7c48d0ab..2456113d0 100644 --- a/other/java/examples/pom.xml +++ b/other/java/examples/pom.xml @@ -11,13 +11,13 @@ <dependency> <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.5.6</version> + <version>1.6.1</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-hadoop2-client</artifactId> - <version>1.5.6</version> + <version>1.6.1</version> <scope>compile</scope> </dependency> <dependency> diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleReadFile.java index 0529a5c73..d2eb94135 100644 --- a/other/java/examples/src/main/java/com/seaweedfs/examples/UnzipFile.java +++ b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleReadFile.java @@ -1,8 +1,7 @@ package com.seaweedfs.examples; -import seaweed.hdfs.SeaweedInputStream; import seaweedfs.client.FilerClient; -import seaweedfs.client.FilerGrpcClient; +import seaweedfs.client.SeaweedInputStream; import java.io.FileInputStream; import java.io.IOException; @@ -10,12 +9,11 @@ import java.io.InputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; -public class UnzipFile { +public class ExampleReadFile { public static void main(String[] args) throws IOException { - FilerGrpcClient filerGrpcClient = new FilerGrpcClient("localhost", 18888); - FilerClient filerClient = new FilerClient(filerGrpcClient); + FilerClient filerClient = new FilerClient("localhost", 18888); long startTime = System.currentTimeMillis(); parseZip("/Users/chris/tmp/test.zip"); @@ -25,11 +23,7 @@ public class UnzipFile { long localProcessTime = startTime2 - startTime; SeaweedInputStream seaweedInputStream = new SeaweedInputStream( - filerGrpcClient, - new org.apache.hadoop.fs.FileSystem.Statistics(""), - "/", - filerClient.lookupEntry("/", "test.zip") - ); + filerClient, "/test.zip"); parseZip(seaweedInputStream); long swProcessTime = 
System.currentTimeMillis() - startTime2; diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWatchFileChanges.java index e489cb3b1..72c572d31 100644 --- a/other/java/examples/src/main/java/com/seaweedfs/examples/WatchFiles.java +++ b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWatchFileChanges.java @@ -7,7 +7,7 @@ import java.io.IOException; import java.util.Date; import java.util.Iterator; -public class WatchFiles { +public class ExampleWatchFileChanges { public static void main(String[] args) throws IOException { diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java new file mode 100644 index 000000000..26b74028f --- /dev/null +++ b/other/java/examples/src/main/java/com/seaweedfs/examples/ExampleWriteFile.java @@ -0,0 +1,47 @@ +package com.seaweedfs.examples; + +import seaweedfs.client.FilerClient; +import seaweedfs.client.SeaweedInputStream; +import seaweedfs.client.SeaweedOutputStream; + +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +public class ExampleWriteFile { + + public static void main(String[] args) throws IOException { + + FilerClient filerClient = new FilerClient("localhost", 18888); + + SeaweedInputStream seaweedInputStream = new SeaweedInputStream(filerClient, "/test.zip"); + unZipFiles(filerClient, seaweedInputStream); + + } + + public static void unZipFiles(FilerClient filerClient, InputStream is) throws IOException { + ZipInputStream zin = new ZipInputStream(is); + ZipEntry ze; + while ((ze = zin.getNextEntry()) != null) { + + String filename = ze.getName(); + if (filename.indexOf("/") >= 0) { + filename = filename.substring(filename.lastIndexOf("/") + 1); + } + if (filename.length()==0) { + continue; + } + + SeaweedOutputStream seaweedOutputStream = new SeaweedOutputStream(filerClient, "/test/"+filename); + byte[] bytesIn = new byte[16 * 1024]; + int read = 0; + while ((read = zin.read(bytesIn))!=-1) { + seaweedOutputStream.write(bytesIn,0,read); + } + seaweedOutputStream.close(); + + System.out.println(ze.getName()); + } + } +} diff --git a/other/java/examples/src/main/java/com/seaweedfs/examples/HdfsCopyFile.java b/other/java/examples/src/main/java/com/seaweedfs/examples/HdfsCopyFile.java new file mode 100644 index 000000000..006c581c9 --- /dev/null +++ b/other/java/examples/src/main/java/com/seaweedfs/examples/HdfsCopyFile.java @@ -0,0 +1,25 @@ +package com.seaweedfs.examples; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; + +import java.io.*; + +public class HdfsCopyFile { + public static void main(String[] args) throws IOException { + Configuration configuration = new Configuration(); + + configuration.set("fs.defaultFS", "seaweedfs://localhost:8888"); + configuration.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem"); + + FileSystem fs = FileSystem.get(configuration); + String source = "/Users/chris/tmp/test.zip"; + String destination = "/buckets/spark/test01.zip"; + InputStream in = new BufferedInputStream(new FileInputStream(source)); + + OutputStream out = fs.create(new Path(destination)); + IOUtils.copyBytes(in, out, 4096, true); + } +} diff --git a/other/java/hdfs2/dependency-reduced-pom.xml 
b/other/java/hdfs2/dependency-reduced-pom.xml index f7873a435..0680d86bb 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -301,7 +301,7 @@ </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.5.6</seaweedfs.client.version>
+ <seaweedfs.client.version>1.6.1</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
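The FilerGrpcClient changes above are the heart of the access-mode feature: getChunkUrl() decides, per configured mode, whether a chunk is fetched from the volume server's private URL, its advertised public URL, or through the filer's HTTP port (derived as the gRPC port minus 10000, so 18888 becomes 8888). A sketch of the three resulting URL shapes — the chunk id and host names below are made up for illustration, and the constructor does require a reachable filer:

import seaweedfs.client.FilerClient;

public class AccessModeDemo {
    public static void main(String[] args) throws Exception {
        FilerClient filerClient = new FilerClient("localhost", 18888);

        // VOLUME_SERVER_ACCESS_DIRECT (the default):
        // http://127.0.0.1:8080/3,01637037d6
        System.out.println(filerClient.getChunkUrl("3,01637037d6", "127.0.0.1:8080", "volume1.example.com:8080"));

        // VOLUME_SERVER_ACCESS_PUBLIC_URL, for clients outside the cluster network:
        // http://volume1.example.com:8080/3,01637037d6
        filerClient.setAccessVolumeServerByPublicUrl();
        System.out.println(filerClient.getChunkUrl("3,01637037d6", "127.0.0.1:8080", "volume1.example.com:8080"));

        // VOLUME_SERVER_ACCESS_FILER_PROXY, when only the filer is reachable:
        // http://localhost:8888/?proxyChunkId=3,01637037d6
        filerClient.setAccessVolumeServerByFilerProxy();
        System.out.println(filerClient.getChunkUrl("3,01637037d6", "127.0.0.1:8080", "volume1.example.com:8080"));

        filerClient.shutdown();
    }
}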
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index bda0eba56..897477066 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ <modelVersion>4.0.0</modelVersion> <properties> - <seaweedfs.client.version>1.5.6</seaweedfs.client.version> + <seaweedfs.client.version>1.6.1</seaweedfs.client.version> <hadoop.version>2.9.2</hadoop.version> </properties> diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index 84f11e846..25395db7a 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -26,6 +26,7 @@ public class SeaweedFileSystem extends FileSystem { public static final int FS_SEAWEED_DEFAULT_PORT = 8888; public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size"; public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication"; + public static final String FS_SEAWEED_VOLUME_SERVER_ACCESS = "fs.seaweed.volume.server.access"; public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 14b32528e..f4e8c9349 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -18,27 +18,31 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE; -import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE; +import static seaweed.hdfs.SeaweedFileSystem.*; public class SeaweedFileSystemStore { private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); - private FilerGrpcClient filerGrpcClient; private FilerClient filerClient; private Configuration conf; public SeaweedFileSystemStore(String host, int port, Configuration conf) { int grpcPort = 10000 + port; - filerGrpcClient = new FilerGrpcClient(host, grpcPort); - filerClient = new FilerClient(filerGrpcClient); + filerClient = new FilerClient(host, grpcPort); this.conf = conf; + String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct"); + if (volumeServerAccessMode.equals("publicUrl")) { + filerClient.setAccessVolumeServerByPublicUrl(); + } else if (volumeServerAccessMode.equals("filerProxy")) { + filerClient.setAccessVolumeServerByFilerProxy(); + } + } public void close() { try { - this.filerGrpcClient.shutdown(); + this.filerClient.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } @@ -213,10 +217,10 @@ public class SeaweedFileSystemStore { .clearGroupName() .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) ); - SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); + SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } - return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication); + return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication); } @@ -230,7 +234,7 @@ public class SeaweedFileSystemStore { throw new FileNotFoundException("read non-exist file " + path); } - return new 
SeaweedInputStream(filerGrpcClient, + return new SeaweedHadoopInputStream(filerClient, statistics, path.toUri().getPath(), entry); diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java new file mode 100644 index 000000000..f26eae597 --- /dev/null +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java @@ -0,0 +1,150 @@ +package seaweed.hdfs; + +// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream + +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem.Statistics; +import seaweedfs.client.FilerClient; +import seaweedfs.client.FilerProto; +import seaweedfs.client.SeaweedInputStream; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable { + + private final SeaweedInputStream seaweedInputStream; + private final Statistics statistics; + + public SeaweedHadoopInputStream( + final FilerClient filerClient, + final Statistics statistics, + final String path, + final FilerProto.Entry entry) throws IOException { + this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry); + this.statistics = statistics; + } + + @Override + public int read() throws IOException { + return seaweedInputStream.read(); + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + return seaweedInputStream.read(b, off, len); + } + + // implement ByteBufferReadable + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + int bytesRead = seaweedInputStream.read(buf); + + if (bytesRead > 0) { + if (statistics != null) { + statistics.incrementBytesRead(bytesRead); + } + } + + return bytesRead; + } + + /** + * Seek to given position in stream. + * + * @param n position to seek to + * @throws IOException if there is an error + * @throws EOFException if attempting to seek past end of file + */ + @Override + public synchronized void seek(long n) throws IOException { + seaweedInputStream.seek(n); + } + + @Override + public synchronized long skip(long n) throws IOException { + return seaweedInputStream.skip(n); + } + + /** + * Return the size of the remaining available bytes + * if the size is less than or equal to {@link Integer#MAX_VALUE}, + * otherwise, return {@link Integer#MAX_VALUE}. + * <p> + * This is to match the behavior of DFSInputStream.available(), + * which some clients may rely on (HBase write-ahead log reading in + * particular). + */ + @Override + public synchronized int available() throws IOException { + return seaweedInputStream.available(); + } + + /** + * Returns the length of the file that this stream refers to. Note that the length returned is the length + * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, + * they wont be reflected in the returned length. + * + * @return length of the file. + * @throws IOException if the stream is closed + */ + public long length() throws IOException { + return seaweedInputStream.length(); + } + + /** + * Return the current offset from the start of the file + * + * @throws IOException throws {@link IOException} if there is an error + */ + @Override + public synchronized long getPos() throws IOException { + return seaweedInputStream.getPos(); + } + + /** + * Seeks a different copy of the data. 
Returns true if + * found a new source, false otherwise. + * + * @throws IOException throws {@link IOException} if there is an error + */ + @Override + public boolean seekToNewSource(long l) throws IOException { + return false; + } + + @Override + public synchronized void close() throws IOException { + seaweedInputStream.close(); + } + + /** + * Not supported by this stream. Throws {@link UnsupportedOperationException} + * + * @param readlimit ignored + */ + @Override + public synchronized void mark(int readlimit) { + throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); + } + + /** + * Not supported by this stream. Throws {@link UnsupportedOperationException} + */ + @Override + public synchronized void reset() throws IOException { + throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); + } + + /** + * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false. + * + * @return always {@code false} + */ + @Override + public boolean markSupported() { + return false; + } +} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java new file mode 100644 index 000000000..da5b56bbc --- /dev/null +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java @@ -0,0 +1,16 @@ +package seaweed.hdfs; + +// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream + +import seaweedfs.client.FilerClient; +import seaweedfs.client.FilerProto; +import seaweedfs.client.SeaweedOutputStream; + +public class SeaweedHadoopOutputStream extends SeaweedOutputStream { + + public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, + final long position, final int bufferSize, final String replication) { + super(filerClient, path, entry, position, bufferSize, replication); + } + +} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java deleted file mode 100644 index 690366849..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ /dev/null @@ -1,259 +0,0 @@ -package seaweed.hdfs; - -// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream - -import org.apache.hadoop.fs.ByteBufferReadable; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.FSInputStream; -import org.apache.hadoop.fs.FileSystem.Statistics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import seaweedfs.client.FilerGrpcClient; -import seaweedfs.client.FilerProto; -import seaweedfs.client.SeaweedRead; - -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - -public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable { - - private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); - - private final FilerGrpcClient filerGrpcClient; - private final Statistics statistics; - private final String path; - private final FilerProto.Entry entry; - private final List<SeaweedRead.VisibleInterval> visibleIntervalList; - private final long contentLength; - - private long position = 0; // cursor of the file - - private boolean closed = false; - - public SeaweedInputStream( - final FilerGrpcClient filerGrpcClient, - final Statistics statistics, - final String path, - final FilerProto.Entry entry) 
throws IOException { - this.filerGrpcClient = filerGrpcClient; - this.statistics = statistics; - this.path = path; - this.entry = entry; - this.contentLength = SeaweedRead.fileSize(entry); - - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); - - LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); - - } - - public String getPath() { - return path; - } - - @Override - public int read() throws IOException { - byte[] b = new byte[1]; - int numberOfBytesRead = read(b, 0, 1); - if (numberOfBytesRead < 0) { - return -1; - } else { - return (b[0] & 0xFF); - } - } - - @Override - public int read(final byte[] b, final int off, final int len) throws IOException { - - if (b == null) { - throw new IllegalArgumentException("null byte array passed in to read() method"); - } - if (off >= b.length) { - throw new IllegalArgumentException("offset greater than length of array"); - } - if (len < 0) { - throw new IllegalArgumentException("requested read length is less than zero"); - } - if (len > (b.length - off)) { - throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); - } - - ByteBuffer buf = ByteBuffer.wrap(b, off, len); - return read(buf); - - } - - // implement ByteBufferReadable - @Override - public synchronized int read(ByteBuffer buf) throws IOException { - - if (position < 0) { - throw new IllegalArgumentException("attempting to read from negative offset"); - } - if (position >= contentLength) { - return -1; // Hadoop prefers -1 to EOFException - } - - long bytesRead = 0; - int len = buf.remaining(); - int start = (int) this.position; - if (start+len <= entry.getContent().size()) { - entry.getContent().substring(start, start+len).copyTo(buf); - } else { - bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); - } - - if (bytesRead > Integer.MAX_VALUE) { - throw new IOException("Unexpected Content-Length"); - } - - if (bytesRead > 0) { - this.position += bytesRead; - if (statistics != null) { - statistics.incrementBytesRead(bytesRead); - } - } - - return (int) bytesRead; - } - - /** - * Seek to given position in stream. - * - * @param n position to seek to - * @throws IOException if there is an error - * @throws EOFException if attempting to seek past end of file - */ - @Override - public synchronized void seek(long n) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - if (n < 0) { - throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); - } - if (n > contentLength) { - throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); - } - - this.position = n; - - } - - @Override - public synchronized long skip(long n) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - if (this.position == contentLength) { - if (n > 0) { - throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); - } - } - long newPos = this.position + n; - if (newPos < 0) { - newPos = 0; - n = newPos - this.position; - } - if (newPos > contentLength) { - newPos = contentLength; - n = newPos - this.position; - } - seek(newPos); - return n; - } - - /** - * Return the size of the remaining available bytes - * if the size is less than or equal to {@link Integer#MAX_VALUE}, - * otherwise, return {@link Integer#MAX_VALUE}. 
- * <p> - * This is to match the behavior of DFSInputStream.available(), - * which some clients may rely on (HBase write-ahead log reading in - * particular). - */ - @Override - public synchronized int available() throws IOException { - if (closed) { - throw new IOException( - FSExceptionMessages.STREAM_IS_CLOSED); - } - final long remaining = this.contentLength - this.getPos(); - return remaining <= Integer.MAX_VALUE - ? (int) remaining : Integer.MAX_VALUE; - } - - /** - * Returns the length of the file that this stream refers to. Note that the length returned is the length - * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, - * they wont be reflected in the returned length. - * - * @return length of the file. - * @throws IOException if the stream is closed - */ - public long length() throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - return contentLength; - } - - /** - * Return the current offset from the start of the file - * - * @throws IOException throws {@link IOException} if there is an error - */ - @Override - public synchronized long getPos() throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - return position; - } - - /** - * Seeks a different copy of the data. Returns true if - * found a new source, false otherwise. - * - * @throws IOException throws {@link IOException} if there is an error - */ - @Override - public boolean seekToNewSource(long l) throws IOException { - return false; - } - - @Override - public synchronized void close() throws IOException { - closed = true; - } - - /** - * Not supported by this stream. Throws {@link UnsupportedOperationException} - * - * @param readlimit ignored - */ - @Override - public synchronized void mark(int readlimit) { - throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); - } - - /** - * Not supported by this stream. Throws {@link UnsupportedOperationException} - */ - @Override - public synchronized void reset() throws IOException { - throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); - } - - /** - * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false. - * - * @return always {@code false} - */ - @Override - public boolean markSupported() { - return false; - } -} diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index 20b52e20f..2b4a1a494 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -309,7 +309,7 @@ </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.5.6</seaweedfs.client.version>
+ <seaweedfs.client.version>1.6.1</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml
index 85d8db859..49ff8f926 100644
--- a/other/java/hdfs3/pom.xml
+++ b/other/java/hdfs3/pom.xml
@@ -5,7 +5,7 @@
     <modelVersion>4.0.0</modelVersion>

     <properties>
-        <seaweedfs.client.version>1.5.6</seaweedfs.client.version>
+        <seaweedfs.client.version>1.6.1</seaweedfs.client.version>
         <hadoop.version>3.1.1</hadoop.version>
     </properties>

diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index 84f11e846..25395db7a 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -26,6 +26,7 @@ public class SeaweedFileSystem extends FileSystem {
     public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
     public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size";
     public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication";
+    public static final String FS_SEAWEED_VOLUME_SERVER_ACCESS = "fs.seaweed.volume.server.access";
     public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;

     private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);

diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 14b32528e..f4e8c9349 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -18,27 +18,31 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;

-import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE;
-import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE;
+import static seaweed.hdfs.SeaweedFileSystem.*;

 public class SeaweedFileSystemStore {

     private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);

-    private FilerGrpcClient filerGrpcClient;
     private FilerClient filerClient;
     private Configuration conf;

     public SeaweedFileSystemStore(String host, int port, Configuration conf) {
         int grpcPort = 10000 + port;
-        filerGrpcClient = new FilerGrpcClient(host, grpcPort);
-        filerClient = new FilerClient(filerGrpcClient);
+        filerClient = new FilerClient(host, grpcPort);
         this.conf = conf;
+        String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct");
+        if (volumeServerAccessMode.equals("publicUrl")) {
+            filerClient.setAccessVolumeServerByPublicUrl();
+        } else if (volumeServerAccessMode.equals("filerProxy")) {
+            filerClient.setAccessVolumeServerByFilerProxy();
+        }
+
     }

     public void close() {
         try {
-            this.filerGrpcClient.shutdown();
+            this.filerClient.shutdown();
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
@@ -213,10 +217,10 @@ public class SeaweedFileSystemStore {
                     .clearGroupName()
                     .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
             );
-            SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
+            SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
         }

-        return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
+        return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication);

    }

@@ -230,7 +234,7 @@ public class SeaweedFileSystemStore {
             throw new FileNotFoundException("read non-exist file " + path);
         }

-        return new SeaweedInputStream(filerGrpcClient,
+        return new SeaweedHadoopInputStream(filerClient,
                 statistics,
                 path.toUri().getPath(),
                 entry);
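The hunk above wires the new fs.seaweed.volume.server.access setting (default "direct", with "publicUrl" and "filerProxy" as alternatives) into the store's constructor. A minimal client-side sketch of selecting the mode, assuming the seaweedfs:// URI scheme registered by this module; the host, port, and path are placeholders:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import seaweed.hdfs.SeaweedFileSystem;

    public class AccessModeExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // "direct" is the default; "filerProxy" routes volume-server
            // traffic through the filer, useful when volume servers are not
            // directly reachable from the client network.
            conf.set(SeaweedFileSystem.FS_SEAWEED_VOLUME_SERVER_ACCESS, "filerProxy");
            FileSystem fs = FileSystem.get(URI.create("seaweedfs://localhost:8888/"), conf);
            System.out.println(fs.exists(new Path("/")));
        }
    }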
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
new file mode 100644
index 000000000..f26eae597
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java
@@ -0,0 +1,150 @@
+package seaweed.hdfs;
+
+// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedInputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable {
+
+    private final SeaweedInputStream seaweedInputStream;
+    private final Statistics statistics;
+
+    public SeaweedHadoopInputStream(
+            final FilerClient filerClient,
+            final Statistics statistics,
+            final String path,
+            final FilerProto.Entry entry) throws IOException {
+        this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry);
+        this.statistics = statistics;
+    }
+
+    @Override
+    public int read() throws IOException {
+        return seaweedInputStream.read();
+    }
+
+    @Override
+    public int read(final byte[] b, final int off, final int len) throws IOException {
+        return seaweedInputStream.read(b, off, len);
+    }
+
+    // implement ByteBufferReadable
+    @Override
+    public synchronized int read(ByteBuffer buf) throws IOException {
+        int bytesRead = seaweedInputStream.read(buf);
+
+        if (bytesRead > 0) {
+            if (statistics != null) {
+                statistics.incrementBytesRead(bytesRead);
+            }
+        }
+
+        return bytesRead;
+    }
+
+    /**
+     * Seek to given position in stream.
+     *
+     * @param n position to seek to
+     * @throws IOException  if there is an error
+     * @throws EOFException if attempting to seek past end of file
+     */
+    @Override
+    public synchronized void seek(long n) throws IOException {
+        seaweedInputStream.seek(n);
+    }
+
+    @Override
+    public synchronized long skip(long n) throws IOException {
+        return seaweedInputStream.skip(n);
+    }
+
+    /**
+     * Return the size of the remaining available bytes
+     * if the size is less than or equal to {@link Integer#MAX_VALUE},
+     * otherwise, return {@link Integer#MAX_VALUE}.
+     * <p>
+     * This is to match the behavior of DFSInputStream.available(),
+     * which some clients may rely on (HBase write-ahead log reading in
+     * particular).
+     */
+    @Override
+    public synchronized int available() throws IOException {
+        return seaweedInputStream.available();
+    }
+
+    /**
+     * Returns the length of the file that this stream refers to. Note that the length returned is the length
+     * as of the time the stream was opened. Specifically, if there have been subsequent appends to the file,
+     * they won't be reflected in the returned length.
+     *
+     * @return length of the file.
+     * @throws IOException if the stream is closed
+     */
+    public long length() throws IOException {
+        return seaweedInputStream.length();
+    }
+
+    /**
+     * Return the current offset from the start of the file
+     *
+     * @throws IOException throws {@link IOException} if there is an error
+     */
+    @Override
+    public synchronized long getPos() throws IOException {
+        return seaweedInputStream.getPos();
+    }
+
+    /**
+     * Seeks a different copy of the data. Returns true if
+     * found a new source, false otherwise.
+     *
+     * @throws IOException throws {@link IOException} if there is an error
+     */
+    @Override
+    public boolean seekToNewSource(long l) throws IOException {
+        return false;
+    }
+
+    @Override
+    public synchronized void close() throws IOException {
+        seaweedInputStream.close();
+    }
+
+    /**
+     * Not supported by this stream. Throws {@link UnsupportedOperationException}
+     *
+     * @param readlimit ignored
+     */
+    @Override
+    public synchronized void mark(int readlimit) {
+        throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+    }
+
+    /**
+     * Not supported by this stream. Throws {@link UnsupportedOperationException}
+     */
+    @Override
+    public synchronized void reset() throws IOException {
+        throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
+    }
+
+    /**
+     * Gets whether mark and reset are supported by this stream. Always returns false.
+     *
+     * @return always {@code false}
+     */
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+}
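The new wrapper keeps the Hadoop-facing surface (FSInputStream, ByteBufferReadable, statistics) while delegating the actual I/O to seaweedfs.client.SeaweedInputStream. An illustrative read through the standard Hadoop API, which reaches the read(ByteBuffer) path above when the underlying stream implements ByteBufferReadable; the URI and file path are placeholders:

    import java.net.URI;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadExample {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(URI.create("seaweedfs://localhost:8888/"), new Configuration());
            try (FSDataInputStream in = fs.open(new Path("/demo/data.bin"))) {
                ByteBuffer buf = ByteBuffer.allocate(4096);
                int n = in.read(buf); // delegates to the ByteBufferReadable implementation
                System.out.println("read " + n + " bytes, now at pos " + in.getPos());
            }
        }
    }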
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
new file mode 100644
index 000000000..1740312fe
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java
@@ -0,0 +1,64 @@
+package seaweed.hdfs;
+
+// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
+
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.Syncable;
+import seaweedfs.client.FilerClient;
+import seaweedfs.client.FilerProto;
+import seaweedfs.client.SeaweedOutputStream;
+
+import java.io.IOException;
+import java.util.Locale;
+
+public class SeaweedHadoopOutputStream extends SeaweedOutputStream implements Syncable, StreamCapabilities {
+
+    public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
+                                     final long position, final int bufferSize, final String replication) {
+        super(filerClient, path, entry, position, bufferSize, replication);
+    }
+
+    /**
+     * Similar to posix fsync, flush out the data in client's user buffer
+     * all the way to the disk device (but the disk may have it in its cache).
+     *
+     * @throws IOException if error occurs
+     */
+    @Override
+    public void hsync() throws IOException {
+        if (supportFlush) {
+            flushInternal();
+        }
+    }
+
+    /**
+     * Flush out the data in client's user buffer. After the return of
+     * this call, new readers will see the data.
+     *
+     * @throws IOException if any error occurs
+     */
+    @Override
+    public void hflush() throws IOException {
+        if (supportFlush) {
+            flushInternal();
+        }
+    }
+
+    /**
+     * Query the stream for a specific capability.
+     *
+     * @param capability string to query the stream support for.
+     * @return true for hsync and hflush.
+     */
+    @Override
+    public boolean hasCapability(String capability) {
+        switch (capability.toLowerCase(Locale.ENGLISH)) {
+            case StreamCapabilities.HSYNC:
+            case StreamCapabilities.HFLUSH:
+                return supportFlush;
+            default:
+                return false;
+        }
+    }
+
+}
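Because hsync() and hflush() silently no-op when supportFlush is false in the parent SeaweedOutputStream, callers should probe capabilities first. A small sketch of that contract using only standard Hadoop API; the helper class and method names are mine, not part of SeaweedFS:

    import java.io.IOException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.StreamCapabilities;

    public final class FlushHelper {
        // Prefer hsync (durable) over hflush (visible to new readers),
        // doing nothing when the stream reports no support for either.
        public static void syncIfSupported(FSDataOutputStream out) throws IOException {
            if (out.hasCapability(StreamCapabilities.HSYNC)) {
                out.hsync();
            } else if (out.hasCapability(StreamCapabilities.HFLUSH)) {
                out.hflush();
            }
        }
    }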
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
deleted file mode 100644
index d4c967a06..000000000
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ /dev/null
@@ -1,337 +0,0 @@
-package seaweed.hdfs;
-
-// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StreamCapabilities;
-import org.apache.hadoop.fs.Syncable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.ByteBufferPool;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedWrite;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Locale;
-import java.util.concurrent.*;
-
-import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
-
-public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
-
-    private final FilerGrpcClient filerGrpcClient;
-    private final Path path;
-    private final int bufferSize;
-    private final int maxConcurrentRequestCount;
-    private final ThreadPoolExecutor threadExecutor;
-    private final ExecutorCompletionService<Void> completionService;
-    private final FilerProto.Entry.Builder entry;
-    private final boolean supportFlush = false; // true;
-    private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
-    private long position;
-    private boolean closed;
-    private volatile IOException lastError;
-    private long lastFlushOffset;
-    private long lastTotalAppendOffset = 0;
-    private ByteBuffer buffer;
-    private long outputIndex;
-    private String replication = "000";
-
-    public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
-                               final long position, final int bufferSize, final String replication) {
-        this.filerGrpcClient = filerGrpcClient;
-        this.replication = replication;
-        this.path = path;
-        this.position = position;
-        this.closed = false;
-        this.lastError = null;
-        this.lastFlushOffset = 0;
-        this.bufferSize = bufferSize;
-        this.buffer = ByteBufferPool.request(bufferSize);
-        this.outputIndex = 0;
-        this.writeOperations = new ConcurrentLinkedDeque<>();
-
-        this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
-
-        this.threadExecutor
-                = new ThreadPoolExecutor(maxConcurrentRequestCount,
-                maxConcurrentRequestCount,
-                120L,
-                TimeUnit.SECONDS,
-                new LinkedBlockingQueue<Runnable>());
-        this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
-
-        this.entry = entry;
-
-    }
-
-    private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
-        try {
-            SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
-        } catch (Exception ex) {
-            throw new IOException(ex);
-        }
-        this.lastFlushOffset = offset;
-    }
-
-    @Override
-    public void write(final int byteVal) throws IOException {
-        write(new byte[]{(byte) (byteVal & 0xFF)});
-    }
-
-    @Override
-    public synchronized void write(final byte[] data, final int off, final int length)
-            throws IOException {
-        maybeThrowLastError();
-
-        Preconditions.checkArgument(data != null, "null data");
-
-        if (off < 0 || length < 0 || length > data.length - off) {
-            throw new IndexOutOfBoundsException();
-        }
-
-        // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
-
-        int currentOffset = off;
-        int writableBytes = bufferSize - buffer.position();
-        int numberOfBytesToWrite = length;
-
-        while (numberOfBytesToWrite > 0) {
-
-            if (numberOfBytesToWrite < writableBytes) {
-                buffer.put(data, currentOffset, numberOfBytesToWrite);
-                outputIndex += numberOfBytesToWrite;
-                break;
-            }
-
-            // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
-            buffer.put(data, currentOffset, writableBytes);
-            outputIndex += writableBytes;
-            currentOffset += writableBytes;
-            writeCurrentBufferToService();
-            numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
-            writableBytes = bufferSize - buffer.position();
-        }
-
-    }
-
-    /**
-     * Flushes this output stream and forces any buffered output bytes to be
-     * written out. If any data remains in the payload it is committed to the
-     * service. Data is queued for writing and forced out to the service
-     * before the call returns.
-     */
-    @Override
-    public void flush() throws IOException {
-        if (supportFlush) {
-            flushInternalAsync();
-        }
-    }
-
-    /**
-     * Similar to posix fsync, flush out the data in client's user buffer
-     * all the way to the disk device (but the disk may have it in its cache).
-     *
-     * @throws IOException if error occurs
-     */
-    @Override
-    public void hsync() throws IOException {
-        if (supportFlush) {
-            flushInternal();
-        }
-    }
-
-    /**
-     * Flush out the data in client's user buffer. After the return of
-     * this call, new readers will see the data.
-     *
-     * @throws IOException if any error occurs
-     */
-    @Override
-    public void hflush() throws IOException {
-        if (supportFlush) {
-            flushInternal();
-        }
-    }
-
-    /**
-     * Query the stream for a specific capability.
-     *
-     * @param capability string to query the stream support for.
-     * @return true for hsync and hflush.
-     */
-    @Override
-    public boolean hasCapability(String capability) {
-        switch (capability.toLowerCase(Locale.ENGLISH)) {
-            case StreamCapabilities.HSYNC:
-            case StreamCapabilities.HFLUSH:
-                return supportFlush;
-            default:
-                return false;
-        }
-    }
-
-    /**
-     * Force all data in the output stream to be written to Azure storage.
-     * Wait to return until this is complete. Close the access to the stream and
-     * shutdown the upload thread pool.
-     * If the blob was created, its lease will be released.
-     * Any error encountered caught in threads and stored will be rethrown here
-     * after cleanup.
-     */
-    @Override
-    public synchronized void close() throws IOException {
-        if (closed) {
-            return;
-        }
-
-        LOG.debug("close path: {}", path);
-        try {
-            flushInternal();
-            threadExecutor.shutdown();
-        } finally {
-            lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
-            ByteBufferPool.release(buffer);
-            buffer = null;
-            outputIndex = 0;
-            closed = true;
-            writeOperations.clear();
-            if (!threadExecutor.isShutdown()) {
-                threadExecutor.shutdownNow();
-            }
-        }
-    }
-
-    private synchronized void writeCurrentBufferToService() throws IOException {
-        if (buffer.position() == 0) {
-            return;
-        }
-
-        position += submitWriteBufferToService(buffer, position);
-
-        buffer = ByteBufferPool.request(bufferSize);
-
-    }
-
-    private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException {
-
-        bufferToWrite.flip();
-        int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
-
-        if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
-            waitForTaskToComplete();
-        }
-        final Future<Void> job = completionService.submit(() -> {
-            // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
-            SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path.toUri().getPath());
-            // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
-            ByteBufferPool.release(bufferToWrite);
-            return null;
-        });
-
-        writeOperations.add(new WriteOperation(job, writePosition, bytesLength));
-
-        // Try to shrink the queue
-        shrinkWriteOperationQueue();
-
-        return bytesLength;
-
-    }
-
-    private void waitForTaskToComplete() throws IOException {
-        boolean completed;
-        for (completed = false; completionService.poll() != null; completed = true) {
-            // keep polling until there is no data
-        }
-
-        if (!completed) {
-            try {
-                completionService.take();
-            } catch (InterruptedException e) {
-                lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
-                throw lastError;
-            }
-        }
-    }
-
-    private void maybeThrowLastError() throws IOException {
-        if (lastError != null) {
-            throw lastError;
-        }
-    }
-
-    /**
-     * Try to remove the completed write operations from the beginning of write
-     * operation FIFO queue.
-     */
-    private synchronized void shrinkWriteOperationQueue() throws IOException {
-        try {
-            while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
-                writeOperations.peek().task.get();
-                lastTotalAppendOffset += writeOperations.peek().length;
-                writeOperations.remove();
-            }
-        } catch (Exception e) {
-            lastError = new IOException(e);
-            throw lastError;
-        }
-    }
-
-    private synchronized void flushInternal() throws IOException {
-        maybeThrowLastError();
-        writeCurrentBufferToService();
-        flushWrittenBytesToService();
-    }
-
-    private synchronized void flushInternalAsync() throws IOException {
-        maybeThrowLastError();
-        writeCurrentBufferToService();
-        flushWrittenBytesToServiceAsync();
-    }
-
-    private synchronized void flushWrittenBytesToService() throws IOException {
-        for (WriteOperation writeOperation : writeOperations) {
-            try {
-                writeOperation.task.get();
-            } catch (Exception ex) {
-                lastError = new IOException(ex);
-                throw lastError;
-            }
-        }
-        LOG.debug("flushWrittenBytesToService: {} position:{}", path, position);
-        flushWrittenBytesToServiceInternal(position);
-    }
-
-    private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
-        shrinkWriteOperationQueue();
-
-        if (this.lastTotalAppendOffset > this.lastFlushOffset) {
-            this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset);
-        }
-    }
-
-    private static class WriteOperation {
-        private final Future<Void> task;
-        private final long startOffset;
-        private final long length;
-
-        WriteOperation(final Future<Void> task, final long startOffset, final long length) {
-            Preconditions.checkNotNull(task, "task");
-            Preconditions.checkArgument(startOffset >= 0, "startOffset");
-            Preconditions.checkArgument(length >= 0, "length");
-
-            this.task = task;
-            this.startOffset = startOffset;
-            this.length = length;
-        }
-    }
-
-}
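The deleted class (its role now played by seaweedfs.client.SeaweedOutputStream behind SeaweedHadoopOutputStream) buffers writes locally, submits full buffers to a fixed-size pool capped at the processor count, and drains completions through an ExecutorCompletionService so failures surface on the writing thread. A self-contained sketch of that bounded-concurrency pattern; every name here is illustrative, not SeaweedFS API:

    import java.util.concurrent.*;

    public final class BoundedWriterSketch {
        private final int maxInFlight = Runtime.getRuntime().availableProcessors();
        private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxInFlight, maxInFlight, 120L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        private final ExecutorCompletionService<Void> completions =
                new ExecutorCompletionService<>(pool);

        // Submit one buffer; if the queue is saturated, block on one completed
        // task first, mirroring waitForTaskToComplete() in the deleted class.
        public void submit(byte[] chunk, long offset) throws Exception {
            if (pool.getQueue().size() >= maxInFlight) {
                completions.take().get(); // also rethrows any async write failure
            }
            completions.submit(() -> {
                upload(chunk, offset); // stand-in for SeaweedWrite.writeData(...)
                return null;
            });
        }

        private void upload(byte[] chunk, long offset) { /* network write here */ }
    }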
