From 043c2d796098e1f88da1442eefdb9bd5df075708 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 30 Jan 2021 05:39:09 -0800 Subject: refactoring SeaweedOutputStream --- .../java/seaweedfs/client/SeaweedOutputStream.java | 288 +++++++++++++++++++++ 1 file changed, 288 insertions(+) create mode 100644 other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java new file mode 100644 index 000000000..b09a15a5c --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -0,0 +1,288 @@ +package seaweedfs.client; + +// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.*; + +public class SeaweedOutputStream extends OutputStream { + + private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); + + private final FilerGrpcClient filerGrpcClient; + private final String path; + private final int bufferSize; + private final int maxConcurrentRequestCount; + private final ThreadPoolExecutor threadExecutor; + private final ExecutorCompletionService completionService; + private final FilerProto.Entry.Builder entry; + protected final boolean supportFlush = false; // true; + private final ConcurrentLinkedDeque writeOperations; + private long position; + private boolean closed; + private volatile IOException lastError; + private long lastFlushOffset; + private long lastTotalAppendOffset = 0; + private ByteBuffer buffer; + private long outputIndex; + private String replication = "000"; + + public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, + final long position, final int bufferSize, final String replication) { + this.filerGrpcClient = filerGrpcClient; + this.replication = replication; + this.path = path; + this.position = position; + this.closed = false; + this.lastError = null; + this.lastFlushOffset = 0; + this.bufferSize = bufferSize; + this.buffer = ByteBufferPool.request(bufferSize); + this.outputIndex = 0; + this.writeOperations = new ConcurrentLinkedDeque<>(); + + this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); + + this.threadExecutor + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue()); + this.completionService = new ExecutorCompletionService<>(this.threadExecutor); + + this.entry = entry; + + } + + public static String getParentDirectory(String path) { + if (path.equals("/")) { + return path; + } + int lastSlashIndex = path.lastIndexOf("/"); + return path.substring(0, lastSlashIndex); + } + + private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { + try { + SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); + } catch (Exception ex) { + throw new IOException(ex); + } + this.lastFlushOffset = offset; + } + + @Override + public void write(final int byteVal) throws IOException { + write(new byte[]{(byte) (byteVal & 0xFF)}); + } + + @Override + public synchronized void write(final byte[] data, final int off, final int length) + throws IOException { + maybeThrowLastError(); + + if (data == null) { + return; + } + + if (off < 0 || length < 0 || length > data.length - off) { + throw new IndexOutOfBoundsException(); + } + + // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")"); + + int currentOffset = off; + int writableBytes = bufferSize - buffer.position(); + int numberOfBytesToWrite = length; + + while (numberOfBytesToWrite > 0) { + + if (numberOfBytesToWrite < writableBytes) { + buffer.put(data, currentOffset, numberOfBytesToWrite); + outputIndex += numberOfBytesToWrite; + break; + } + + // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); + buffer.put(data, currentOffset, writableBytes); + outputIndex += writableBytes; + currentOffset += writableBytes; + writeCurrentBufferToService(); + numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; + writableBytes = bufferSize - buffer.position(); + } + + } + + /** + * Flushes this output stream and forces any buffered output bytes to be + * written out. If any data remains in the payload it is committed to the + * service. Data is queued for writing and forced out to the service + * before the call returns. + */ + @Override + public void flush() throws IOException { + if (supportFlush) { + flushInternalAsync(); + } + } + + /** + * Force all data in the output stream to be written to Azure storage. + * Wait to return until this is complete. Close the access to the stream and + * shutdown the upload thread pool. + * If the blob was created, its lease will be released. + * Any error encountered caught in threads and stored will be rethrown here + * after cleanup. + */ + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + + LOG.debug("close path: {}", path); + try { + flushInternal(); + threadExecutor.shutdown(); + } finally { + lastError = new IOException("Stream is closed!"); + ByteBufferPool.release(buffer); + buffer = null; + outputIndex = 0; + closed = true; + writeOperations.clear(); + if (!threadExecutor.isShutdown()) { + threadExecutor.shutdownNow(); + } + } + } + + private synchronized void writeCurrentBufferToService() throws IOException { + if (buffer.position() == 0) { + return; + } + + position += submitWriteBufferToService(buffer, position); + + buffer = ByteBufferPool.request(bufferSize); + + } + + private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException { + + bufferToWrite.flip(); + int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); + + if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) { + waitForTaskToComplete(); + } + final Future job = completionService.submit(() -> { + // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); + // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + ByteBufferPool.release(bufferToWrite); + return null; + }); + + writeOperations.add(new WriteOperation(job, writePosition, bytesLength)); + + // Try to shrink the queue + shrinkWriteOperationQueue(); + + return bytesLength; + + } + + private void waitForTaskToComplete() throws IOException { + boolean completed; + for (completed = false; completionService.poll() != null; completed = true) { + // keep polling until there is no data + } + + if (!completed) { + try { + completionService.take(); + } catch (InterruptedException e) { + lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e); + throw lastError; + } + } + } + + private void maybeThrowLastError() throws IOException { + if (lastError != null) { + throw lastError; + } + } + + /** + * Try to remove the completed write operations from the beginning of write + * operation FIFO queue. + */ + private synchronized void shrinkWriteOperationQueue() throws IOException { + try { + while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) { + writeOperations.peek().task.get(); + lastTotalAppendOffset += writeOperations.peek().length; + writeOperations.remove(); + } + } catch (Exception e) { + lastError = new IOException(e); + throw lastError; + } + } + + protected synchronized void flushInternal() throws IOException { + maybeThrowLastError(); + writeCurrentBufferToService(); + flushWrittenBytesToService(); + } + + protected synchronized void flushInternalAsync() throws IOException { + maybeThrowLastError(); + writeCurrentBufferToService(); + flushWrittenBytesToServiceAsync(); + } + + private synchronized void flushWrittenBytesToService() throws IOException { + for (WriteOperation writeOperation : writeOperations) { + try { + writeOperation.task.get(); + } catch (Exception ex) { + lastError = new IOException(ex); + throw lastError; + } + } + LOG.debug("flushWrittenBytesToService: {} position:{}", path, position); + flushWrittenBytesToServiceInternal(position); + } + + private synchronized void flushWrittenBytesToServiceAsync() throws IOException { + shrinkWriteOperationQueue(); + + if (this.lastTotalAppendOffset > this.lastFlushOffset) { + this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset); + } + } + + private static class WriteOperation { + private final Future task; + private final long startOffset; + private final long length; + + WriteOperation(final Future task, final long startOffset, final long length) { + this.task = task; + this.startOffset = startOffset; + this.length = length; + } + } + +} -- cgit v1.2.3 From 6f4aab51f9d7e8d9721c3c4fd7b57da7ab8b6d8e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 30 Jan 2021 06:16:02 -0800 Subject: refactoring SeaweedInputStream --- .../java/seaweedfs/client/SeaweedInputStream.java | 192 +++++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java new file mode 100644 index 000000000..312e77aa2 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java @@ -0,0 +1,192 @@ +package seaweedfs.client; + +// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +public class SeaweedInputStream extends InputStream { + + private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); + private static final IOException EXCEPTION_STREAM_IS_CLOSED = new IOException("Stream is closed!"); + + private final FilerGrpcClient filerGrpcClient; + private final String path; + private final FilerProto.Entry entry; + private final List visibleIntervalList; + private final long contentLength; + + private long position = 0; // cursor of the file + + private boolean closed = false; + + public SeaweedInputStream( + final FilerGrpcClient filerGrpcClient, + final String path, + final FilerProto.Entry entry) throws IOException { + this.filerGrpcClient = filerGrpcClient; + this.path = path; + this.entry = entry; + this.contentLength = SeaweedRead.fileSize(entry); + + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); + + LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); + + } + + public String getPath() { + return path; + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + int numberOfBytesRead = read(b, 0, 1); + if (numberOfBytesRead < 0) { + return -1; + } else { + return (b[0] & 0xFF); + } + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + + if (b == null) { + throw new IllegalArgumentException("null byte array passed in to read() method"); + } + if (off >= b.length) { + throw new IllegalArgumentException("offset greater than length of array"); + } + if (len < 0) { + throw new IllegalArgumentException("requested read length is less than zero"); + } + if (len > (b.length - off)) { + throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); + } + + ByteBuffer buf = ByteBuffer.wrap(b, off, len); + return read(buf); + + } + + // implement ByteBufferReadable + public synchronized int read(ByteBuffer buf) throws IOException { + + if (position < 0) { + throw new IllegalArgumentException("attempting to read from negative offset"); + } + if (position >= contentLength) { + return -1; // Hadoop prefers -1 to EOFException + } + + long bytesRead = 0; + int len = buf.remaining(); + int start = (int) this.position; + if (start+len <= entry.getContent().size()) { + entry.getContent().substring(start, start+len).copyTo(buf); + } else { + bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); + } + + if (bytesRead > Integer.MAX_VALUE) { + throw new IOException("Unexpected Content-Length"); + } + + if (bytesRead > 0) { + this.position += bytesRead; + } + + return (int) bytesRead; + } + + public synchronized void seek(long n) throws IOException { + if (closed) { + throw EXCEPTION_STREAM_IS_CLOSED; + } + if (n < 0) { + throw new EOFException("Cannot seek to a negative offset"); + } + if (n > contentLength) { + throw new EOFException("Attempted to seek or read past the end of the file"); + } + this.position = n; + } + + @Override + public synchronized long skip(long n) throws IOException { + if (closed) { + throw EXCEPTION_STREAM_IS_CLOSED; + } + if (this.position == contentLength) { + if (n > 0) { + throw new EOFException("Attempted to seek or read past the end of the file"); + } + } + long newPos = this.position + n; + if (newPos < 0) { + newPos = 0; + n = newPos - this.position; + } + if (newPos > contentLength) { + newPos = contentLength; + n = newPos - this.position; + } + seek(newPos); + return n; + } + + /** + * Return the size of the remaining available bytes + * if the size is less than or equal to {@link Integer#MAX_VALUE}, + * otherwise, return {@link Integer#MAX_VALUE}. + *

+ * This is to match the behavior of DFSInputStream.available(), + * which some clients may rely on (HBase write-ahead log reading in + * particular). + */ + @Override + public synchronized int available() throws IOException { + if (closed) { + throw EXCEPTION_STREAM_IS_CLOSED; + } + final long remaining = this.contentLength - this.position; + return remaining <= Integer.MAX_VALUE + ? (int) remaining : Integer.MAX_VALUE; + } + + /** + * Returns the length of the file that this stream refers to. Note that the length returned is the length + * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, + * they wont be reflected in the returned length. + * + * @return length of the file. + * @throws IOException if the stream is closed + */ + public long length() throws IOException { + if (closed) { + throw EXCEPTION_STREAM_IS_CLOSED; + } + return contentLength; + } + + public synchronized long getPos() throws IOException { + if (closed) { + throw EXCEPTION_STREAM_IS_CLOSED; + } + return position; + } + + @Override + public synchronized void close() throws IOException { + closed = true; + } + +} -- cgit v1.2.3 From 502554887f58915c077462372db4e2813eac3f92 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 4 Feb 2021 18:44:57 -0800 Subject: Java: add SeaweedInputStream example --- .../main/java/seaweedfs/client/SeaweedInputStream.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java index 312e77aa2..519ff0fd9 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java @@ -26,6 +26,21 @@ public class SeaweedInputStream extends InputStream { private boolean closed = false; + public SeaweedInputStream( + final FilerGrpcClient filerGrpcClient, + final String dir, final String name) throws IOException { + this.filerGrpcClient = filerGrpcClient; + this.path = dir; + FilerClient filerClient = new FilerClient(filerGrpcClient); + this.entry = filerClient.lookupEntry(dir, name); + this.contentLength = SeaweedRead.fileSize(entry); + + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); + + LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); + + } + public SeaweedInputStream( final FilerGrpcClient filerGrpcClient, final String path, -- cgit v1.2.3 From 7f90d14f100f9ce69b6b05f6b8f80823f4c69fdf Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 4 Feb 2021 20:16:08 -0800 Subject: Java: add SeaweedOutputStream example --- .../java/seaweedfs/client/SeaweedInputStream.java | 8 +-- .../java/seaweedfs/client/SeaweedOutputStream.java | 58 +++++++++++++++++++++- 2 files changed, 61 insertions(+), 5 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java index 519ff0fd9..8b26c242c 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java @@ -28,11 +28,13 @@ public class SeaweedInputStream extends InputStream { public SeaweedInputStream( final FilerGrpcClient filerGrpcClient, - final String dir, final String name) throws IOException { + final String fullpath) throws IOException { this.filerGrpcClient = filerGrpcClient; - this.path = dir; + this.path = fullpath; FilerClient filerClient = new FilerClient(filerGrpcClient); - this.entry = filerClient.lookupEntry(dir, name); + this.entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(fullpath), + SeaweedOutputStream.getFileName(fullpath)); this.contentLength = SeaweedRead.fileSize(entry); this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index b09a15a5c..a98bbd1ab 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -14,7 +14,7 @@ import java.util.concurrent.*; public class SeaweedOutputStream extends OutputStream { private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); - + protected final boolean supportFlush = false; // true; private final FilerGrpcClient filerGrpcClient; private final String path; private final int bufferSize; @@ -22,7 +22,6 @@ public class SeaweedOutputStream extends OutputStream { private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService completionService; private final FilerProto.Entry.Builder entry; - protected final boolean supportFlush = false; // true; private final ConcurrentLinkedDeque writeOperations; private long position; private boolean closed; @@ -32,6 +31,45 @@ public class SeaweedOutputStream extends OutputStream { private ByteBuffer buffer; private long outputIndex; private String replication = "000"; + private boolean shouldSaveMetadata = false; + + public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) { + this.filerGrpcClient = filerGrpcClient; + this.path = fullpath; + this.position = 0; + this.closed = false; + this.lastError = null; + this.lastFlushOffset = 0; + this.bufferSize = 8 * 1024 * 1024; + this.buffer = ByteBufferPool.request(bufferSize); + this.outputIndex = 0; + this.writeOperations = new ConcurrentLinkedDeque<>(); + + this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); + + this.threadExecutor + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue()); + this.completionService = new ExecutorCompletionService<>(this.threadExecutor); + + long now = System.currentTimeMillis() / 1000L; + + this.entry = FilerProto.Entry.newBuilder() + .setName(getFileName(path)) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setFileMode(0755) + .setReplication(replication) + .setCrtime(now) + .setMtime(now) + .clearGroupName() + ); + this.shouldSaveMetadata = true; + + } public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, final long position, final int bufferSize, final String replication) { @@ -66,9 +104,20 @@ public class SeaweedOutputStream extends OutputStream { return path; } int lastSlashIndex = path.lastIndexOf("/"); + if (lastSlashIndex == 0) { + return "/"; + } return path.substring(0, lastSlashIndex); } + public static String getFileName(String path) { + if (path.indexOf("/") < 0) { + return path; + } + int lastSlashIndex = path.lastIndexOf("/"); + return path.substring(lastSlashIndex+1); + } + private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { try { SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); @@ -163,6 +212,11 @@ public class SeaweedOutputStream extends OutputStream { threadExecutor.shutdownNow(); } } + + if (shouldSaveMetadata) { + SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); + } + } private synchronized void writeCurrentBufferToService() throws IOException { -- cgit v1.2.3 From 82c167aaca8716443a84a338a9e181f661ca1c79 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 4 Feb 2021 20:18:33 -0800 Subject: Java: supportFlush set to true --- .../java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index a98bbd1ab..925a56bdd 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -14,7 +14,7 @@ import java.util.concurrent.*; public class SeaweedOutputStream extends OutputStream { private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); - protected final boolean supportFlush = false; // true; + protected final boolean supportFlush = true; private final FilerGrpcClient filerGrpcClient; private final String path; private final int bufferSize; -- cgit v1.2.3 From ce416d765f42f673e84723703f75c8d0ab151889 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 4 Feb 2021 20:22:40 -0800 Subject: Java: SeaweedOutputStream add replication option --- .../client/src/main/java/seaweedfs/client/SeaweedOutputStream.java | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 925a56bdd..94f34b221 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -34,6 +34,11 @@ public class SeaweedOutputStream extends OutputStream { private boolean shouldSaveMetadata = false; public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) { + this(filerGrpcClient, fullpath, "000"); + } + + public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath, final String replication) { + this.replication = replication; this.filerGrpcClient = filerGrpcClient; this.path = fullpath; this.position = 0; -- cgit v1.2.3 From 9fa7977714d99e546cee32e02b5f7fdf3528078b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 4 Feb 2021 20:30:49 -0800 Subject: Java: SeaweedOutputStream refactoring --- .../java/seaweedfs/client/SeaweedOutputStream.java | 61 +++++++--------------- 1 file changed, 18 insertions(+), 43 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 94f34b221..f9df22c9b 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -21,8 +21,9 @@ public class SeaweedOutputStream extends OutputStream { private final int maxConcurrentRequestCount; private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService completionService; - private final FilerProto.Entry.Builder entry; private final ConcurrentLinkedDeque writeOperations; + private final boolean shouldSaveMetadata = false; + private FilerProto.Entry.Builder entry; private long position; private boolean closed; private volatile IOException lastError; @@ -31,49 +32,13 @@ public class SeaweedOutputStream extends OutputStream { private ByteBuffer buffer; private long outputIndex; private String replication = "000"; - private boolean shouldSaveMetadata = false; public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) { this(filerGrpcClient, fullpath, "000"); } public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath, final String replication) { - this.replication = replication; - this.filerGrpcClient = filerGrpcClient; - this.path = fullpath; - this.position = 0; - this.closed = false; - this.lastError = null; - this.lastFlushOffset = 0; - this.bufferSize = 8 * 1024 * 1024; - this.buffer = ByteBufferPool.request(bufferSize); - this.outputIndex = 0; - this.writeOperations = new ConcurrentLinkedDeque<>(); - - this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); - - this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 120L, - TimeUnit.SECONDS, - new LinkedBlockingQueue()); - this.completionService = new ExecutorCompletionService<>(this.threadExecutor); - - long now = System.currentTimeMillis() / 1000L; - - this.entry = FilerProto.Entry.newBuilder() - .setName(getFileName(path)) - .setIsDirectory(false) - .setAttributes(FilerProto.FuseAttributes.newBuilder() - .setFileMode(0755) - .setReplication(replication) - .setCrtime(now) - .setMtime(now) - .clearGroupName() - ); - this.shouldSaveMetadata = true; - + this(filerGrpcClient, fullpath, null, 0, 8 * 1024 * 1024, "000"); } public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, @@ -101,6 +66,20 @@ public class SeaweedOutputStream extends OutputStream { this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.entry = entry; + if (this.entry == null) { + long now = System.currentTimeMillis() / 1000L; + + this.entry = FilerProto.Entry.newBuilder() + .setName(getFileName(path)) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setFileMode(0755) + .setReplication(replication) + .setCrtime(now) + .setMtime(now) + .clearGroupName() + ); + } } @@ -120,7 +99,7 @@ public class SeaweedOutputStream extends OutputStream { return path; } int lastSlashIndex = path.lastIndexOf("/"); - return path.substring(lastSlashIndex+1); + return path.substring(lastSlashIndex + 1); } private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { @@ -218,10 +197,6 @@ public class SeaweedOutputStream extends OutputStream { } } - if (shouldSaveMetadata) { - SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); - } - } private synchronized void writeCurrentBufferToService() throws IOException { -- cgit v1.2.3 From 8c3177d835bb86eed6127b390e2f39ca63ba1a04 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 4 Feb 2021 21:41:19 -0800 Subject: java: resolve parent directory if started with seaweedfs:// --- .../client/src/main/java/seaweedfs/client/SeaweedOutputStream.java | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index f9df22c9b..92dc59f61 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -84,6 +84,11 @@ public class SeaweedOutputStream extends OutputStream { } public static String getParentDirectory(String path) { + int protoIndex = path.indexOf("://"); + if (protoIndex >= 0) { + int pathStart = path.indexOf("/", protoIndex+3); + path = path.substring(pathStart); + } if (path.equals("/")) { return path; } -- cgit v1.2.3 From 694df8933139a1b498eefb5f2f501e2d5912d58c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 4 Feb 2021 22:21:55 -0800 Subject: java: add configurable volume access mode --- .../java/seaweedfs/client/FileChunkManifest.java | 2 +- .../java/seaweedfs/client/FilerGrpcClient.java | 32 +++++++++++++++++++++- .../main/java/seaweedfs/client/SeaweedRead.java | 16 +++++++---- .../main/java/seaweedfs/client/SeaweedWrite.java | 10 +++++-- 4 files changed, 51 insertions(+), 9 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java index 3293db2ca..3d7da91d5 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -74,7 +74,7 @@ public class FileChunkManifest { byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId); if (chunkData == null) { LOG.debug("doFetchFullChunkData:{}", chunkView); - chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations); + chunkData = SeaweedRead.doFetchFullChunkData(filerGrpcClient, chunkView, locations); } if (chunk.getIsChunkManifest()){ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 1a719f3c0..8a37827f1 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -15,6 +15,10 @@ import java.util.concurrent.TimeUnit; public class FilerGrpcClient { + public final int VOLUME_SERVER_ACCESS_DIRECT = 0; + public final int VOLUME_SERVER_ACCESS_PUBLIC_URL = 1; + public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2; + private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class); static SslContext sslContext; @@ -34,6 +38,8 @@ public class FilerGrpcClient { private boolean cipher = false; private String collection = ""; private String replication = ""; + private int volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT; + private String filerAddress; public FilerGrpcClient(String host, int grpcPort) { this(host, grpcPort, sslContext); @@ -49,6 +55,8 @@ public class FilerGrpcClient { .negotiationType(NegotiationType.TLS) .sslContext(sslContext)); + filerAddress = String.format("%s:%d", host, grpcPort-10000); + FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = this.getBlockingStub().getFilerConfiguration( FilerProto.GetFilerConfigurationRequest.newBuilder().build()); @@ -58,7 +66,7 @@ public class FilerGrpcClient { } - public FilerGrpcClient(ManagedChannelBuilder channelBuilder) { + private FilerGrpcClient(ManagedChannelBuilder channelBuilder) { channel = channelBuilder.build(); blockingStub = SeaweedFilerGrpc.newBlockingStub(channel); asyncStub = SeaweedFilerGrpc.newStub(channel); @@ -93,4 +101,26 @@ public class FilerGrpcClient { return futureStub; } + public void setAccessVolumeServerDirectly() { + this.volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT; + } + public boolean isAccessVolumeServerDirectly() { + return this.volumeServerAccess == VOLUME_SERVER_ACCESS_DIRECT; + } + public void setAccessVolumeServerByPublicUrl() { + this.volumeServerAccess = VOLUME_SERVER_ACCESS_PUBLIC_URL; + } + public boolean isAccessVolumeServerByPublicUrl() { + return this.volumeServerAccess == VOLUME_SERVER_ACCESS_PUBLIC_URL; + } + public void setAccessVolumeServerByFilerProxy() { + this.volumeServerAccess = VOLUME_SERVER_ACCESS_FILER_PROXY; + } + public boolean isAccessVolumeServerByFilerProxy() { + return this.volumeServerAccess == VOLUME_SERVER_ACCESS_FILER_PROXY; + } + public String getFilerAddress() { + return this.filerAddress; + } + } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index c45987bed..3df832d7d 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -71,7 +71,7 @@ public class SeaweedRead { return 0; } - int len = readChunkView(startOffset, buf, chunkView, locations); + int len = readChunkView(filerGrpcClient, startOffset, buf, chunkView, locations); LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); @@ -93,12 +93,12 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static int readChunkView(FilerGrpcClient filerGrpcClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] chunkData = chunkCache.getChunk(chunkView.fileId); if (chunkData == null) { - chunkData = doFetchFullChunkData(chunkView, locations); + chunkData = doFetchFullChunkData(filerGrpcClient, chunkView, locations); chunkCache.setChunk(chunkView.fileId, chunkData); } @@ -110,12 +110,18 @@ public class SeaweedRead { return len; } - public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException { + public static byte[] doFetchFullChunkData(FilerGrpcClient filerGrpcClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] data = null; IOException lastException = null; for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { for (FilerProto.Location location : locations.getLocationsList()) { + String host = location.getUrl(); + if (filerGrpcClient.isAccessVolumeServerByPublicUrl()) { + host = location.getPublicUrl(); + } else if (filerGrpcClient.isAccessVolumeServerByFilerProxy()) { + host = filerGrpcClient.getFilerAddress(); + } String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId); try { data = doFetchOneFullChunkData(chunkView, url); @@ -145,7 +151,7 @@ public class SeaweedRead { } - public static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException { + private static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException { HttpGet request = new HttpGet(url); diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index b8fd3e299..3cc11e21c 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -51,9 +51,15 @@ public class SeaweedWrite { .setPath(path) .build()); String fileId = response.getFileId(); - String url = response.getUrl(); String auth = response.getAuth(); - String targetUrl = String.format("http://%s/%s", url, fileId); + + String host = response.getUrl(); + if (filerGrpcClient.isAccessVolumeServerByPublicUrl()) { + host = response.getPublicUrl(); + } else if (filerGrpcClient.isAccessVolumeServerByFilerProxy()) { + host = filerGrpcClient.getFilerAddress(); + } + String targetUrl = String.format("http://%s/%s", host, fileId); ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; -- cgit v1.2.3 From 8f3a51f2b8896254c74040b0fed9ed307d8efb34 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 5 Feb 2021 10:42:20 -0800 Subject: Java: 1.5.8 additional fixes --- other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 3df832d7d..a70553762 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -122,7 +122,7 @@ public class SeaweedRead { } else if (filerGrpcClient.isAccessVolumeServerByFilerProxy()) { host = filerGrpcClient.getFilerAddress(); } - String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId); + String url = String.format("http://%s/%s", host, chunkView.fileId); try { data = doFetchOneFullChunkData(chunkView, url); lastException = null; -- cgit v1.2.3 From 35ba277a976b583f486e4beb0a1dde3ccff5c433 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 5 Feb 2021 22:43:56 -0800 Subject: Java: fix filerProxy mode --- .../java/seaweedfs/client/FilerGrpcClient.java | 28 +++++++++++++++------- .../main/java/seaweedfs/client/SeaweedRead.java | 8 +------ .../main/java/seaweedfs/client/SeaweedWrite.java | 8 +------ 3 files changed, 22 insertions(+), 22 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 8a37827f1..6c57e2e0d 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -9,16 +9,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLException; -import java.util.Map; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; public class FilerGrpcClient { - public final int VOLUME_SERVER_ACCESS_DIRECT = 0; - public final int VOLUME_SERVER_ACCESS_PUBLIC_URL = 1; - public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2; - private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class); static SslContext sslContext; @@ -30,6 +26,9 @@ public class FilerGrpcClient { } } + public final int VOLUME_SERVER_ACCESS_DIRECT = 0; + public final int VOLUME_SERVER_ACCESS_PUBLIC_URL = 1; + public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2; public final Map vidLocations = new HashMap<>(); private final ManagedChannel channel; private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; @@ -55,7 +54,7 @@ public class FilerGrpcClient { .negotiationType(NegotiationType.TLS) .sslContext(sslContext)); - filerAddress = String.format("%s:%d", host, grpcPort-10000); + filerAddress = String.format("%s:%d", host, grpcPort - 10000); FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = this.getBlockingStub().getFilerConfiguration( @@ -104,23 +103,36 @@ public class FilerGrpcClient { public void setAccessVolumeServerDirectly() { this.volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT; } + public boolean isAccessVolumeServerDirectly() { return this.volumeServerAccess == VOLUME_SERVER_ACCESS_DIRECT; } + public void setAccessVolumeServerByPublicUrl() { this.volumeServerAccess = VOLUME_SERVER_ACCESS_PUBLIC_URL; } + public boolean isAccessVolumeServerByPublicUrl() { return this.volumeServerAccess == VOLUME_SERVER_ACCESS_PUBLIC_URL; } + public void setAccessVolumeServerByFilerProxy() { this.volumeServerAccess = VOLUME_SERVER_ACCESS_FILER_PROXY; } + public boolean isAccessVolumeServerByFilerProxy() { return this.volumeServerAccess == VOLUME_SERVER_ACCESS_FILER_PROXY; } - public String getFilerAddress() { - return this.filerAddress; + + public String getChunkUrl(String chunkId, String url, String publicUrl) { + switch (this.volumeServerAccess) { + case VOLUME_SERVER_ACCESS_PUBLIC_URL: + return String.format("http://%s/%s", publicUrl, chunkId); + case VOLUME_SERVER_ACCESS_FILER_PROXY: + return String.format("http://%s/?proxyChunkId=%s", this.filerAddress, chunkId); + default: + return String.format("http://%s/%s", url, chunkId); + } } } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index a70553762..e55c5b7aa 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -116,13 +116,7 @@ public class SeaweedRead { IOException lastException = null; for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { for (FilerProto.Location location : locations.getLocationsList()) { - String host = location.getUrl(); - if (filerGrpcClient.isAccessVolumeServerByPublicUrl()) { - host = location.getPublicUrl(); - } else if (filerGrpcClient.isAccessVolumeServerByFilerProxy()) { - host = filerGrpcClient.getFilerAddress(); - } - String url = String.format("http://%s/%s", host, chunkView.fileId); + String url = filerGrpcClient.getChunkUrl(chunkView.fileId, location.getUrl(), location.getPublicUrl()); try { data = doFetchOneFullChunkData(chunkView, url); lastException = null; diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 3cc11e21c..db3cc3931 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -53,13 +53,7 @@ public class SeaweedWrite { String fileId = response.getFileId(); String auth = response.getAuth(); - String host = response.getUrl(); - if (filerGrpcClient.isAccessVolumeServerByPublicUrl()) { - host = response.getPublicUrl(); - } else if (filerGrpcClient.isAccessVolumeServerByFilerProxy()) { - host = filerGrpcClient.getFilerAddress(); - } - String targetUrl = String.format("http://%s/%s", host, fileId); + String targetUrl = filerGrpcClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl()); ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; -- cgit v1.2.3 From ad36c7b0d76f870a5cb0c7c68f5c81ec5340a79e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 8 Feb 2021 02:28:45 -0800 Subject: refactoring: only expose FilerClient class --- .../java/seaweedfs/client/FileChunkManifest.java | 26 +++++++++++----------- .../main/java/seaweedfs/client/FilerClient.java | 24 ++++++++------------ .../java/seaweedfs/client/SeaweedInputStream.java | 17 +++++++------- .../java/seaweedfs/client/SeaweedOutputStream.java | 18 +++++++-------- .../main/java/seaweedfs/client/SeaweedRead.java | 18 +++++++-------- .../main/java/seaweedfs/client/SeaweedWrite.java | 22 +++++++++--------- 6 files changed, 59 insertions(+), 66 deletions(-) (limited to 'other/java/client/src') diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java index 3d7da91d5..9b6ba5dfc 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -23,7 +23,7 @@ public class FileChunkManifest { } public static List resolveChunkManifest( - final FilerGrpcClient filerGrpcClient, List chunks) throws IOException { + final FilerClient filerClient, List chunks) throws IOException { List dataChunks = new ArrayList<>(); @@ -35,30 +35,30 @@ public class FileChunkManifest { // IsChunkManifest LOG.debug("fetching chunk manifest:{}", chunk); - byte[] data = fetchChunk(filerGrpcClient, chunk); + byte[] data = fetchChunk(filerClient, chunk); FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build(); List resolvedChunks = new ArrayList<>(); for (FilerProto.FileChunk t : m.getChunksList()) { // avoid deprecated chunk.getFileId() resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build()); } - dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks)); + dataChunks.addAll(resolveChunkManifest(filerClient, resolvedChunks)); } return dataChunks; } - private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException { + private static byte[] fetchChunk(final FilerClient filerClient, FilerProto.FileChunk chunk) throws IOException { String vid = "" + chunk.getFid().getVolumeId(); - FilerProto.Locations locations = filerGrpcClient.vidLocations.get(vid); + FilerProto.Locations locations = filerClient.vidLocations.get(vid); if (locations == null) { FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); lookupRequest.addVolumeIds(vid); - FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient + FilerProto.LookupVolumeResponse lookupResponse = filerClient .getBlockingStub().lookupVolume(lookupRequest.build()); locations = lookupResponse.getLocationsMapMap().get(vid); - filerGrpcClient.vidLocations.put(vid, locations); + filerClient.vidLocations.put(vid, locations); LOG.debug("fetchChunk vid:{} locations:{}", vid, locations); } @@ -74,7 +74,7 @@ public class FileChunkManifest { byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId); if (chunkData == null) { LOG.debug("doFetchFullChunkData:{}", chunkView); - chunkData = SeaweedRead.doFetchFullChunkData(filerGrpcClient, chunkView, locations); + chunkData = SeaweedRead.doFetchFullChunkData(filerClient, chunkView, locations); } if (chunk.getIsChunkManifest()){ LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); @@ -86,7 +86,7 @@ public class FileChunkManifest { } public static List maybeManifestize( - final FilerGrpcClient filerGrpcClient, List inputChunks, String parentDirectory) throws IOException { + final FilerClient filerClient, List inputChunks, String parentDirectory) throws IOException { // the return variable List chunks = new ArrayList<>(); @@ -101,7 +101,7 @@ public class FileChunkManifest { int remaining = dataChunks.size(); for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) { - FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor), parentDirectory); + FilerProto.FileChunk chunk = mergeIntoManifest(filerClient, dataChunks.subList(i, i + mergeFactor), parentDirectory); chunks.add(chunk); remaining -= mergeFactor; } @@ -113,7 +113,7 @@ public class FileChunkManifest { return chunks; } - private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List dataChunks, String parentDirectory) throws IOException { + private static FilerProto.FileChunk mergeIntoManifest(final FilerClient filerClient, List dataChunks, String parentDirectory) throws IOException { // create and serialize the manifest dataChunks = FilerClient.beforeEntrySerialization(dataChunks); FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks); @@ -127,8 +127,8 @@ public class FileChunkManifest { } FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk( - filerGrpcClient.getReplication(), - filerGrpcClient, + filerClient.getReplication(), + filerClient, minOffset, data, 0, data.length, parentDirectory); manifestChunk.setIsChunkManifest(true); diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 7338d5bee..58269d41f 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -11,18 +11,12 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; -public class FilerClient { +public class FilerClient extends FilerGrpcClient { private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class); - private final FilerGrpcClient filerGrpcClient; - public FilerClient(String host, int grpcPort) { - filerGrpcClient = new FilerGrpcClient(host, grpcPort); - } - - public FilerClient(FilerGrpcClient filerGrpcClient) { - this.filerGrpcClient = filerGrpcClient; + super(host, grpcPort); } public static String toFileId(FilerProto.FileId fid) { @@ -236,7 +230,7 @@ public class FilerClient { } public List listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) { - Iterator iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() + Iterator iter = this.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() .setDirectory(path) .setPrefix(entryPrefix) .setStartFromFileName(lastEntryName) @@ -253,7 +247,7 @@ public class FilerClient { public FilerProto.Entry lookupEntry(String directory, String entryName) { try { - FilerProto.Entry entry = filerGrpcClient.getBlockingStub().lookupDirectoryEntry( + FilerProto.Entry entry = this.getBlockingStub().lookupDirectoryEntry( FilerProto.LookupDirectoryEntryRequest.newBuilder() .setDirectory(directory) .setName(entryName) @@ -274,7 +268,7 @@ public class FilerClient { public boolean createEntry(String parent, FilerProto.Entry entry) { try { FilerProto.CreateEntryResponse createEntryResponse = - filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() + this.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() .setDirectory(parent) .setEntry(entry) .build()); @@ -291,7 +285,7 @@ public class FilerClient { public boolean updateEntry(String parent, FilerProto.Entry entry) { try { - filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() + this.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() .setDirectory(parent) .setEntry(entry) .build()); @@ -304,7 +298,7 @@ public class FilerClient { public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive, boolean ignoreRecusiveError) { try { - filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() + this.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() .setDirectory(parent) .setName(entryName) .setIsDeleteData(isDeleteFileChunk) @@ -320,7 +314,7 @@ public class FilerClient { public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) { try { - filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder() + this.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder() .setOldDirectory(oldParent) .setOldName(oldName) .setNewDirectory(newParent) @@ -334,7 +328,7 @@ public class FilerClient { } public Iterator watch(String prefix, String clientName, long sinceNs) { - return filerGrpcClient.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder() + return this.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder() .setPathPrefix(prefix) .setClientName(clientName) .setSinceNs(sinceNs) diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java index 8b26c242c..4e40ce1b6 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java @@ -16,7 +16,7 @@ public class SeaweedInputStream extends InputStream { private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); private static final IOException EXCEPTION_STREAM_IS_CLOSED = new IOException("Stream is closed!"); - private final FilerGrpcClient filerGrpcClient; + private final FilerClient filerClient; private final String path; private final FilerProto.Entry entry; private final List visibleIntervalList; @@ -27,32 +27,31 @@ public class SeaweedInputStream extends InputStream { private boolean closed = false; public SeaweedInputStream( - final FilerGrpcClient filerGrpcClient, + final FilerClient filerClient, final String fullpath) throws IOException { - this.filerGrpcClient = filerGrpcClient; this.path = fullpath; - FilerClient filerClient = new FilerClient(filerGrpcClient); + this.filerClient = filerClient; this.entry = filerClient.lookupEntry( SeaweedOutputStream.getParentDirectory(fullpath), SeaweedOutputStream.getFileName(fullpath)); this.contentLength = SeaweedRead.fileSize(entry); - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); } public SeaweedInputStream( - final FilerGrpcClient filerGrpcClient, + final FilerClient filerClient, final String path, final FilerProto.Entry entry) throws IOException { - this.filerGrpcClient = filerGrpcClient; + this.filerClient = filerClient; this.path = path; this.entry = entry; this.contentLength = SeaweedRead.fileSize(entry); - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); @@ -110,7 +109,7 @@ public class SeaweedInputStream extends InputStream { if (start+len <= entry.getContent().size()) { entry.getContent().substring(start, start+len).copyTo(buf); } else { - bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); + bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); } if (bytesRead > Integer.MAX_VALUE) { diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 92dc59f61..b73e99e69 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -15,7 +15,7 @@ public class SeaweedOutputStream extends OutputStream { private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); protected final boolean supportFlush = true; - private final FilerGrpcClient filerGrpcClient; + private final FilerClient filerClient; private final String path; private final int bufferSize; private final int maxConcurrentRequestCount; @@ -33,17 +33,17 @@ public class SeaweedOutputStream extends OutputStream { private long outputIndex; private String replication = "000"; - public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath) { - this(filerGrpcClient, fullpath, "000"); + public SeaweedOutputStream(FilerClient filerClient, final String fullpath) { + this(filerClient, fullpath, "000"); } - public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String fullpath, final String replication) { - this(filerGrpcClient, fullpath, null, 0, 8 * 1024 * 1024, "000"); + public SeaweedOutputStream(FilerClient filerClient, final String fullpath, final String replication) { + this(filerClient, fullpath, null, 0, 8 * 1024 * 1024, "000"); } - public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final String path, FilerProto.Entry.Builder entry, + public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, final long position, final int bufferSize, final String replication) { - this.filerGrpcClient = filerGrpcClient; + this.filerClient = filerClient; this.replication = replication; this.path = path; this.position = position; @@ -109,7 +109,7 @@ public class SeaweedOutputStream extends OutputStream { private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { try { - SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); + SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } catch (Exception ex) { throw new IOException(ex); } @@ -225,7 +225,7 @@ public class SeaweedOutputStream extends OutputStream { } final Future job = completionService.submit(() -> { // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); - SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); + SeaweedWrite.writeData(entry, replication, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); ByteBufferPool.release(bufferToWrite); return null; diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index e55c5b7aa..384636601 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -23,7 +23,7 @@ public class SeaweedRead { static VolumeIdCache volumeIdCache = new VolumeIdCache(4 * 1024); // returns bytesRead - public static long read(FilerGrpcClient filerGrpcClient, List visibleIntervals, + public static long read(FilerClient filerClient, List visibleIntervals, final long position, final ByteBuffer buf, final long fileSize) throws IOException { List chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining()); @@ -42,7 +42,7 @@ public class SeaweedRead { } if (lookupRequest.getVolumeIdsCount() > 0) { - FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient + FilerProto.LookupVolumeResponse lookupResponse = filerClient .getBlockingStub().lookupVolume(lookupRequest.build()); Map vid2Locations = lookupResponse.getLocationsMapMap(); for (Map.Entry entry : vid2Locations.entrySet()) { @@ -71,7 +71,7 @@ public class SeaweedRead { return 0; } - int len = readChunkView(filerGrpcClient, startOffset, buf, chunkView, locations); + int len = readChunkView(filerClient, startOffset, buf, chunkView, locations); LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); @@ -93,12 +93,12 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(FilerGrpcClient filerGrpcClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] chunkData = chunkCache.getChunk(chunkView.fileId); if (chunkData == null) { - chunkData = doFetchFullChunkData(filerGrpcClient, chunkView, locations); + chunkData = doFetchFullChunkData(filerClient, chunkView, locations); chunkCache.setChunk(chunkView.fileId, chunkData); } @@ -110,13 +110,13 @@ public class SeaweedRead { return len; } - public static byte[] doFetchFullChunkData(FilerGrpcClient filerGrpcClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] data = null; IOException lastException = null; for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { for (FilerProto.Location location : locations.getLocationsList()) { - String url = filerGrpcClient.getChunkUrl(chunkView.fileId, location.getUrl(), location.getPublicUrl()); + String url = filerClient.getChunkUrl(chunkView.fileId, location.getUrl(), location.getPublicUrl()); try { data = doFetchOneFullChunkData(chunkView, url); lastException = null; @@ -221,9 +221,9 @@ public class SeaweedRead { } public static List nonOverlappingVisibleIntervals( - final FilerGrpcClient filerGrpcClient, List chunkList) throws IOException { + final FilerClient filerClient, List chunkList) throws IOException { - chunkList = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunkList); + chunkList = FileChunkManifest.resolveChunkManifest(filerClient, chunkList); FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]); Arrays.sort(chunks, new Comparator() { diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index db3cc3931..f8c0c76b6 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -23,29 +23,29 @@ public class SeaweedWrite { public static void writeData(FilerProto.Entry.Builder entry, final String replication, - final FilerGrpcClient filerGrpcClient, + final FilerClient filerClient, final long offset, final byte[] bytes, final long bytesOffset, final long bytesLength, final String path) throws IOException { FilerProto.FileChunk.Builder chunkBuilder = writeChunk( - replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength, path); + replication, filerClient, offset, bytes, bytesOffset, bytesLength, path); synchronized (entry) { entry.addChunks(chunkBuilder); } } public static FilerProto.FileChunk.Builder writeChunk(final String replication, - final FilerGrpcClient filerGrpcClient, + final FilerClient filerClient, final long offset, final byte[] bytes, final long bytesOffset, final long bytesLength, final String path) throws IOException { - FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( + FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume( FilerProto.AssignVolumeRequest.newBuilder() - .setCollection(filerGrpcClient.getCollection()) - .setReplication(replication == null ? filerGrpcClient.getReplication() : replication) + .setCollection(filerClient.getCollection()) + .setReplication(replication == null ? filerClient.getReplication() : replication) .setDataCenter("") .setTtlSec(0) .setPath(path) @@ -53,11 +53,11 @@ public class SeaweedWrite { String fileId = response.getFileId(); String auth = response.getAuth(); - String targetUrl = filerGrpcClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl()); + String targetUrl = filerClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl()); ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; - if (filerGrpcClient.isCipher()) { + if (filerClient.isCipher()) { cipherKey = genCipherKey(); cipherKeyString = ByteString.copyFrom(cipherKey); } @@ -75,15 +75,15 @@ public class SeaweedWrite { .setCipherKey(cipherKeyString); } - public static void writeMeta(final FilerGrpcClient filerGrpcClient, + public static void writeMeta(final FilerClient filerClient, final String parentDirectory, final FilerProto.Entry.Builder entry) throws IOException { synchronized (entry) { - List chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList(), parentDirectory); + List chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory); entry.clearChunks(); entry.addAllChunks(chunks); - filerGrpcClient.getBlockingStub().createEntry( + filerClient.getBlockingStub().createEntry( FilerProto.CreateEntryRequest.newBuilder() .setDirectory(parentDirectory) .setEntry(entry) -- cgit v1.2.3