aboutsummaryrefslogtreecommitdiff
path: root/other/java/client/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'other/java/client/src/main')
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java93
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java33
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java75
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java41
-rw-r--r--other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java104
5 files changed, 231 insertions, 115 deletions
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 44977d186..320a754ea 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -8,10 +8,13 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class FilerGrpcClient {
@@ -29,10 +32,12 @@ public class FilerGrpcClient {
public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2;
public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>();
protected int randomClientId;
- private final ManagedChannel channel;
- private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
- private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
- private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub;
+
+ // Connection pool to handle concurrent requests
+ private static final int CHANNEL_POOL_SIZE = 4;
+ private final List<ManagedChannel> channelPool;
+ private final AtomicInteger channelIndex = new AtomicInteger(0);
+
private boolean cipher = false;
private String collection = "";
private String replication = "";
@@ -45,26 +50,18 @@ public class FilerGrpcClient {
public FilerGrpcClient(String host, int port, int grpcPort, String cn, SslContext sslContext) {
- this(sslContext == null ?
- ManagedChannelBuilder.forAddress(host, grpcPort)
- .usePlaintext()
- .maxInboundMessageSize(1024 * 1024 * 1024) :
- cn.isEmpty() ?
- NettyChannelBuilder.forAddress(host, grpcPort)
- .maxInboundMessageSize(1024 * 1024 * 1024)
- .negotiationType(NegotiationType.TLS)
- .sslContext(sslContext) :
- NettyChannelBuilder.forAddress(host, grpcPort)
- .maxInboundMessageSize(1024 * 1024 * 1024)
- .negotiationType(NegotiationType.TLS)
- .overrideAuthority(cn) //will not check hostname of the filer server
- .sslContext(sslContext)
- );
-
filerAddress = SeaweedUtil.joinHostPort(host, port);
- FilerProto.GetFilerConfigurationResponse filerConfigurationResponse =
- this.getBlockingStub().getFilerConfiguration(
+ // Create a pool of channels for better concurrency handling
+ channelPool = new ArrayList<>(CHANNEL_POOL_SIZE);
+
+ for (int i = 0; i < CHANNEL_POOL_SIZE; i++) {
+ channelPool.add(createChannelBuilder(host, grpcPort, sslContext, cn).build());
+ }
+
+ // Get filer configuration using first channel
+ FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = SeaweedFilerGrpc
+ .newBlockingStub(channelPool.get(0)).getFilerConfiguration(
FilerProto.GetFilerConfigurationRequest.newBuilder().build());
cipher = filerConfigurationResponse.getCipher();
collection = filerConfigurationResponse.getCollection();
@@ -73,11 +70,39 @@ public class FilerGrpcClient {
}
- private FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) {
- channel = channelBuilder.build();
- blockingStub = SeaweedFilerGrpc.newBlockingStub(channel);
- asyncStub = SeaweedFilerGrpc.newStub(channel);
- futureStub = SeaweedFilerGrpc.newFutureStub(channel);
+ /**
+ * Creates a NettyChannelBuilder with common gRPC configuration.
+ * Supports plaintext and TLS modes with optional authority override.
+ */
+ private NettyChannelBuilder createChannelBuilder(String host, int grpcPort, SslContext sslContext, String cn) {
+ NettyChannelBuilder builder = NettyChannelBuilder.forAddress(host, grpcPort)
+ .maxInboundMessageSize(1024 * 1024 * 1024)
+ .maxInboundMetadataSize(1024 * 1024)
+ .flowControlWindow(16 * 1024 * 1024)
+ .initialFlowControlWindow(16 * 1024 * 1024)
+ .maxHeaderListSize(16 * 1024 * 1024)
+ .keepAliveTime(30, TimeUnit.SECONDS)
+ .keepAliveTimeout(10, TimeUnit.SECONDS)
+ .keepAliveWithoutCalls(true)
+ .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_RCVBUF, 16 * 1024 * 1024)
+ .withOption(io.grpc.netty.shaded.io.netty.channel.ChannelOption.SO_SNDBUF, 16 * 1024 * 1024);
+
+ if (sslContext == null) {
+ builder.usePlaintext();
+ } else {
+ builder.negotiationType(NegotiationType.TLS).sslContext(sslContext);
+ if (!cn.isEmpty()) {
+ builder.overrideAuthority(cn);
+ }
+ }
+ return builder;
+ }
+
+ // Get a channel from the pool using round-robin
+ private ManagedChannel getChannel() {
+ int raw = channelIndex.getAndIncrement();
+ int index = Math.floorMod(raw, CHANNEL_POOL_SIZE);
+ return channelPool.get(index);
}
public boolean isCipher() {
@@ -93,19 +118,25 @@ public class FilerGrpcClient {
}
public void shutdown() throws InterruptedException {
- channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ for (ManagedChannel channel : channelPool) {
+ channel.shutdown();
+ }
+ for (ManagedChannel channel : channelPool) {
+ channel.awaitTermination(5, TimeUnit.SECONDS);
+ }
}
public SeaweedFilerGrpc.SeaweedFilerBlockingStub getBlockingStub() {
- return blockingStub;
+ // Return a new stub using a channel from the pool (round-robin)
+ return SeaweedFilerGrpc.newBlockingStub(getChannel());
}
public SeaweedFilerGrpc.SeaweedFilerStub getAsyncStub() {
- return asyncStub;
+ return SeaweedFilerGrpc.newStub(getChannel());
}
public SeaweedFilerGrpc.SeaweedFilerFutureStub getFutureStub() {
- return futureStub;
+ return SeaweedFilerGrpc.newFutureStub(getChannel());
}
public void setAccessVolumeServerDirectly() {
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
index 64754321b..48a508db0 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java
@@ -23,7 +23,7 @@ public class SeaweedInputStream extends InputStream {
private final long contentLength;
private FilerProto.Entry entry;
- private long position = 0; // cursor of the file
+ private long position = 0; // cursor of the file
private boolean closed = false;
@@ -44,7 +44,6 @@ public class SeaweedInputStream extends InputStream {
}
this.contentLength = SeaweedRead.fileSize(entry);
-
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
@@ -64,7 +63,6 @@ public class SeaweedInputStream extends InputStream {
}
this.contentLength = SeaweedRead.fileSize(entry);
-
this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList());
LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
@@ -99,7 +97,8 @@ public class SeaweedInputStream extends InputStream {
throw new IllegalArgumentException("requested read length is less than zero");
}
if (len > (b.length - off)) {
- throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
+ throw new IllegalArgumentException(
+ "requested read length is more than will fit after requested offset in buffer");
}
ByteBuffer buf = ByteBuffer.wrap(b, off, len);
@@ -114,21 +113,30 @@ public class SeaweedInputStream extends InputStream {
throw new IllegalArgumentException("attempting to read from negative offset");
}
if (position >= contentLength) {
- return -1; // Hadoop prefers -1 to EOFException
+ return -1; // Hadoop prefers -1 to EOFException
}
long bytesRead = 0;
int len = buf.remaining();
- if (this.position< Integer.MAX_VALUE && (this.position + len )<= entry.getContent().size()) {
- entry.getContent().substring((int)this.position, (int)(this.position + len)).copyTo(buf);
+ if (this.position < Integer.MAX_VALUE && (this.position + len) <= entry.getContent().size()) {
+ entry.getContent().substring((int) this.position, (int) (this.position + len)).copyTo(buf);
+ bytesRead = len; // FIX: Update bytesRead after inline copy
} else {
- bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
+ // Use the known contentLength instead of recomputing from the entry to avoid
+ // races
+ bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf,
+ this.contentLength);
}
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
+ // Clamp premature EOFs: do not return -1 unless position >= contentLength
+ if (bytesRead < 0 && position < contentLength) {
+ bytesRead = 0;
+ }
+
if (bytesRead > 0) {
this.position += bytesRead;
}
@@ -188,12 +196,15 @@ public class SeaweedInputStream extends InputStream {
}
final long remaining = this.contentLength - this.position;
return remaining <= Integer.MAX_VALUE
- ? (int) remaining : Integer.MAX_VALUE;
+ ? (int) remaining
+ : Integer.MAX_VALUE;
}
/**
- * Returns the length of the file that this stream refers to. Note that the length returned is the length
- * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file,
+ * Returns the length of the file that this stream refers to. Note that the
+ * length returned is the length
+ * as of the time the Stream was opened. Specifically, if there have been
+ * subsequent appends to the file,
* they wont be reflected in the returned length.
*
* @return length of the file.
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
index 68c281992..ea4c99805 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java
@@ -23,15 +23,13 @@ public class SeaweedOutputStream extends OutputStream {
private final ThreadPoolExecutor threadExecutor;
private final ExecutorCompletionService<Void> completionService;
private final ConcurrentLinkedDeque<WriteOperation> writeOperations;
- private final boolean shouldSaveMetadata = false;
private FilerProto.Entry.Builder entry;
- private long position;
+ private long position; // Flushed bytes (committed to service)
private boolean closed;
private volatile IOException lastError;
private long lastFlushOffset;
private long lastTotalAppendOffset = 0;
private ByteBuffer buffer;
- private long outputIndex;
private String replication = "";
private String collection = "";
@@ -44,7 +42,8 @@ public class SeaweedOutputStream extends OutputStream {
}
public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry,
- final long position, final int bufferSize, final String replication) {
+ final long position, final int bufferSize, final String replication) {
+
this.filerClient = filerClient;
this.replication = replication;
this.path = path;
@@ -58,8 +57,7 @@ public class SeaweedOutputStream extends OutputStream {
this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors();
- this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
+ this.threadExecutor = new ThreadPoolExecutor(maxConcurrentRequestCount,
maxConcurrentRequestCount,
120L,
TimeUnit.SECONDS,
@@ -77,8 +75,7 @@ public class SeaweedOutputStream extends OutputStream {
.setFileMode(0755)
.setCrtime(now)
.setMtime(now)
- .clearGroupName()
- );
+ .clearGroupName());
}
}
@@ -86,14 +83,35 @@ public class SeaweedOutputStream extends OutputStream {
public void setReplication(String replication) {
this.replication = replication;
}
+
public void setCollection(String collection) {
this.collection = collection;
}
+ /**
+ * Get the current position in the output stream.
+ * This returns the total position including both flushed and buffered data.
+ *
+ * @return current position (flushed + buffered bytes)
+ */
+ public synchronized long getPos() throws IOException {
+ // Guard against NPE if called after close()
+ if (buffer == null) {
+ return position;
+ }
+
+ // Return current position (flushed + buffered)
+ return position + buffer.position();
+ }
+
public static String getParentDirectory(String path) {
int protoIndex = path.indexOf("://");
if (protoIndex >= 0) {
- int pathStart = path.indexOf("/", protoIndex+3);
+ int pathStart = path.indexOf("/", protoIndex + 3);
+ if (pathStart < 0) {
+ // No path segment; treat as root (e.g., "seaweedfs://host")
+ return "/";
+ }
path = path.substring(pathStart);
}
if (path.equals("/")) {
@@ -116,6 +134,13 @@ public class SeaweedOutputStream extends OutputStream {
private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
try {
+
+ // Set the file size in attributes based on our position
+ // This ensures Parquet footer metadata matches what we actually wrote
+ FilerProto.FuseAttributes.Builder attrBuilder = entry.getAttributes().toBuilder();
+ attrBuilder.setFileSize(offset);
+ entry.setAttributes(attrBuilder);
+
SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry);
} catch (Exception ex) {
throw new IOException(ex);
@@ -125,7 +150,7 @@ public class SeaweedOutputStream extends OutputStream {
@Override
public void write(final int byteVal) throws IOException {
- write(new byte[]{(byte) (byteVal & 0xFF)});
+ write(new byte[] { (byte) (byteVal & 0xFF) });
}
@Override
@@ -141,8 +166,6 @@ public class SeaweedOutputStream extends OutputStream {
throw new IndexOutOfBoundsException();
}
- // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")");
-
int currentOffset = off;
int writableBytes = bufferSize - buffer.position();
int numberOfBytesToWrite = length;
@@ -154,9 +177,11 @@ public class SeaweedOutputStream extends OutputStream {
break;
}
- // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
+ // System.out.println(path + " [" + (outputIndex + currentOffset) + "," +
+ // ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity());
buffer.put(data, currentOffset, writableBytes);
currentOffset += writableBytes;
+
writeCurrentBufferToService();
numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
writableBytes = bufferSize - buffer.position();
@@ -191,7 +216,6 @@ public class SeaweedOutputStream extends OutputStream {
return;
}
- LOG.debug("close path: {}", path);
try {
flushInternal();
threadExecutor.shutdown();
@@ -209,28 +233,35 @@ public class SeaweedOutputStream extends OutputStream {
}
private synchronized void writeCurrentBufferToService() throws IOException {
- if (buffer.position() == 0) {
+ int bufferPos = buffer.position();
+
+ if (bufferPos == 0) {
return;
}
- position += submitWriteBufferToService(buffer, position);
+ int written = submitWriteBufferToService(buffer, position);
+ position += written;
buffer = ByteBufferPool.request(bufferSize);
}
- private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException {
+ private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition)
+ throws IOException {
- ((Buffer)bufferToWrite).flip();
+ ((Buffer) bufferToWrite).flip();
int bytesLength = bufferToWrite.limit() - bufferToWrite.position();
if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) {
waitForTaskToComplete();
}
final Future<Void> job = completionService.submit(() -> {
- // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
- SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path);
- // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")");
+ // System.out.println(path + " is going to save [" + (writePosition) + "," +
+ // ((writePosition) + bytesLength) + ")");
+ SeaweedWrite.writeData(entry, replication, collection, filerClient, writePosition, bufferToWrite.array(),
+ bufferToWrite.position(), bufferToWrite.limit(), path);
+ // System.out.println(path + " saved [" + (writePosition) + "," +
+ // ((writePosition) + bytesLength) + ")");
ByteBufferPool.release(bufferToWrite);
return null;
});
@@ -318,12 +349,10 @@ public class SeaweedOutputStream extends OutputStream {
private static class WriteOperation {
private final Future<Void> task;
- private final long startOffset;
private final long length;
WriteOperation(final Future<Void> task, final long startOffset, final long length) {
this.task = task;
- this.startOffset = startOffset;
this.length = length;
}
}
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index cac85d186..3fd184671 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -24,9 +24,10 @@ public class SeaweedRead {
// returns bytesRead
public static long read(FilerClient filerClient, List<VisibleInterval> visibleIntervals,
- final long position, final ByteBuffer buf, final long fileSize) throws IOException {
+ final long position, final ByteBuffer buf, final long fileSize) throws IOException {
- List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining());
+ int originalRemaining = buf.remaining();
+ List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, originalRemaining);
Map<String, FilerProto.Locations> knownLocations = new HashMap<>();
@@ -51,7 +52,7 @@ public class SeaweedRead {
}
}
- //TODO parallel this
+ // TODO parallel this
long readCount = 0;
long startOffset = position;
for (ChunkView chunkView : chunkViews) {
@@ -59,7 +60,7 @@ public class SeaweedRead {
if (startOffset < chunkView.logicOffset) {
long gap = chunkView.logicOffset - startOffset;
LOG.debug("zero [{},{})", startOffset, startOffset + gap);
- buf.position(buf.position()+ (int)gap);
+ buf.position(buf.position() + (int) gap);
readCount += gap;
startOffset += gap;
}
@@ -81,12 +82,17 @@ public class SeaweedRead {
}
- long limit = Math.min(buf.limit(), fileSize);
+ // Fix: Calculate the correct limit based on the read position and requested
+ // size,
+ // not the buffer's absolute limit. This fixes the 78-byte EOF error when
+ // seeking
+ // near the end of the file.
+ long limit = Math.min(position + originalRemaining, fileSize);
if (startOffset < limit) {
long gap = limit - startOffset;
LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
- buf.position(buf.position()+ (int)gap);
+ buf.position(buf.position() + (int) gap);
readCount += gap;
startOffset += gap;
}
@@ -94,7 +100,8 @@ public class SeaweedRead {
return readCount;
}
- private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView,
+ FilerProto.Locations locations) throws IOException {
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
@@ -105,13 +112,15 @@ public class SeaweedRead {
int len = (int) chunkView.size - (int) (startOffset - chunkView.logicOffset);
LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}",
- chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset);
+ chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset,
+ chunkView.logicOffset + chunkView.size, startOffset);
buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len);
return len;
}
- public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView,
+ FilerProto.Locations locations) throws IOException {
byte[] data = null;
IOException lastException = null;
@@ -214,8 +223,7 @@ public class SeaweedRead {
chunkStart,
isFullChunk,
chunk.cipherKey,
- chunk.isCompressed
- ));
+ chunk.isCompressed));
}
}
return views;
@@ -239,7 +247,10 @@ public class SeaweedRead {
}
public static long fileSize(FilerProto.Entry entry) {
- return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize());
+ long chunksSize = totalSize(entry.getChunksList());
+ long attrSize = entry.getAttributes().getFileSize();
+ long finalSize = Math.max(chunksSize, attrSize);
+ return finalSize;
}
public static long totalSize(List<FilerProto.FileChunk> chunksList) {
@@ -263,7 +274,8 @@ public class SeaweedRead {
public final byte[] cipherKey;
public final boolean isCompressed;
- public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
+ public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset,
+ boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
this.start = start;
this.stop = stop;
this.modifiedTime = modifiedTime;
@@ -297,7 +309,8 @@ public class SeaweedRead {
public final byte[] cipherKey;
public final boolean isCompressed;
- public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
+ public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey,
+ boolean isCompressed) {
this.fileId = fileId;
this.offset = offset;
this.size = size;
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index 88c7cefbe..0fadd53cc 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -2,6 +2,7 @@ package seaweedfs.client;
import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
+import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.mime.HttpMultipartMode;
@@ -25,13 +26,13 @@ public class SeaweedWrite {
private static final SecureRandom random = new SecureRandom();
public static void writeData(FilerProto.Entry.Builder entry,
- final String replication,
- String collection,
- final FilerClient filerClient,
- final long offset,
- final byte[] bytes,
- final long bytesOffset, final long bytesLength,
- final String path) throws IOException {
+ final String replication,
+ String collection,
+ final FilerClient filerClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset, final long bytesLength,
+ final String path) throws IOException {
IOException lastException = null;
for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
@@ -60,21 +61,50 @@ public class SeaweedWrite {
}
public static FilerProto.FileChunk.Builder writeChunk(final String replication,
- final String collection,
- final FilerClient filerClient,
- final long offset,
- final byte[] bytes,
- final long bytesOffset,
- final long bytesLength,
- final String path) throws IOException {
- FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume(
- FilerProto.AssignVolumeRequest.newBuilder()
- .setCollection(Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection)
- .setReplication(Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication)
- .setDataCenter("")
- .setTtlSec(0)
- .setPath(path)
- .build());
+ final String collection,
+ final FilerClient filerClient,
+ final long offset,
+ final byte[] bytes,
+ final long bytesOffset,
+ final long bytesLength,
+ final String path) throws IOException {
+
+ // Retry assignVolume call for transient network/server errors
+ FilerProto.AssignVolumeResponse response = null;
+ IOException lastException = null;
+ int maxRetries = 3;
+
+ for (int attempt = 0; attempt < maxRetries; attempt++) {
+ try {
+ response = filerClient.getBlockingStub().assignVolume(
+ FilerProto.AssignVolumeRequest.newBuilder()
+ .setCollection(
+ Strings.isNullOrEmpty(collection) ? filerClient.getCollection() : collection)
+ .setReplication(
+ Strings.isNullOrEmpty(replication) ? filerClient.getReplication() : replication)
+ .setDataCenter("")
+ .setTtlSec(0)
+ .setPath(path)
+ .build());
+ break; // Success, exit retry loop
+ } catch (io.grpc.StatusRuntimeException e) {
+ lastException = new IOException(
+ "assignVolume failed (attempt " + (attempt + 1) + "/" + maxRetries + "): " + e.getMessage(), e);
+ if (attempt < maxRetries - 1) {
+ try {
+ Thread.sleep(100 * (attempt + 1)); // Exponential backoff: 100ms, 200ms, 300ms
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted during retry", ie);
+ }
+ }
+ }
+ }
+
+ if (response == null) {
+ throw lastException != null ? lastException
+ : new IOException("assignVolume failed after " + maxRetries + " attempts");
+ }
if (!Strings.isNullOrEmpty(response.getError())) {
throw new IOException(response.getError());
@@ -83,7 +113,8 @@ public class SeaweedWrite {
String fileId = response.getFileId();
String auth = response.getAuth();
- String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(), response.getLocation().getPublicUrl());
+ String targetUrl = filerClient.getChunkUrl(fileId, response.getLocation().getUrl(),
+ response.getLocation().getPublicUrl());
ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY;
byte[] cipherKey = null;
@@ -94,8 +125,6 @@ public class SeaweedWrite {
String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
- LOG.debug("write file chunk {} size {}", targetUrl, bytesLength);
-
return FilerProto.FileChunk.newBuilder()
.setFileId(fileId)
.setOffset(offset)
@@ -106,27 +135,28 @@ public class SeaweedWrite {
}
public static void writeMeta(final FilerClient filerClient,
- final String parentDirectory,
- final FilerProto.Entry.Builder entry) throws IOException {
+ final String parentDirectory,
+ final FilerProto.Entry.Builder entry) throws IOException {
synchronized (entry) {
- List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory);
+ List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(),
+ parentDirectory);
+
entry.clearChunks();
entry.addAllChunks(chunks);
filerClient.getBlockingStub().createEntry(
FilerProto.CreateEntryRequest.newBuilder()
.setDirectory(parentDirectory)
.setEntry(entry)
- .build()
- );
+ .build());
}
}
private static String multipartUpload(String targetUrl,
- String auth,
- final byte[] bytes,
- final long bytesOffset, final long bytesLength,
- byte[] cipherKey) throws IOException {
+ String auth,
+ final byte[] bytes,
+ final long bytesOffset, final long bytesLength,
+ byte[] cipherKey) throws IOException {
MessageDigest md = null;
try {
md = MessageDigest.getInstance("MD5");
@@ -162,8 +192,10 @@ public class SeaweedWrite {
try {
if (response.getStatusLine().getStatusCode() / 100 != 2) {
- if (response.getEntity().getContentType() != null && response.getEntity().getContentType().getValue().equals("application/json")) {
- throw new IOException(EntityUtils.toString(response.getEntity(), "UTF-8"));
+ HttpEntity entity = response.getEntity();
+ if (entity != null && entity.getContentType() != null
+ && entity.getContentType().getValue().equals("application/json")) {
+ throw new IOException(EntityUtils.toString(entity, "UTF-8"));
} else {
throw new IOException(response.getStatusLine().getReasonPhrase());
}