diff options
Diffstat (limited to 'other/java/client/src/main')
5 files changed, 231 insertions, 115 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 44977d186..320a754ea 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -8,10 +8,13 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; public class FilerGrpcClient { @@ -29,10 +32,12 @@ public class FilerGrpcClient { public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2; public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>(); protected int randomClientId; - private final ManagedChannel channel; - private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; - private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; - private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; + + // Connection pool to handle concurrent requests + private static final int CHANNEL_POOL_SIZE = 4; + private final List<ManagedChannel> channelPool; + private final AtomicInteger channelIndex = new AtomicInteger(0); + private boolean cipher = false; private String collection = ""; private String replication = ""; @@ -45,26 +50,18 @@ public class FilerGrpcClient { public FilerGrpcClient(String host, int port, int grpcPort, String cn, SslContext sslContext) { - this(sslContext == null ? - ManagedChannelBuilder.forAddress(host, grpcPort) - .usePlaintext() - .maxInboundMessageSize(1024 * 1024 * 1024) : - cn.isEmpty() ? - NettyChannelBuilder.forAddress(host, grpcPort) - .maxInboundMessageSize(1024 * 1024 * 1024) - .negotiationType(NegotiationType.TLS) - .sslContext(sslContext) : - NettyChannelBuilder.forAddress(host, grpcPort) - .maxInboundMessageSize(1024 * 1024 * 1024) - .negotiationType(NegotiationType.TLS) - .overrideAuthority(cn) //will not check hostname of the filer server - .sslContext(sslContext) - ); - filerAddress = SeaweedUtil.joinHostPort(host, port); - FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = - this.getBlockingStub().getFilerConfiguration( + // Create a pool of channels for better concurrency handling + channelPool = new ArrayList<>(CHANNEL_POOL_SIZE); + + for (int i = 0; i < CHANNEL_POOL_SIZE; i++) { + channelPool.add(createChannelBuilder(host, grpcPort, sslContext, cn).build()); + } + + // Get filer configuration using first channel + FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = SeaweedFilerGrpc + .newBlockingStub(channelPool.get(0)).getFilerConfiguration( FilerProto.GetFilerConfigurationRequest.newBuilder().build()); cipher = filerConfigurationResponse.getCipher(); collection = filerConfigurationResponse.getCollection(); @@ -73,11 +70,39 @@ public class FilerGrpcClient { } - private FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) { - channel = channelBuilder.build(); - blockingStub = SeaweedFilerGrpc.newBlockingStub(channel); - asyncStub = SeaweedFilerGrpc.newStub(channel); - futureStub = SeaweedFilerGrpc.newFutureStub(channel); + /** + * Creates a NettyChannelBuilder with common gRPC configuration. + * Supports plaintext and TLS modes with optional authority override. + */ + private NettyChannelBuilder createChannelBuilder(String host, int grpcPort, SslContext sslContext, String cn) { + NettyChannelBuilder builder = NettyChannelBuilder.forAddress(host, grpcPort) + .maxInboundMessageSize(1024 * 1024 * 1024) + .maxInboundMetadataSize(1024 * 1024) + .flowControlWindow(16 * 1024 * 1024) + .initialFlowControlWindow(16 * 1024 * 1024) + .maxHeaderListSize(16 * 1024 * 1024) + .keepAliveTime(30, TimeUnit.SECONDS) + .keepAliveTimeout(10, TimeUnit.SECONDS) + .keepAliveWithoutCalls(true) + .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024) + .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024); + + if (sslContext == null) { + builder.usePlaintext(); + } else { + builder.negotiationType(NegotiationType.TLS).sslContext(sslContext); + if (!cn.isEmpty()) { + builder.overrideAuthority(cn); + } + } + return builder; + } + + // Get a channel from the pool using round-robin + private ManagedChannel getChannel() { + int raw = channelIndex.getAndIncrement(); + int index = Math.floorMod(raw, CHANNEL_POOL_SIZE); + return channelPool.get(index); } public boolean isCipher() { @@ -93,19 +118,25 @@ public class FilerGrpcClient { } public void shutdown() throws InterruptedException { - channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + for (ManagedChannel channel : channelPool) { + channel.shutdown(); + } + for (ManagedChannel channel : channelPool) { + channel.awaitTermination(5, TimeUnit.SECONDS); + } } public SeaweedFilerGrpc.SeaweedFilerBlockingStub getBlockingStub() { - return blockingStub; + // Return a new stub using a channel from the pool (round-robin) + return SeaweedFilerGrpc.newBlockingStub(getChannel()); } public SeaweedFilerGrpc.SeaweedFilerStub getAsyncStub() { - return asyncStub; + return SeaweedFilerGrpc.newStub(getChannel()); } public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() { - return futureStub; + return SeaweedFilerGrpc.newFutureStub(getChannel()); } public void setAccessVolumeServerDirectly() { diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java index 64754321b..48a508db0 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java @@ -23,7 +23,7 @@ public class SeaweedInputStream extends InputStream { private final long contentLength; private FilerProto.Entry entry; - private long position = 0; // cursor of the file + private long position = 0; // cursor of the file private boolean closed = false; @@ -44,7 +44,6 @@ public class SeaweedInputStream extends InputStream { } this.contentLength = SeaweedRead.fileSize(entry); - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); @@ -64,7 +63,6 @@ public class SeaweedInputStream extends InputStream { } this.contentLength = SeaweedRead.fileSize(entry); - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); @@ -99,7 +97,8 @@ public class SeaweedInputStream extends InputStream { throw new IllegalArgumentException("requested read length is less than zero"); } if (len > (b.length - off)) { - throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); + throw new IllegalArgumentException( + "requested read length is more than will fit after requested offset in buffer"); } ByteBuffer buf = ByteBuffer.wrap(b, off, len); @@ -114,21 +113,30 @@ public class SeaweedInputStream extends InputStream { throw new IllegalArgumentException("attempting to read from negative offset"); } if (position >= contentLength) { - return -1; // Hadoop prefers -1 to EOFException + return -1; // Hadoop prefers -1 to EOFException } long bytesRead = 0; int len = buf.remaining(); - if (this.position< Integer.MAX_VALUE && (this.position + len )<= entry.getContent().size()) { - entry.getContent().substring((int)this.position, (int)(this.position + len)).copyTo(buf); + if (this.position < Integer.MAX_VALUE && (this.position + len) <= entry.getContent().size()) { + entry.getContent().substring((int) this.position, (int) (this.position + len)).copyTo(buf); + bytesRead = len; // FIX: Update bytesRead after inline copy } else { - bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); + // Use the known contentLength instead of recomputing from the entry to avoid + // races + bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, + this.contentLength); } if (bytesRead > Integer.MAX_VALUE) { throw new IOException("Unexpected Content-Length"); } + // Clamp premature EOFs: do not return -1 unless position >= contentLength + if (bytesRead < 0 && position < contentLength) { + bytesRead = 0; + } + if (bytesRead > 0) { this.position += bytesRead; } @@ -188,12 +196,15 @@ public class SeaweedInputStream extends InputStream { } final long remaining = this.contentLength - this.position; return remaining <= Integer.MAX_VALUE - ? (int) remaining : Integer.MAX_VALUE; + ? (int) remaining + : Integer.MAX_VALUE; } /** - * Returns the length of the file that this stream refers to. Note that the length returned is the length - * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, + * Returns the length of the file that this stream refers to. Note that the + * length returned is the length + * as of the time the Stream was opened. Specifically, if there have been + * subsequent appends to the file, * they wont be reflected in the returned length. * * @return length of the file. diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java index 68c281992..ea4c99805 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -23,15 +23,13 @@ public class SeaweedOutputStream extends OutputStream { private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService<Void> completionService; private final ConcurrentLinkedDeque<WriteOperation> writeOperations; - private final boolean shouldSaveMetadata = false; private FilerProto.Entry.Builder entry; - private long position; + private long position; // Flushed bytes (committed to service) private boolean closed; private volatile IOException lastError; private long lastFlushOffset; private long lastTotalAppendOffset = 0; private ByteBuffer buffer; - private long outputIndex; private String replication = ""; private String collection = ""; @@ -44,7 +42,8 @@ public class SeaweedOutputStream extends OutputStream { } public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, - final long position, final int bufferSize, final String replication) { + final long position, final int bufferSize, final String replication) { + this.filerClient = filerClient; this.replication = replication; this.path = path; @@ -58,8 +57,7 @@ public class SeaweedOutputStream extends OutputStream { this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); - this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, + this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount, maxConcurrentRequestCount, 120L, TimeUnit.SECONDS, @@ -77,8 +75,7 @@ public class SeaweedOutputStream extends OutputStream { .setFileMode(0755) .setCrtime(now) .setMtime(now) - .clearGroupName() - ); + .clearGroupName()); } } @@ -86,14 +83,35 @@ public class SeaweedOutputStream extends OutputStream { public void setReplication(String replication) { this.replication = replication; } + public void setCollection(String collection) { this.collection = collection; } + /** + * Get the current position in the output stream. + * This returns the total position including both flushed and buffered data. + * + * @return current position (flushed + buffered bytes) + */ + public synchronized long getPos() throws IOException { + // Guard against NPE if called after close() + if (buffer == null) { + return position; + } + + // Return current position (flushed + buffered) + return position + buffer.position(); + } + public static String getParentDirectory(String path) { int protoIndex = path.indexOf("://"); if (protoIndex >= 0) { - int pathStart = path.indexOf("/", protoIndex+3); + int pathStart = path.indexOf("/", protoIndex + 3); + if (pathStart < 0) { + // No path segment; treat as root (e.g., "seaweedfs://host") + return "/"; + } path = path.substring(pathStart); } if (path.equals("/")) { @@ -116,6 +134,13 @@ public class SeaweedOutputStream extends OutputStream { private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { try { + + // Set the file size in attributes based on our position + // This ensures Parquet footer metadata matches what we actually wrote + FilerProto.FuseAttributes.Builder attrBuilder = entry.getAttributes().toBuilder(); + attrBuilder.setFileSize(offset); + entry.setAttributes(attrBuilder); + SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } catch (Exception ex) { throw new IOException(ex); @@ -125,7 +150,7 @@ public class SeaweedOutputStream extends OutputStream { @Override public void write(final int byteVal) throws IOException { - write(new byte[]{(byte) (byteVal & 0xFF)}); + write(new byte[] { (byte) (byteVal & 0xFF) }); } @Override @@ -141,8 +166,6 @@ public class SeaweedOutputStream extends OutputStream { throw new IndexOutOfBoundsException(); } - // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")"); - int currentOffset = off; int writableBytes = bufferSize - buffer.position(); int numberOfBytesToWrite = length; @@ -154,9 +177,11 @@ public class SeaweedOutputStream extends OutputStream { break; } - // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); + // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + + // ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); buffer.put(data, currentOffset, writableBytes); currentOffset += writableBytes; + writeCurrentBufferToService(); numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; writableBytes = bufferSize - buffer.position(); @@ -191,7 +216,6 @@ public class SeaweedOutputStream extends OutputStream { return; } - LOG.debug("close path: {}", path); try { flushInternal(); threadExecutor.shutdown(); @@ -209,28 +233,35 @@ public class SeaweedOutputStream extends OutputStream { } private synchronized void writeCurrentBufferToService() throws IOException { - if (buffer.position() == 0) { + int bufferPos = buffer.position(); + + if (bufferPos == 0) { return; } - position += submitWriteBufferToService(buffer, position); + int written = submitWriteBufferToService(buffer, position); + position += written; buffer = ByteBufferPool.request(bufferSize); } - private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException { + private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) + throws IOException { - ((Buffer)bufferToWrite).flip(); + ((Buffer) bufferToWrite).flip(); int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) { waitForTaskToComplete(); } final Future<Void> job = completionService.submit(() -> { - // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); - SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); - // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + // System.out.println(path + " is going to save [" + (writePosition) + "," + + // ((writePosition) + bytesLength) + ")"); + SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(), + bufferToWrite.position(), bufferToWrite.limit(), path); + // System.out.println(path + " saved [" + (writePosition) + "," + + // ((writePosition) + bytesLength) + ")"); ByteBufferPool.release(bufferToWrite); return null; }); @@ -318,12 +349,10 @@ public class SeaweedOutputStream extends OutputStream { private static class WriteOperation { private final Future<Void> task; - private final long startOffset; private final long length; WriteOperation(final Future<Void> task, final long startOffset, final long length) { this.task = task; - this.startOffset = startOffset; this.length = length; } } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index cac85d186..3fd184671 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -24,9 +24,10 @@ public class SeaweedRead { // returns bytesRead public static long read(FilerClient filerClient, List<VisibleInterval> visibleIntervals, - final long position, final ByteBuffer buf, final long fileSize) throws IOException { + final long position, final ByteBuffer buf, final long fileSize) throws IOException { - List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining()); + int originalRemaining = buf.remaining(); + List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, originalRemaining); Map<String, FilerProto.Locations> knownLocations = new HashMap<>(); @@ -51,7 +52,7 @@ public class SeaweedRead { } } - //TODO parallel this + // TODO parallel this long readCount = 0; long startOffset = position; for (ChunkView chunkView : chunkViews) { @@ -59,7 +60,7 @@ public class SeaweedRead { if (startOffset < chunkView.logicOffset) { long gap = chunkView.logicOffset - startOffset; LOG.debug("zero [{},{})", startOffset, startOffset + gap); - buf.position(buf.position()+ (int)gap); + buf.position(buf.position() + (int) gap); readCount += gap; startOffset += gap; } @@ -81,12 +82,17 @@ public class SeaweedRead { } - long limit = Math.min(buf.limit(), fileSize); + // Fix: Calculate the correct limit based on the read position and requested + // size, + // not the buffer's absolute limit. This fixes the 78-byte EOF error when + // seeking + // near the end of the file. + long limit = Math.min(position + originalRemaining, fileSize); if (startOffset < limit) { long gap = limit - startOffset; LOG.debug("zero2 [{},{})", startOffset, startOffset + gap); - buf.position(buf.position()+ (int)gap); + buf.position(buf.position() + (int) gap); readCount += gap; startOffset += gap; } @@ -94,7 +100,8 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, + FilerProto.Locations locations) throws IOException { byte[] chunkData = chunkCache.getChunk(chunkView.fileId); @@ -105,13 +112,15 @@ public class SeaweedRead { int len = (int) chunkView.size - (int) (startOffset - chunkView.logicOffset); LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}", - chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset); + chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, + chunkView.logicOffset + chunkView.size, startOffset); buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len); return len; } - public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, + FilerProto.Locations locations) throws IOException { byte[] data = null; IOException lastException = null; @@ -214,8 +223,7 @@ public class SeaweedRead { chunkStart, isFullChunk, chunk.cipherKey, - chunk.isCompressed - )); + chunk.isCompressed)); } } return views; @@ -239,7 +247,10 @@ public class SeaweedRead { } public static long fileSize(FilerProto.Entry entry) { - return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize()); + long chunksSize = totalSize(entry.getChunksList()); + long attrSize = entry.getAttributes().getFileSize(); + long finalSize = Math.max(chunksSize, attrSize); + return finalSize; } public static long totalSize(List<FilerProto.FileChunk> chunksList) { @@ -263,7 +274,8 @@ public class SeaweedRead { public final byte[] cipherKey; public final boolean isCompressed; - public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { + public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, + boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { this.start = start; this.stop = stop; this.modifiedTime = modifiedTime; @@ -297,7 +309,8 @@ public class SeaweedRead { public final byte[] cipherKey; public final boolean isCompressed; - public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { + public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, + boolean isCompressed) { this.fileId = fileId; this.offset = offset; this.size = size; diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 88c7cefbe..0fadd53cc 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -2,6 +2,7 @@ package seaweedfs.client; import com.google.common.base.Strings; import com.google.protobuf.ByteString; +import org.apache.http.HttpEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.mime.HttpMultipartMode; @@ -25,13 +26,13 @@ public class SeaweedWrite { private static final SecureRandom random = new SecureRandom(); public static void writeData(FilerProto.Entry.Builder entry, - final String replication, - String collection, - final FilerClient filerClient, - final long offset, - final byte[] bytes, - final long bytesOffset, final long bytesLength, - final String path) throws IOException { + final String replication, + String collection, + final FilerClient filerClient, + final long offset, + final byte[] bytes, + final long bytesOffset, final long bytesLength, + final String path) throws IOException { IOException lastException = null; for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { @@ -60,21 +61,50 @@ public class SeaweedWrite { } public static FilerProto.FileChunk.Builder writeChunk(final String replication, - final String collection, - final FilerClient filerClient, - final long offset, - final byte[] bytes, - final long bytesOffset, - final long bytesLength, - final String path) throws IOException { - FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume( - FilerProto.AssignVolumeRequest.newBuilder() - .setCollection(Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection) - .setReplication(Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication) - .setDataCenter("") - .setTtlSec(0) - .setPath(path) - .build()); + final String collection, + final FilerClient filerClient, + final long offset, + final byte[] bytes, + final long bytesOffset, + final long bytesLength, + final String path) throws IOException { + + // Retry assignVolume call for transient network/server errors + FilerProto.AssignVolumeResponse response = null; + IOException lastException = null; + int maxRetries = 3; + + for (int attempt = 0; attempt < maxRetries; attempt++) { + try { + response = filerClient.getBlockingStub().assignVolume( + FilerProto.AssignVolumeRequest.newBuilder() + .setCollection( + Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection) + .setReplication( + Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication) + .setDataCenter("") + .setTtlSec(0) + .setPath(path) + .build()); + break; // Success, exit retry loop + } catch (io.grpc.StatusRuntimeException e) { + lastException = new IOException( + "assignVolume failed (attempt " + (attempt + 1) + "/" + maxRetries + "): " + e.getMessage(), e); + if (attempt < maxRetries - 1) { + try { + Thread.sleep(100 * (attempt + 1)); // Exponential backoff: 100ms, 200ms, 300ms + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted during retry", ie); + } + } + } + } + + if (response == null) { + throw lastException != null ? lastException + : new IOException("assignVolume failed after " + maxRetries + " attempts"); + } if (!Strings.isNullOrEmpty(response.getError())) { throw new IOException(response.getError()); @@ -83,7 +113,8 @@ public class SeaweedWrite { String fileId = response.getFileId(); String auth = response.getAuth(); - String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(), response.getLocation().getPublicUrl()); + String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(), + response.getLocation().getPublicUrl()); ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; byte[] cipherKey = null; @@ -94,8 +125,6 @@ public class SeaweedWrite { String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); - LOG.debug("write file chunk {} size {}", targetUrl, bytesLength); - return FilerProto.FileChunk.newBuilder() .setFileId(fileId) .setOffset(offset) @@ -106,27 +135,28 @@ public class SeaweedWrite { } public static void writeMeta(final FilerClient filerClient, - final String parentDirectory, - final FilerProto.Entry.Builder entry) throws IOException { + final String parentDirectory, + final FilerProto.Entry.Builder entry) throws IOException { synchronized (entry) { - List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory); + List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), + parentDirectory); + entry.clearChunks(); entry.addAllChunks(chunks); filerClient.getBlockingStub().createEntry( FilerProto.CreateEntryRequest.newBuilder() .setDirectory(parentDirectory) .setEntry(entry) - .build() - ); + .build()); } } private static String multipartUpload(String targetUrl, - String auth, - final byte[] bytes, - final long bytesOffset, final long bytesLength, - byte[] cipherKey) throws IOException { + String auth, + final byte[] bytes, + final long bytesOffset, final long bytesLength, + byte[] cipherKey) throws IOException { MessageDigest md = null; try { md = MessageDigest.getInstance("MD5"); @@ -162,8 +192,10 @@ public class SeaweedWrite { try { if (response.getStatusLine().getStatusCode() / 100 != 2) { - if (response.getEntity().getContentType() != null && response.getEntity().getContentType().getValue().equals("application/json")) { - throw new IOException(EntityUtils.toString(response.getEntity(), "UTF-8")); + HttpEntity entity = response.getEntity(); + if (entity != null && entity.getContentType() != null + && entity.getContentType().getValue().equals("application/json")) { + throw new IOException(EntityUtils.toString(entity, "UTF-8")); } else { throw new IOException(response.getStatusLine().getReasonPhrase()); } |
