| author    | shibinbin <shibinbin@megvii.com> | 2020-10-28 11:36:42 +0800 |
|-----------|----------------------------------|---------------------------|
| committer | shibinbin <shibinbin@megvii.com> | 2020-10-28 11:36:42 +0800 |
| commit    | 7cc07655d493d11c967cfa978ddc5181d4b6b861 (patch) | |
| tree      | 5ae5bcf7ccc3cee3c55372674753d7c1ca48dff9 | /other/java |
| parent    | 29a4c3944eeb07434060df52dfb1d3cf4c59dc91 (diff) | |
| parent    | 53c3aad87528d57343afc5fdb3fb5107544af0fc (diff) | |
| download  | seaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.tar.xz, seaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'other/java')
35 files changed, 1406 insertions, 1864 deletions
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml
index a8b561251..efbf304c4 100644
--- a/other/java/client/pom.xml
+++ b/other/java/client/pom.xml
@@ -5,7 +5,7 @@
     <groupId>com.github.chrislusf</groupId>
     <artifactId>seaweedfs-client</artifactId>
-    <version>1.2.8</version>
+    <version>1.5.2</version>
 
     <parent>
         <groupId>org.sonatype.oss</groupId>
@@ -65,7 +65,7 @@
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
+            <version>4.13.1</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy
new file mode 100644
index 000000000..9efc21373
--- /dev/null
+++ b/other/java/client/pom.xml.deploy
@@ -0,0 +1,170 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.github.chrislusf</groupId>
+    <artifactId>seaweedfs-client</artifactId>
+    <version>1.5.2</version>
+
+    <parent>
+        <groupId>org.sonatype.oss</groupId>
+        <artifactId>oss-parent</artifactId>
+        <version>9</version>
+    </parent>
+
+    <properties>
+        <protobuf.version>3.9.1</protobuf.version>
+        <!-- follow https://github.com/grpc/grpc-java -->
+        <grpc.version>1.23.0</grpc.version>
+        <guava.version>28.0-jre</guava.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.moandjiezana.toml</groupId>
+            <artifactId>toml4j</artifactId>
+            <version>0.7.2</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-netty-shaded</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-protobuf</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-stub</artifactId>
+            <version>${grpc.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.25</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpmime</artifactId>
+            <version>4.5.6</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <distributionManagement>
+        <snapshotRepository>
+            <id>ossrh</id>
+            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+        </snapshotRepository>
+    </distributionManagement>
+    <build>
+        <extensions>
+            <extension>
+                <groupId>kr.motd.maven</groupId>
+                <artifactId>os-maven-plugin</artifactId>
+                <version>1.6.2</version>
+            </extension>
+        </extensions>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.xolstice.maven.plugins</groupId>
+                <artifactId>protobuf-maven-plugin</artifactId>
+                <version>0.6.1</version>
+                <configuration>
+                    <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
+                    </protocArtifact>
+                    <pluginId>grpc-java</pluginId>
+                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+                    </pluginArtifact>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>compile-custom</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-gpg-plugin</artifactId>
+                <version>1.5</version>
+                <executions>
+                    <execution>
+                        <id>sign-artifacts</id>
+                        <phase>verify</phase>
+                        <goals>
+                            <goal>sign</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.sonatype.plugins</groupId>
+                <artifactId>nexus-staging-maven-plugin</artifactId>
+                <version>1.6.7</version>
+                <extensions>true</extensions>
+                <configuration>
+                    <serverId>ossrh</serverId>
+                    <nexusUrl>https://oss.sonatype.org/</nexusUrl>
+                    <autoReleaseAfterClose>true</autoReleaseAfterClose>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>2.2.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar-no-fork</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.9.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-javadocs</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index 88447f7e7..04ff52730 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -5,7 +5,7 @@
     <groupId>com.github.chrislusf</groupId>
     <artifactId>seaweedfs-client</artifactId>
-    <version>1.2.8</version>
+    <version>1.5.2</version>
 
     <parent>
         <groupId>org.sonatype.oss</groupId>
@@ -65,7 +65,7 @@
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
+            <version>4.13.1</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
new file mode 100644
index 000000000..994bcaa2b
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java
@@ -0,0 +1,41 @@
+package seaweedfs.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ByteBufferPool {
+
+    private static final int MIN_BUFFER_SIZE = 8 * 1024 * 1024;
+    private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class);
+
+    private static final List<ByteBuffer> bufferList = new ArrayList<>();
+
+    public static synchronized ByteBuffer request(int bufferSize) {
+        if (bufferSize < MIN_BUFFER_SIZE) {
+            bufferSize = MIN_BUFFER_SIZE;
+        }
+        LOG.debug("requested new buffer {}", bufferSize);
+        if (bufferList.isEmpty()) {
+            return ByteBuffer.allocate(bufferSize);
+        }
+        ByteBuffer buffer = bufferList.remove(bufferList.size() - 1);
+        if (buffer.capacity() >= bufferSize) {
+            return buffer;
+        }
+
+        LOG.info("add new buffer from {} to {}", buffer.capacity(), bufferSize);
+        bufferList.add(0, buffer);
+        return ByteBuffer.allocate(bufferSize);
+
+    }
+
+    public static synchronized void release(ByteBuffer obj) {
+        obj.clear();
+        bufferList.add(0, obj);
+    }
+
+}
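The new ByteBufferPool above recycles large write buffers (rounding every request up to at least 8 MiB) instead of allocating a fresh array per upload. A minimal usage sketch — the demo class and its `main` are hypothetical, only the ByteBufferPool from this commit is assumed:

    import seaweedfs.client.ByteBufferPool;
    import java.nio.ByteBuffer;

    public class ByteBufferPoolDemo {
        public static void main(String[] args) {
            // requests smaller than MIN_BUFFER_SIZE (8 MiB) are rounded up
            ByteBuffer buf = ByteBufferPool.request(4 * 1024 * 1024);
            try {
                buf.putInt(42); // ... fill and flush the buffer ...
            } finally {
                ByteBufferPool.release(buf); // clear() and return to the free list
            }
        }
    }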
diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
index e249d4524..7afa2dca0 100644
--- a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
+++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java
@@ -7,20 +7,30 @@ import java.util.concurrent.TimeUnit;
 
 public class ChunkCache {
 
-    private final Cache<String, byte[]> cache;
+    private Cache<String, byte[]> cache = null;
 
     public ChunkCache(int maxEntries) {
+        if (maxEntries == 0) {
+            return;
+        }
         this.cache = CacheBuilder.newBuilder()
                 .maximumSize(maxEntries)
+                .weakValues()
                 .expireAfterAccess(1, TimeUnit.HOURS)
                 .build();
     }
 
     public byte[] getChunk(String fileId) {
+        if (this.cache == null) {
+            return null;
+        }
         return this.cache.getIfPresent(fileId);
     }
 
     public void setChunk(String fileId, byte[] data) {
+        if (this.cache == null) {
+            return;
+        }
         this.cache.put(fileId, data);
     }
diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
new file mode 100644
index 000000000..3293db2ca
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java
@@ -0,0 +1,140 @@
+package seaweedfs.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FileChunkManifest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileChunkManifest.class);
+
+    private static final int mergeFactor = 1000;
+
+    public static boolean hasChunkManifest(List<FilerProto.FileChunk> chunks) {
+        for (FilerProto.FileChunk chunk : chunks) {
+            if (chunk.getIsChunkManifest()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static List<FilerProto.FileChunk> resolveChunkManifest(
+            final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunks) throws IOException {
+
+        List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+
+        for (FilerProto.FileChunk chunk : chunks) {
+            if (!chunk.getIsChunkManifest()) {
+                dataChunks.add(chunk);
+                continue;
+            }
+
+            // IsChunkManifest
+            LOG.debug("fetching chunk manifest:{}", chunk);
+            byte[] data = fetchChunk(filerGrpcClient, chunk);
+            FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build();
+            List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>();
+            for (FilerProto.FileChunk t : m.getChunksList()) {
+                // avoid deprecated chunk.getFileId()
+                resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build());
+            }
+            dataChunks.addAll(resolveChunkManifest(filerGrpcClient, resolvedChunks));
+        }
+
+        return dataChunks;
+    }
+
+    private static byte[] fetchChunk(final FilerGrpcClient filerGrpcClient, FilerProto.FileChunk chunk) throws IOException {
+
+        String vid = "" + chunk.getFid().getVolumeId();
+        FilerProto.Locations locations = filerGrpcClient.vidLocations.get(vid);
+        if (locations == null) {
+            FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder();
+            lookupRequest.addVolumeIds(vid);
+            FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient
+                    .getBlockingStub().lookupVolume(lookupRequest.build());
+            locations = lookupResponse.getLocationsMapMap().get(vid);
+            filerGrpcClient.vidLocations.put(vid, locations);
+            LOG.debug("fetchChunk vid:{} locations:{}", vid, locations);
+        }
+
+        SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView(
+                FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId()
+                0,
+                -1,
+                0,
+                true,
+                chunk.getCipherKey().toByteArray(),
+                chunk.getIsCompressed());
+
+        byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId);
+        if (chunkData == null) {
+            LOG.debug("doFetchFullChunkData:{}", chunkView);
+            chunkData = SeaweedRead.doFetchFullChunkData(chunkView, locations);
+        }
+        if (chunk.getIsChunkManifest()){
+            LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length);
+            SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData);
+        }
+
+        return chunkData;
+
+    }
+
+    public static List<FilerProto.FileChunk> maybeManifestize(
+            final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> inputChunks, String parentDirectory) throws IOException {
+        // the return variable
+        List<FilerProto.FileChunk> chunks = new ArrayList<>();
+
+        List<FilerProto.FileChunk> dataChunks = new ArrayList<>();
+        for (FilerProto.FileChunk chunk : inputChunks) {
+            if (!chunk.getIsChunkManifest()) {
+                dataChunks.add(chunk);
+            } else {
+                chunks.add(chunk);
+            }
+        }
+
+        int remaining = dataChunks.size();
+        for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) {
+            FilerProto.FileChunk chunk = mergeIntoManifest(filerGrpcClient, dataChunks.subList(i, i + mergeFactor), parentDirectory);
+            chunks.add(chunk);
+            remaining -= mergeFactor;
+        }
+
+        // remaining
+        for (int i = dataChunks.size() - remaining; i < dataChunks.size(); i++) {
+            chunks.add(dataChunks.get(i));
+        }
+        return chunks;
+    }
+
+    private static FilerProto.FileChunk mergeIntoManifest(final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> dataChunks, String parentDirectory) throws IOException {
+        // create and serialize the manifest
+        dataChunks = FilerClient.beforeEntrySerialization(dataChunks);
+        FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks);
+        byte[] data = m.build().toByteArray();
+
+        long minOffset = Long.MAX_VALUE;
+        long maxOffset = -1;
+        for (FilerProto.FileChunk chunk : dataChunks) {
+            minOffset = Math.min(minOffset, chunk.getOffset());
+            maxOffset = Math.max(maxOffset, chunk.getSize() + chunk.getOffset());
+        }
+
+        FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk(
+                filerGrpcClient.getReplication(),
+                filerGrpcClient,
+                minOffset,
+                data, 0, data.length, parentDirectory);
+        manifestChunk.setIsChunkManifest(true);
+        manifestChunk.setSize(maxOffset - minOffset);
+        return manifestChunk.build();
+
+    }
+
+}
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
index ef32c7e9a..035b2c852 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java
@@ -1,5 +1,6 @@
 package seaweedfs.client;
 
+import com.google.common.base.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -24,6 +25,67 @@ public class FilerClient {
         this.filerGrpcClient = filerGrpcClient;
     }
 
+    public static String toFileId(FilerProto.FileId fid) {
+        if (fid == null) {
+            return null;
+        }
+        return String.format("%d,%x%08x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
+    }
+
+    public static FilerProto.FileId toFileIdObject(String fileIdStr) {
+        if (fileIdStr == null || fileIdStr.length() == 0) {
+            return null;
+        }
+        int commaIndex = fileIdStr.lastIndexOf(',');
+        String volumeIdStr = fileIdStr.substring(0, commaIndex);
+        String fileKeyStr = fileIdStr.substring(commaIndex + 1, fileIdStr.length() - 8);
+        String cookieStr = fileIdStr.substring(fileIdStr.length() - 8);
+
+        return FilerProto.FileId.newBuilder()
+                .setVolumeId(Integer.parseInt(volumeIdStr))
+                .setFileKey(Long.parseLong(fileKeyStr, 16))
+                .setCookie((int) Long.parseLong(cookieStr, 16))
+                .build();
+    }
+
+    public static List<FilerProto.FileChunk> beforeEntrySerialization(List<FilerProto.FileChunk> chunks) {
+        List<FilerProto.FileChunk> cleanedChunks = new ArrayList<>();
+        for (FilerProto.FileChunk chunk : chunks) {
+            FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+            chunkBuilder.clearFileId();
+            chunkBuilder.clearSourceFileId();
+            chunkBuilder.setFid(toFileIdObject(chunk.getFileId()));
+            FilerProto.FileId sourceFid = toFileIdObject(chunk.getSourceFileId());
+            if (sourceFid != null) {
+                chunkBuilder.setSourceFid(sourceFid);
+            }
+            cleanedChunks.add(chunkBuilder.build());
+        }
+        return cleanedChunks;
+    }
+
+    public static FilerProto.Entry afterEntryDeserialization(FilerProto.Entry entry) {
+        if (entry.getChunksList().size() <= 0) {
+            return entry;
+        }
+        String fileId = entry.getChunks(0).getFileId();
+        if (fileId != null && fileId.length() != 0) {
+            return entry;
+        }
+        FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
+        entryBuilder.clearChunks();
+        for (FilerProto.FileChunk chunk : entry.getChunksList()) {
+            FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
+            chunkBuilder.setFileId(toFileId(chunk.getFid()));
+            String sourceFileId = toFileId(chunk.getSourceFid());
+            if (sourceFileId != null) {
+                chunkBuilder.setSourceFileId(sourceFileId);
+            }
+            entryBuilder.addChunks(chunkBuilder);
+        }
+        return entryBuilder.build();
+    }
+
     public boolean mkdirs(String path, int mode) {
         String currentUser = System.getProperty("user.name");
         return mkdirs(path, mode, 0, 0, currentUser, new String[]{});
@@ -156,7 +218,7 @@ public class FilerClient {
         List<FilerProto.Entry> results = new ArrayList<FilerProto.Entry>();
         String lastFileName = "";
         for (int limit = Integer.MAX_VALUE; limit > 0; ) {
-            List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024);
+            List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024, false);
             if (t == null) {
                 break;
             }
@@ -173,17 +235,18 @@ public class FilerClient {
         return results;
     }
 
-    public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) {
+    public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) {
         Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder()
                 .setDirectory(path)
                 .setPrefix(entryPrefix)
                 .setStartFromFileName(lastEntryName)
+                .setInclusiveStartFrom(includeLastEntry)
                 .setLimit(limit)
                 .build());
         List<FilerProto.Entry> entries = new ArrayList<>();
         while (iter.hasNext()) {
             FilerProto.ListEntriesResponse resp = iter.next();
-            entries.add(fixEntryAfterReading(resp.getEntry()));
+            entries.add(afterEntryDeserialization(resp.getEntry()));
         }
         return entries;
     }
@@ -198,7 +261,7 @@ public class FilerClient {
             if (entry == null) {
                 return null;
             }
-            return fixEntryAfterReading(entry);
+            return afterEntryDeserialization(entry);
         } catch (Exception e) {
             if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) {
                 return null;
@@ -208,18 +271,22 @@ public class FilerClient {
         }
     }
 
     public boolean createEntry(String parent, FilerProto.Entry entry) {
         try {
-            filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
+            FilerProto.CreateEntryResponse createEntryResponse =
+                    filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder()
                     .setDirectory(parent)
                     .setEntry(entry)
                     .build());
+            if (Strings.isNullOrEmpty(createEntryResponse.getError())) {
+                return true;
+            }
+            LOG.warn("createEntry {}/{} error: {}", parent, entry.getName(), createEntryResponse.getError());
+            return false;
         } catch (Exception e) {
             LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
             return false;
         }
-        return true;
     }
 
     public boolean updateEntry(String parent, FilerProto.Entry entry) {
@@ -229,7 +296,7 @@ public class FilerClient {
                     .setEntry(entry)
                     .build());
         } catch (Exception e) {
-            LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e);
+            LOG.warn("updateEntry {}/{}: {}", parent, entry.getName(), e);
             return false;
         }
         return true;
@@ -266,24 +333,4 @@ public class FilerClient {
         return true;
     }
 
-    private FilerProto.Entry fixEntryAfterReading(FilerProto.Entry entry) {
-        if (entry.getChunksList().size() <= 0) {
-            return entry;
-        }
-        String fileId = entry.getChunks(0).getFileId();
-        if (fileId != null && fileId.length() != 0) {
-            return entry;
-        }
-        FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
-        entryBuilder.clearChunks();
-        for (FilerProto.FileChunk chunk : entry.getChunksList()) {
-            FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder();
-            FilerProto.FileId fid = chunk.getFid();
-            fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie());
-            chunkBuilder.setFileId(fileId);
-            entryBuilder.addChunks(chunkBuilder);
-        }
-        return entryBuilder.build();
-    }
-
 }
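The new static toFileId/toFileIdObject pair replaces the buggy "%d,%d%x" formatting that the removed fixEntryAfterReading used: the file key is now hex and the cookie is zero-padded to exactly eight hex digits, which is what makes the parse unambiguous. A standalone round trip (hypothetical values):

    import seaweedfs.client.FilerClient;
    import seaweedfs.client.FilerProto;

    public class FileIdRoundTrip {
        public static void main(String[] args) {
            FilerProto.FileId fid = FilerProto.FileId.newBuilder()
                    .setVolumeId(3).setFileKey(0x1L).setCookie(0x12345678).build();
            String s = FilerClient.toFileId(fid);            // "3,112345678"
            // the cookie is always the last 8 hex digits, the file key is whatever precedes it
            FilerProto.FileId back = FilerClient.toFileIdObject(s);
            System.out.println(s + " -> volume " + back.getVolumeId());
        }
    }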
diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
index 3f5d1e8e9..1a719f3c0 100644
--- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
+++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java
@@ -9,6 +9,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.net.ssl.SSLException;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.TimeUnit;
 
 public class FilerGrpcClient {
@@ -24,6 +26,7 @@ public class FilerGrpcClient {
         }
     }
 
+    public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>();
     private final ManagedChannel channel;
     private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub;
     private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub;
@@ -39,8 +42,10 @@ public class FilerGrpcClient {
 
     public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) {
         this(sslContext == null ?
-                ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() :
+                ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext()
+                        .maxInboundMessageSize(1024 * 1024 * 1024) :
                 NettyChannelBuilder.forAddress(host, grpcPort)
+                        .maxInboundMessageSize(1024 * 1024 * 1024)
                         .negotiationType(NegotiationType.TLS)
                         .sslContext(sslContext));
 
diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
index 248285dd3..4909094f5 100644
--- a/other/java/client/src/main/java/seaweedfs/client/Gzip.java
+++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java
@@ -18,14 +18,18 @@ public class Gzip {
         return compressed;
     }
 
-    public static byte[] decompress(byte[] compressed) throws IOException {
-        ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
-        GZIPInputStream gis = new GZIPInputStream(bis);
-        return readAll(gis);
+    public static byte[] decompress(byte[] compressed) {
+        try {
+            ByteArrayInputStream bis = new ByteArrayInputStream(compressed);
+            GZIPInputStream gis = new GZIPInputStream(bis);
+            return readAll(gis);
+        } catch (Exception e) {
+            return compressed;
+        }
     }
 
     private static byte[] readAll(InputStream input) throws IOException {
-        try( ByteArrayOutputStream output = new ByteArrayOutputStream()){
+        try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
             byte[] buffer = new byte[4096];
             int n;
             while (-1 != (n = input.read(buffer))) {
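Gzip.decompress no longer throws on malformed input; it hands the bytes back unchanged, so callers can feed it data that may or may not be compressed. A sketch of both cases — it assumes the existing Gzip.compress(byte[]) helper, which this hunk does not show:

    import seaweedfs.client.Gzip;
    import java.nio.charset.StandardCharsets;

    public class GzipDemo {
        public static void main(String[] args) throws Exception {
            byte[] plain = "hello".getBytes(StandardCharsets.UTF_8);
            byte[] roundTrip = Gzip.decompress(Gzip.compress(plain)); // back to "hello"
            byte[] passThrough = Gzip.decompress(plain);              // not gzip: returned as-is
            System.out.println(new String(roundTrip, StandardCharsets.UTF_8) + " / " + passThrough.length);
        }
    }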
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 7be39da53..ab2407dec 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -1,16 +1,16 @@
 package seaweedfs.client;
 
+import org.apache.http.Header;
+import org.apache.http.HeaderElement;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
+import org.apache.http.client.entity.GzipDecompressingEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
-import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.*;
 
@@ -18,12 +18,12 @@ public class SeaweedRead {
 
     private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class);
 
-    static ChunkCache chunkCache = new ChunkCache(1000);
+    static ChunkCache chunkCache = new ChunkCache(4);
 
     // returns bytesRead
     public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
                             final long position, final byte[] buffer, final int bufferOffset,
-                            final int bufferLength) throws IOException {
+                            final int bufferLength, final long fileSize) throws IOException {
 
         List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
 
@@ -40,67 +40,128 @@ public class SeaweedRead {
         //TODO parallel this
         long readCount = 0;
-        int startOffset = bufferOffset;
+        long startOffset = position;
         for (ChunkView chunkView : chunkViews) {
+
+            if (startOffset < chunkView.logicOffset) {
+                long gap = chunkView.logicOffset - startOffset;
+                LOG.debug("zero [{},{})", startOffset, startOffset + gap);
+                readCount += gap;
+                startOffset += gap;
+            }
+
             FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId));
-            if (locations.getLocationsCount() == 0) {
+            if (locations == null || locations.getLocationsCount() == 0) {
+                LOG.error("failed to locate {}", chunkView.fileId);
                 // log here!
                 return 0;
             }
 
-            int len = readChunkView(position, buffer, startOffset, chunkView, locations);
+            int len = readChunkView(startOffset, buffer, bufferOffset + readCount, chunkView, locations);
+
+            LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
 
             readCount += len;
             startOffset += len;
         }
 
+        long limit = Math.min(bufferOffset + bufferLength, fileSize);
+
+        if (startOffset < limit) {
+            long gap = limit - startOffset;
+            LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
+            readCount += gap;
+            startOffset += gap;
+        }
+
         return readCount;
     }
 
-    private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+    private static int readChunkView(long startOffset, byte[] buffer, long bufOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
 
         byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
 
         if (chunkData == null) {
             chunkData = doFetchFullChunkData(chunkView, locations);
+            chunkCache.setChunk(chunkView.fileId, chunkData);
         }
 
         int len = (int) chunkView.size;
-        LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} buffer.length:{} startOffset:{} len:{}",
-                chunkView.fileId, chunkData.length, chunkView.offset, buffer.length, startOffset, len);
-        System.arraycopy(chunkData, (int) chunkView.offset, buffer, startOffset, len);
-
-        chunkCache.setChunk(chunkView.fileId, chunkData);
+        LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}",
+                chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, bufOffset, bufOffset + len, buffer.length, startOffset);
+        System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int) bufOffset, len);
 
         return len;
     }
 
-    private static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+    public static byte[] doFetchFullChunkData(ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+
+        byte[] data = null;
+        IOException lastException = null;
+        for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
+            for (FilerProto.Location location : locations.getLocationsList()) {
+                String url = String.format("http://%s/%s", location.getUrl(), chunkView.fileId);
+                try {
+                    data = doFetchOneFullChunkData(chunkView, url);
+                    lastException = null;
+                    break;
+                } catch (IOException ioe) {
+                    LOG.debug("doFetchFullChunkData {} :{}", url, ioe);
+                    lastException = ioe;
+                }
+            }
+            if (data != null) {
+                break;
+            }
+            try {
+                Thread.sleep(waitTime);
+            } catch (InterruptedException e) {
+            }
+        }
+
+        if (lastException != null) {
+            throw lastException;
+        }
+
+        LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length);
+
+        return data;
+
+    }
+
+    public static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException {
 
-        HttpClient client = new DefaultHttpClient();
-        HttpGet request = new HttpGet(
-                String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId));
+        HttpGet request = new HttpGet(url);
 
-        request.setHeader(HttpHeaders.ACCEPT_ENCODING, "");
+        request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip");
 
         byte[] data = null;
 
+        CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
+
         try {
-            HttpResponse response = client.execute(request);
             HttpEntity entity = response.getEntity();
 
-            data = EntityUtils.toByteArray(entity);
+            Header contentEncodingHeader = entity.getContentEncoding();
 
-        } finally {
-            if (client instanceof Closeable) {
-                Closeable t = (Closeable) client;
-                t.close();
+            if (contentEncodingHeader != null) {
+                HeaderElement[] encodings = contentEncodingHeader.getElements();
+                for (int i = 0; i < encodings.length; i++) {
+                    if (encodings[i].getName().equalsIgnoreCase("gzip")) {
+                        entity = new GzipDecompressingEntity(entity);
+                        break;
+                    }
+                }
             }
-        }
 
-        if (chunkView.isGzipped) {
-            data = Gzip.decompress(data);
+            data = EntityUtils.toByteArray(entity);
+
+            EntityUtils.consume(entity);
+
+        } finally {
+            response.close();
+            request.releaseConnection();
         }
 
         if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) {
@@ -111,6 +172,12 @@ public class SeaweedRead {
             }
         }
 
+        if (chunkView.isCompressed) {
+            data = Gzip.decompress(data);
+        }
+
+        LOG.debug("doFetchOneFullChunkData url:{} chunkData.length:{}", url, data.length);
+
         return data;
     }
 
@@ -120,29 +187,40 @@ public class SeaweedRead {
         long stop = offset + size;
         for (VisibleInterval chunk : visibleIntervals) {
-            if (chunk.start <= offset && offset < chunk.stop && offset < stop) {
+            long chunkStart = Math.max(offset, chunk.start);
+            long chunkStop = Math.min(stop, chunk.stop);
+            if (chunkStart < chunkStop) {
                 boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop;
                 views.add(new ChunkView(
                         chunk.fileId,
-                        offset - chunk.start,
-                        Math.min(chunk.stop, stop) - offset,
-                        offset,
+                        chunkStart - chunk.start + chunk.chunkOffset,
+                        chunkStop - chunkStart,
+                        chunkStart,
                         isFullChunk,
                         chunk.cipherKey,
-                        chunk.isGzipped
+                        chunk.isCompressed
                 ));
-                offset = Math.min(chunk.stop, stop);
             }
         }
         return views;
     }
 
-    public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) {
+    public static List<VisibleInterval> nonOverlappingVisibleIntervals(
+            final FilerGrpcClient filerGrpcClient, List<FilerProto.FileChunk> chunkList) throws IOException {
+
+        chunkList = FileChunkManifest.resolveChunkManifest(filerGrpcClient, chunkList);
+
         FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]);
         Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() {
             @Override
             public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) {
-                return (int) (a.getMtime() - b.getMtime());
+                // if just a.getMtime() - b.getMtime(), it will overflow!
+                if (a.getMtime() < b.getMtime()) {
+                    return -1;
+                } else if (a.getMtime() > b.getMtime()) {
+                    return 1;
+                }
+                return 0;
             }
         });
 
@@ -163,9 +241,10 @@ public class SeaweedRead {
                 chunk.getOffset() + chunk.getSize(),
                 chunk.getFileId(),
                 chunk.getMtime(),
+                0,
                 true,
                 chunk.getCipherKey().toByteArray(),
-                chunk.getIsGzipped()
+                chunk.getIsCompressed()
         );
 
         // easy cases to speed up
@@ -185,9 +264,10 @@ public class SeaweedRead {
                         chunk.getOffset(),
                         v.fileId,
                         v.modifiedTime,
+                        v.chunkOffset,
                         false,
                         v.cipherKey,
-                        v.isGzipped
+                        v.isCompressed
                 ));
             }
             long chunkStop = chunk.getOffset() + chunk.getSize();
@@ -197,9 +277,10 @@ public class SeaweedRead {
                         v.stop,
                         v.fileId,
                         v.modifiedTime,
+                        v.chunkOffset + (chunkStop - v.start),
                        false,
                         v.cipherKey,
-                        v.isGzipped
+                        v.isCompressed
                 ));
             }
             if (chunkStop <= v.start || v.stop <= chunk.getOffset()) {
@@ -229,6 +310,10 @@ public class SeaweedRead {
         return fileId;
     }
 
+    public static long fileSize(FilerProto.Entry entry) {
+        return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize());
+    }
+
     public static long totalSize(List<FilerProto.FileChunk> chunksList) {
         long size = 0;
         for (FilerProto.FileChunk chunk : chunksList) {
@@ -245,18 +330,20 @@ public class SeaweedRead {
         public final long stop;
         public final long modifiedTime;
         public final String fileId;
+        public final long chunkOffset;
         public final boolean isFullChunk;
         public final byte[] cipherKey;
-        public final boolean isGzipped;
+        public final boolean isCompressed;
 
-        public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
+        public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
             this.start = start;
             this.stop = stop;
             this.modifiedTime = modifiedTime;
             this.fileId = fileId;
+            this.chunkOffset = chunkOffset;
             this.isFullChunk = isFullChunk;
             this.cipherKey = cipherKey;
-            this.isGzipped = isGzipped;
+            this.isCompressed = isCompressed;
         }
 
         @Override
@@ -268,7 +355,7 @@ public class SeaweedRead {
                     ", fileId='" + fileId + '\'' +
                     ", isFullChunk=" + isFullChunk +
                     ", cipherKey=" + Arrays.toString(cipherKey) +
-                    ", isGzipped=" + isGzipped +
+                    ", isCompressed=" + isCompressed +
                     '}';
         }
     }
 
@@ -280,16 +367,16 @@ public class SeaweedRead {
         public final long logicOffset;
         public final boolean isFullChunk;
         public final byte[] cipherKey;
-        public final boolean isGzipped;
+        public final boolean isCompressed;
 
-        public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isGzipped) {
+        public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) {
             this.fileId = fileId;
             this.offset = offset;
             this.size = size;
             this.logicOffset = logicOffset;
             this.isFullChunk = isFullChunk;
             this.cipherKey = cipherKey;
-            this.isGzipped = isGzipped;
+            this.isCompressed = isCompressed;
         }
 
         @Override
@@ -301,7 +388,7 @@ public class SeaweedRead {
                     ", logicOffset=" + logicOffset +
                     ", isFullChunk=" + isFullChunk +
                     ", cipherKey=" + Arrays.toString(cipherKey) +
-                    ", isGzipped=" + isGzipped +
+                    ", isCompressed=" + isCompressed +
                     '}';
         }
     }
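The retry loop added to doFetchFullChunkData tries every replica once per round and then sleeps, with the wait growing by half each round until it would reach ten seconds. Reproducing just the schedule as a standalone sketch:

    public class BackoffSchedule {
        public static void main(String[] args) {
            // prints 1000, 1500, 2250, 3375, 5062, 7593 (ms), then the loop gives up
            for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
                System.out.println(waitTime);
            }
        }
    }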
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
new file mode 100644
index 000000000..c465d935f
--- /dev/null
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java
@@ -0,0 +1,30 @@
+package seaweedfs.client;
+
+import org.apache.http.impl.DefaultConnectionReuseStrategy;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+
+public class SeaweedUtil {
+
+    static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
+    static CloseableHttpClient httpClient;
+
+    static {
+        // Increase max total connection to 200
+        cm.setMaxTotal(200);
+        // Increase default max connection per route to 20
+        cm.setDefaultMaxPerRoute(20);
+
+        httpClient = HttpClientBuilder.create()
+                .setConnectionManager(cm)
+                .setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE)
+                .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
+                .build();
+    }
+
+    public static CloseableHttpClient getClosableHttpClient() {
+        return httpClient;
+    }
+}
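SeaweedUtil replaces the per-request DefaultHttpClient with one shared, pooled client (200 connections total, 20 per route, with keep-alive). A hedged sketch of the pattern the read and write paths now follow — the URL is a placeholder:

    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.util.EntityUtils;
    import seaweedfs.client.SeaweedUtil;

    public class PooledClientDemo {
        public static void main(String[] args) throws Exception {
            HttpGet request = new HttpGet("http://localhost:8080/3,112345678"); // placeholder fid URL
            CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request);
            try {
                byte[] body = EntityUtils.toByteArray(response.getEntity());
                System.out.println(body.length + " bytes");
            } finally {
                response.close();           // hands the connection back to the pool
                request.releaseConnection();
            }
        }
    }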
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
index dc6203e52..b8fd3e299 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java
@@ -1,35 +1,54 @@
 package seaweedfs.client;
 
 import com.google.protobuf.ByteString;
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.mime.HttpMultipartMode;
 import org.apache.http.entity.mime.MultipartEntityBuilder;
-import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
-import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.security.SecureRandom;
+import java.util.List;
 
 public class SeaweedWrite {
 
-    private static SecureRandom random = new SecureRandom();
+    private static final Logger LOG = LoggerFactory.getLogger(SeaweedWrite.class);
+
+    private static final SecureRandom random = new SecureRandom();
 
     public static void writeData(FilerProto.Entry.Builder entry,
                                  final String replication,
                                  final FilerGrpcClient filerGrpcClient,
                                  final long offset,
                                  final byte[] bytes,
-                                 final long bytesOffset, final long bytesLength) throws IOException {
+                                 final long bytesOffset, final long bytesLength,
+                                 final String path) throws IOException {
+        FilerProto.FileChunk.Builder chunkBuilder = writeChunk(
+                replication, filerGrpcClient, offset, bytes, bytesOffset, bytesLength, path);
+        synchronized (entry) {
+            entry.addChunks(chunkBuilder);
+        }
+    }
+
+    public static FilerProto.FileChunk.Builder writeChunk(final String replication,
+                                                          final FilerGrpcClient filerGrpcClient,
+                                                          final long offset,
+                                                          final byte[] bytes,
+                                                          final long bytesOffset,
+                                                          final long bytesLength,
+                                                          final String path) throws IOException {
         FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume(
                 FilerProto.AssignVolumeRequest.newBuilder()
                         .setCollection(filerGrpcClient.getCollection())
                         .setReplication(replication == null ? filerGrpcClient.getReplication() : replication)
                         .setDataCenter("")
                         .setTtlSec(0)
+                        .setPath(path)
                         .build());
         String fileId = response.getFileId();
         String url = response.getUrl();
@@ -45,28 +64,32 @@ public class SeaweedWrite {
 
         String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey);
 
-        // cache fileId ~ bytes
-        SeaweedRead.chunkCache.setChunk(fileId, bytes);
+        LOG.debug("write file chunk {} size {}", targetUrl, bytesLength);
 
-        entry.addChunks(FilerProto.FileChunk.newBuilder()
+        return FilerProto.FileChunk.newBuilder()
                 .setFileId(fileId)
                 .setOffset(offset)
                 .setSize(bytesLength)
                 .setMtime(System.currentTimeMillis() / 10000L)
                 .setETag(etag)
-                .setCipherKey(cipherKeyString)
-        );
-
+                .setCipherKey(cipherKeyString);
     }
 
     public static void writeMeta(final FilerGrpcClient filerGrpcClient,
-                                 final String parentDirectory, final FilerProto.Entry.Builder entry) {
-        filerGrpcClient.getBlockingStub().createEntry(
-                FilerProto.CreateEntryRequest.newBuilder()
-                        .setDirectory(parentDirectory)
-                        .setEntry(entry)
-                        .build()
-        );
+                                 final String parentDirectory,
+                                 final FilerProto.Entry.Builder entry) throws IOException {
+
+        synchronized (entry) {
+            List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerGrpcClient, entry.getChunksList(), parentDirectory);
+            entry.clearChunks();
+            entry.addAllChunks(chunks);
+            filerGrpcClient.getBlockingStub().createEntry(
+                    FilerProto.CreateEntryRequest.newBuilder()
+                            .setDirectory(parentDirectory)
+                            .setEntry(entry)
+                            .build()
+            );
+        }
     }
 
     private static String multipartUpload(String targetUrl,
@@ -75,8 +98,6 @@ public class SeaweedWrite {
                                           final long bytesOffset, final long bytesLength,
                                           byte[] cipherKey) throws IOException {
 
-        HttpClient client = new DefaultHttpClient();
-
         InputStream inputStream = null;
         if (cipherKey == null || cipherKey.length == 0) {
             inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength);
@@ -99,8 +120,9 @@ public class SeaweedWrite {
                 .addBinaryBody("upload", inputStream)
                 .build());
 
+        CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(post);
+
         try {
-            HttpResponse response = client.execute(post);
 
             String etag = response.getLastHeader("ETag").getValue();
 
@@ -108,12 +130,12 @@ public class SeaweedWrite {
                 etag = etag.substring(1, etag.length() - 1);
             }
 
+            EntityUtils.consume(response.getEntity());
+
             return etag;
         } finally {
-            if (client instanceof Closeable) {
-                Closeable t = (Closeable) client;
-                t.close();
-            }
+            response.close();
+            post.releaseConnection();
         }
     }
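Putting the new write path together: writeData appends a chunk (and now carries the file path so the filer can place the volume), and writeMeta manifestizes oversized chunk lists before createEntry. A rough sketch, assuming a reachable filer and a two-argument FilerGrpcClient(host, grpcPort) constructor; host, paths, and the replication string are placeholders:

    import seaweedfs.client.FilerGrpcClient;
    import seaweedfs.client.FilerProto;
    import seaweedfs.client.SeaweedWrite;

    public class WriteSketch {
        public static void main(String[] args) throws Exception {
            FilerGrpcClient filer = new FilerGrpcClient("localhost", 18888); // placeholder filer
            FilerProto.Entry.Builder entry = FilerProto.Entry.newBuilder().setName("hello.txt");
            byte[] bytes = "hello, world".getBytes();
            SeaweedWrite.writeData(entry, "000", filer, 0, bytes, 0, bytes.length, "/demo/hello.txt");
            SeaweedWrite.writeMeta(filer, "/demo", entry); // manifestize + createEntry
        }
    }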
diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto
index 1fc8ef63d..11c29e6ec 100644
--- a/other/java/client/src/main/proto/filer.proto
+++ b/other/java/client/src/main/proto/filer.proto
@@ -2,6 +2,7 @@ syntax = "proto3";
 
 package filer_pb;
 
+option go_package = "github.com/chrislusf/seaweedfs/weed/pb/filer_pb";
 option java_package = "seaweedfs.client";
 option java_outer_classname = "FilerProto";
 
@@ -36,6 +37,9 @@ service SeaweedFiler {
     rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) {
     }
 
+    rpc CollectionList (CollectionListRequest) returns (CollectionListResponse) {
+    }
+
     rpc DeleteCollection (DeleteCollectionRequest) returns (DeleteCollectionResponse) {
     }
 
@@ -48,12 +52,21 @@ service SeaweedFiler {
     rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
     }
 
+    rpc SubscribeLocalMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) {
+    }
+
     rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) {
     }
 
     rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) {
     }
 
+    rpc KvGet (KvGetRequest) returns (KvGetResponse) {
+    }
+
+    rpc KvPut (KvPutRequest) returns (KvPutResponse) {
+    }
+
 }
 
 //////////////////////////////////////////////////
 
@@ -85,6 +98,8 @@ message Entry {
     repeated FileChunk chunks = 3;
     FuseAttributes attributes = 4;
     map<string, bytes> extended = 5;
+    bytes hard_link_id = 7;
+    int32 hard_link_counter = 8; // only exists in hard link meta data
 }
 
 message FullEntry {
@@ -97,6 +112,8 @@ message EventNotification {
     Entry new_entry = 2;
     bool delete_chunks = 3;
     string new_parent_path = 4;
+    bool is_from_other_cluster = 5;
+    repeated int32 signatures = 6;
 }
 
 message FileChunk {
@@ -109,7 +126,12 @@ message FileChunk {
     FileId fid = 7;
     FileId source_fid = 8;
     bytes cipher_key = 9;
-    bool is_gzipped = 10;
+    bool is_compressed = 10;
+    bool is_chunk_manifest = 11; // content is a list of FileChunks
+}
+
+message FileChunkManifest {
+    repeated FileChunk chunks = 1;
 }
 
 message FileId {
@@ -139,6 +161,8 @@ message CreateEntryRequest {
     string directory = 1;
     Entry entry = 2;
     bool o_excl = 3;
+    bool is_from_other_cluster = 4;
+    repeated int32 signatures = 5;
 }
 
 message CreateEntryResponse {
@@ -148,6 +172,8 @@ message CreateEntryResponse {
 message UpdateEntryRequest {
     string directory = 1;
     Entry entry = 2;
+    bool is_from_other_cluster = 3;
+    repeated int32 signatures = 4;
 }
 message UpdateEntryResponse {
 }
@@ -167,6 +193,8 @@ message DeleteEntryRequest {
     bool is_delete_data = 4;
     bool is_recursive = 5;
     bool ignore_recursive_error = 6;
+    bool is_from_other_cluster = 7;
+    repeated int32 signatures = 8;
 }
 
 message DeleteEntryResponse {
@@ -189,7 +217,8 @@ message AssignVolumeRequest {
     string replication = 3;
     int32 ttl_sec = 4;
     string data_center = 5;
-    string parent_path = 6;
+    string path = 6;
+    string rack = 7;
 }
 
 message AssignVolumeResponse {
@@ -219,6 +248,16 @@ message LookupVolumeResponse {
     map<string, Locations> locations_map = 1;
 }
 
+message Collection {
+    string name = 1;
+}
+message CollectionListRequest {
+    bool include_normal_volumes = 1;
+    bool include_ec_volumes = 2;
+}
+message CollectionListResponse {
+    repeated Collection collections = 1;
+}
 message DeleteCollectionRequest {
     string collection = 1;
 }
@@ -249,12 +288,16 @@ message GetFilerConfigurationResponse {
     uint32 max_mb = 4;
     string dir_buckets = 5;
     bool cipher = 7;
+    int32 signature = 8;
+    string metrics_address = 9;
+    int32 metrics_interval_sec = 10;
 }
 
 message SubscribeMetadataRequest {
     string client_name = 1;
     string path_prefix = 2;
     int64 since_ns = 3;
+    int32 signature = 4;
 }
 message SubscribeMetadataResponse {
     string directory = 1;
@@ -289,3 +332,19 @@ message LocateBrokerResponse {
     }
     repeated Resource resources = 2;
 }
+
+// Key-Value operations
+message KvGetRequest {
+    bytes key = 1;
+}
+message KvGetResponse {
+    bytes value = 1;
+    string error = 2;
+}
+message KvPutRequest {
+    bytes key = 1;
+    bytes value = 2;
+}
+message KvPutResponse {
+    string error = 1;
+}
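The new FileChunkManifest message is nothing more than a repeated FileChunk, which is exactly how FileChunkManifest.java writes and re-reads it. A standalone round trip over the generated protobuf classes (sketch; the chunk values are made up):

    import seaweedfs.client.FilerProto;

    public class ManifestRoundTrip {
        public static void main(String[] args) throws Exception {
            FilerProto.FileChunk chunk = FilerProto.FileChunk.newBuilder()
                    .setFid(FilerProto.FileId.newBuilder().setVolumeId(3).setFileKey(1).setCookie(0x12345678))
                    .setOffset(0).setSize(1024).setIsCompressed(true).build();
            byte[] data = FilerProto.FileChunkManifest.newBuilder().addChunks(chunk).build().toByteArray();
            // mergeFrom(byte[]) throws InvalidProtocolBufferException, an IOException subclass
            FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build();
            System.out.println(m.getChunksCount()); // 1
        }
    }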
diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
index ccfcdb117..44b833c90 100644
--- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
+++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java
@@ -3,13 +3,14 @@ package seaweedfs.client;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 public class SeaweedReadTest {
 
     @Test
-    public void testNonOverlappingVisibleIntervals() {
+    public void testNonOverlappingVisibleIntervals() throws IOException {
         List<FilerProto.FileChunk> chunks = new ArrayList<>();
         chunks.add(FilerProto.FileChunk.newBuilder()
                 .setFileId("aaa")
@@ -24,7 +25,7 @@ public class SeaweedReadTest {
                 .setMtime(2000)
                 .build());
 
-        List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks);
+        List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks);
         for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) {
             System.out.println("visible:" + visibleInterval);
         }
diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml
index bef448f3f..229fa673c 100644
--- a/other/java/hdfs2/dependency-reduced-pom.xml
+++ b/other/java/hdfs2/dependency-reduced-pom.xml
@@ -15,8 +15,8 @@
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>7</source>
- <target>7</target>
+ <source>8</source>
+ <target>8</target>
</configuration>
</plugin>
<plugin>
@@ -120,6 +120,180 @@ </plugin>
</plugins>
</build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>2.9.2</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.9.2</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-cli</artifactId>
+ <groupId>commons-cli</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-math3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>xmlenc</artifactId>
+ <groupId>xmlenc</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-net</artifactId>
+ <groupId>commons-net</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-collections</artifactId>
+ <groupId>commons-collections</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-util</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-sslengine</artifactId>
+ <groupId>org.mortbay.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsp-api</artifactId>
+ <groupId>javax.servlet.jsp</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-core</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-json</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-server</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jets3t</artifactId>
+ <groupId>net.java.dev.jets3t</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang</artifactId>
+ <groupId>commons-lang</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-configuration</artifactId>
+ <groupId>commons-configuration</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-core-asl</artifactId>
+ <groupId>org.codehaus.jackson</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <groupId>org.codehaus.jackson</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>avro</artifactId>
+ <groupId>org.apache.avro</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-auth</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsch</artifactId>
+ <groupId>com.jcraft</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-client</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-recipes</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>htrace-core4</artifactId>
+ <groupId>org.apache.htrace</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-compress</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>stax2-api</artifactId>
+ <groupId>org.codehaus.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>woodstox-core</artifactId>
+ <groupId>com.fasterxml.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
@@ -127,7 +301,7 @@ </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.2.8</seaweedfs.client.version>
+ <seaweedfs.client.version>1.5.2</seaweedfs.client.version>
<hadoop.version>2.9.2</hadoop.version>
</properties>
</project>
diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml
index f3086fab8..1b73b2811 100644
--- a/other/java/hdfs2/pom.xml
+++ b/other/java/hdfs2/pom.xml
@@ -5,7 +5,7 @@
     <modelVersion>4.0.0</modelVersion>
 
     <properties>
-        <seaweedfs.client.version>1.2.8</seaweedfs.client.version>
+        <seaweedfs.client.version>1.5.2</seaweedfs.client.version>
         <hadoop.version>2.9.2</hadoop.version>
     </properties>
 
@@ -31,8 +31,8 @@
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-compiler-plugin</artifactId>
             <configuration>
-                <source>7</source>
-                <target>7</target>
+                <source>8</source>
+                <target>8</target>
             </configuration>
         </plugin>
         <plugin>
@@ -147,6 +147,7 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
             <version>${hadoop.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.github.chrislusf</groupId>
@@ -157,6 +158,7 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${hadoop.version}</version>
+            <scope>provided</scope>
         </dependency>
     </dependencies>
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java
deleted file mode 100644
index 926d0b83b..000000000
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import java.util.concurrent.CountDownLatch;
-
-class ReadBuffer {
-
-    private SeaweedInputStream stream;
-    private long offset;                   // offset within the file for the buffer
-    private int length;                    // actual length, set after the buffer is filles
-    private int requestedLength;           // requested length of the read
-    private byte[] buffer;                 // the buffer itself
-    private int bufferindex = -1;          // index in the buffers array in Buffer manager
-    private ReadBufferStatus status;       // status of the buffer
-    private CountDownLatch latch = null;   // signaled when the buffer is done reading, so any client
-    // waiting on this buffer gets unblocked
-
-    // fields to help with eviction logic
-    private long timeStamp = 0;  // tick at which buffer became available to read
-    private boolean isFirstByteConsumed = false;
-    private boolean isLastByteConsumed = false;
-    private boolean isAnyByteConsumed = false;
-
-    public SeaweedInputStream getStream() {
-        return stream;
-    }
-
-    public void setStream(SeaweedInputStream stream) {
-        this.stream = stream;
-    }
-
-    public long getOffset() {
-        return offset;
-    }
-
-    public void setOffset(long offset) {
-        this.offset = offset;
-    }
-
-    public int getLength() {
-        return length;
-    }
-
-    public void setLength(int length) {
-        this.length = length;
-    }
-
-    public int getRequestedLength() {
-        return requestedLength;
-    }
-
-    public void setRequestedLength(int requestedLength) {
-        this.requestedLength = requestedLength;
-    }
-
-    public byte[] getBuffer() {
-        return buffer;
-    }
-
-    public void setBuffer(byte[] buffer) {
-        this.buffer = buffer;
-    }
-
-    public int getBufferindex() {
-        return bufferindex;
-    }
-
-    public void setBufferindex(int bufferindex) {
-        this.bufferindex = bufferindex;
-    }
-
-    public ReadBufferStatus getStatus() {
-        return status;
-    }
-
-    public void setStatus(ReadBufferStatus status) {
-        this.status = status;
-    }
-
-    public CountDownLatch getLatch() {
-        return latch;
-    }
-
-    public void setLatch(CountDownLatch latch) {
-        this.latch = latch;
-    }
-
-    public long getTimeStamp() {
-        return timeStamp;
-    }
-
-    public void setTimeStamp(long timeStamp) {
-        this.timeStamp = timeStamp;
-    }
-
-    public boolean isFirstByteConsumed() {
-        return isFirstByteConsumed;
-    }
-
-    public void setFirstByteConsumed(boolean isFirstByteConsumed) {
-        this.isFirstByteConsumed = isFirstByteConsumed;
-    }
-
-    public boolean isLastByteConsumed() {
-        return isLastByteConsumed;
-    }
-
-    public void setLastByteConsumed(boolean isLastByteConsumed) {
-        this.isLastByteConsumed = isLastByteConsumed;
-    }
-
-    public boolean isAnyByteConsumed() {
-        return isAnyByteConsumed;
-    }
-
-    public void setAnyByteConsumed(boolean isAnyByteConsumed) {
-        this.isAnyByteConsumed = isAnyByteConsumed;
-    }
-
-}
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package seaweed.hdfs; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Stack; -import java.util.concurrent.CountDownLatch; - -/** - * The Read Buffer Manager for Rest AbfsClient. - */ -final class ReadBufferManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class); - - private static final int NUM_BUFFERS = 16; - private static final int BLOCK_SIZE = 4 * 1024 * 1024; - private static final int NUM_THREADS = 8; - private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold - - private Thread[] threads = new Thread[NUM_THREADS]; - private byte[][] buffers; // array of byte[] buffers, to hold the data that is read - private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available - - private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet - private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads - private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading - private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block - - static { - BUFFER_MANAGER = new ReadBufferManager(); - BUFFER_MANAGER.init(); - } - - static ReadBufferManager getBufferManager() { - return BUFFER_MANAGER; - } - - private void init() { - buffers = new byte[NUM_BUFFERS][]; - for (int i = 0; i < NUM_BUFFERS; i++) { - buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC - freeList.add(i); - } - for (int i = 0; i < NUM_THREADS; i++) { - Thread t = new Thread(new ReadBufferWorker(i)); - t.setDaemon(true); - threads[i] = t; - t.setName("SeaweedFS-prefetch-" + i); - t.start(); - } - ReadBufferWorker.UNLEASH_WORKERS.countDown(); - } - - // hide instance constructor - private ReadBufferManager() { - } - - - /* - * - * SeaweedInputStream-facing methods - * - */ - - - /** - * {@link SeaweedInputStream} calls this method to queue read-aheads. 
- * - * @param stream The {@link SeaweedInputStream} for which to do the read-ahead - * @param requestedOffset The offset in the file which should be read - * @param requestedLength The length to read - */ - void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", - stream.getPath(), requestedOffset, requestedLength); - } - ReadBuffer buffer; - synchronized (this) { - if (isAlreadyQueued(stream, requestedOffset)) { - return; // already queued, do not queue again - } - if (freeList.isEmpty() && !tryEvict()) { - return; // no buffers available, cannot queue anything - } - - buffer = new ReadBuffer(); - buffer.setStream(stream); - buffer.setOffset(requestedOffset); - buffer.setLength(0); - buffer.setRequestedLength(requestedLength); - buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); - buffer.setLatch(new CountDownLatch(1)); - - Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already - - buffer.setBuffer(buffers[bufferIndex]); - buffer.setBufferindex(bufferIndex); - readAheadQueue.add(buffer); - notifyAll(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", - stream.getPath(), requestedOffset, buffer.getBufferindex()); - } - } - - - /** - * {@link SeaweedInputStream} calls this method to read any bytes already available in a buffer (thereby saving a - * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading - * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead - * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because - * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own - * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time). - * - * @param stream the file to read bytes for - * @param position the offset in the file to do a read for - * @param length the length to read - * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
- * @return the number of bytes read - */ - int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) { - // not synchronized, so have to be careful with locking - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("getBlock for file {} position {} thread {}", - stream.getPath(), position, Thread.currentThread().getName()); - } - - waitForProcess(stream, position); - - int bytesRead = 0; - synchronized (this) { - bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); - } - if (bytesRead > 0) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done read from Cache for {} position {} length {}", - stream.getPath(), position, bytesRead); - } - return bytesRead; - } - - // otherwise, just say we got nothing - calling thread can do its own read - return 0; - } - - /* - * - * Internal methods - * - */ - - private void waitForProcess(final SeaweedInputStream stream, final long position) { - ReadBuffer readBuf; - synchronized (this) { - clearFromReadAheadQueue(stream, position); - readBuf = getFromList(inProgressList, stream, position); - } - if (readBuf != null) { // if in in-progress queue, then block for it - try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", - stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); - } - readBuf.getLatch().await(); // blocking wait on the caller stream's thread - // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread - // is done processing it (in doneReading). There, the latch is set after removing the buffer from - // inProgressList. So this latch is safe to be outside the synchronized block. - // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock - // while waiting, so no one will be able to change any state. If this becomes more complex in the future, - // then the latch can be removed and replaced with wait/notify whenever inProgressList is touched. - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("latch done for file {} buffer idx {} length {}", - stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); - } - } - } - - /** - * If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to free list. - * The objective is to find just one buffer - there is no advantage to evicting more than one. - * - * @return whether the eviction succeeded - i.e., were we able to free up one buffer - */ - private synchronized boolean tryEvict() { - ReadBuffer nodeToEvict = null; - if (completedReadList.size() <= 0) { - return false; // there are no evict-able buffers - } - - // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) - for (ReadBuffer buf : completedReadList) { - if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { - nodeToEvict = buf; - break; - } - } - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try buffers where any bytes have been consumed (may be a bad idea?
have to experiment and see) - for (ReadBuffer buf : completedReadList) { - if (buf.isAnyByteConsumed()) { - nodeToEvict = buf; - break; - } - } - - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try any old nodes that have not been consumed - long earliestBirthday = Long.MAX_VALUE; - for (ReadBuffer buf : completedReadList) { - if (buf.getTimeStamp() < earliestBirthday) { - nodeToEvict = buf; - earliestBirthday = buf.getTimeStamp(); - } - } - if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) { - return evict(nodeToEvict); - } - - // nothing can be evicted - return false; - } - - private boolean evict(final ReadBuffer buf) { - freeList.push(buf.getBufferindex()); - completedReadList.remove(buf); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", - buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); - } - return true; - } - - private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) { - // returns true if any part of the buffer is already queued - return (isInList(readAheadQueue, stream, requestedOffset) - || isInList(inProgressList, stream, requestedOffset) - || isInList(completedReadList, stream, requestedOffset)); - } - - private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) { - return (getFromList(list, stream, requestedOffset) != null); - } - - private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) { - for (ReadBuffer buffer : list) { - if (buffer.getStream() == stream) { - if (buffer.getStatus() == ReadBufferStatus.AVAILABLE - && requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getLength()) { - return buffer; - } else if (requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { - return buffer; - } - } - } - return null; - } - - private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) { - ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); - if (buffer != null) { - readAheadQueue.remove(buffer); - notifyAll(); // lock is held in calling method - freeList.push(buffer.getBufferindex()); - } - } - - private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length, - final byte[] buffer) { - ReadBuffer buf = getFromList(completedReadList, stream, position); - if (buf == null || position >= buf.getOffset() + buf.getLength()) { - return 0; - } - int cursor = (int) (position - buf.getOffset()); - int availableLengthInBuffer = buf.getLength() - cursor; - int lengthToCopy = Math.min(length, availableLengthInBuffer); - System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); - if (cursor == 0) { - buf.setFirstByteConsumed(true); - } - if (cursor + lengthToCopy == buf.getLength()) { - buf.setLastByteConsumed(true); - } - buf.setAnyByteConsumed(true); - return lengthToCopy; - } - - /* - * - * ReadBufferWorker-thread-facing methods - * - */ - - /** - * ReadBufferWorker thread calls this to get the next buffer that it should work on. 
- * - * @return {@link ReadBuffer} - * @throws InterruptedException if thread is interrupted - */ - ReadBuffer getNextBlockToRead() throws InterruptedException { - ReadBuffer buffer = null; - synchronized (this) { - //buffer = readAheadQueue.take(); // blocking method - while (readAheadQueue.size() == 0) { - wait(); - } - buffer = readAheadQueue.remove(); - notifyAll(); - if (buffer == null) { - return null; // should never happen - } - buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); - inProgressList.add(buffer); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker picked file {} for offset {}", - buffer.getStream().getPath(), buffer.getOffset()); - } - return buffer; - } - - /** - * ReadBufferWorker thread calls this method to post completion. - * - * @param buffer the buffer whose read was completed - * @param result the {@link ReadBufferStatus} after the read operation in the worker thread - * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read - */ - void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}", - buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead); - } - synchronized (this) { - inProgressList.remove(buffer); - if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { - buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setTimeStamp(currentTimeMillis()); - buffer.setLength(bytesActuallyRead); - completedReadList.add(buffer); - } else { - freeList.push(buffer.getBufferindex()); - // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and be eligible for GC - } - } - //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results - buffer.getLatch().countDown(); // wake up waiting threads (if any) - } - - /** - * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). - * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization), - * making it unsuitable for measuring time intervals. nanoTime is strictly monotonically increasing per CPU core. - * Note: it is not monotonic across sockets, and even within a CPU, it's only the - * more recent parts which share a clock across all cores. - * - * @return current time in milliseconds - */ - private long currentTimeMillis() { - return System.nanoTime() / 1000 / 1000; - } -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java deleted file mode 100644 index 6ffbc4644..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -import java.util.concurrent.CountDownLatch; - -class ReadBufferWorker implements Runnable { - - protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); - private int id; - - ReadBufferWorker(final int id) { - this.id = id; - } - - /** - * Returns the ID of the ReadBufferWorker. - */ - public int getId() { - return this.id; - } - - /** - * Waits until a buffer becomes available in ReadAheadQueue. - * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager. - * Rinse and repeat. Forever. - */ - public void run() { - try { - UNLEASH_WORKERS.await(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - ReadBuffer buffer; - while (true) { - try { - buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - return; - } - if (buffer != null) { - try { - // do the actual read, from the file. - int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength()); - bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager - } catch (Exception ex) { - bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); - } - } - } - } -} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java index d63674977..e021401aa 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferStatus.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java @@ -18,12 +18,18 @@ package seaweed.hdfs; -/** - * The ReadBufferStatus for Rest AbfsClient - */ -public enum ReadBufferStatus { - NOT_AVAILABLE, // buffers sitting in readaheadqueue have this status - READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList - AVAILABLE, // data is available in buffer. It should be in completedList - READ_FAILED // read completed, but failed.
+import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DelegateToFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +public class SeaweedAbstractFileSystem extends DelegateToFileSystem { + + SeaweedAbstractFileSystem(final URI uri, final Configuration conf) + throws IOException, URISyntaxException { + super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false); + } + } diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index d471d8440..ca67c3874 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -5,31 +5,29 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import seaweedfs.client.FilerProto; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.util.EnumSet; import java.util.List; import java.util.Map; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; +public class SeaweedFileSystem extends FileSystem { -public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { - - public static final int FS_SEAWEED_DEFAULT_PORT = 8888; public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; + public static final int FS_SEAWEED_DEFAULT_PORT = 8888; + public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size"; + public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); - private static int BUFFER_SIZE = 16 * 1024 * 1024; private URI uri; private Path workingDirectory = new Path("/"); @@ -60,12 +58,10 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { port = (port == -1) ? 
FS_SEAWEED_DEFAULT_PORT : port; conf.setInt(FS_SEAWEED_FILER_PORT, port); - conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE); - setConf(conf); this.uri = uri; - seaweedFileSystemStore = new SeaweedFileSystemStore(host, port); + seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, conf); } @@ -77,8 +73,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { path = qualify(path); try { - InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(inputStream); + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); + FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics); + return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; @@ -95,7 +92,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { try { String replicaPlacement = String.format("%03d", replication - 1); - OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement); + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); + OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); @@ -105,8 +103,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { /** * {@inheritDoc} + * * @throws FileNotFoundException if the parent directory is not present -or - * is not a directory. + * is not a directory. 
*/ @Override public FSDataOutputStream createNonRecursive(Path path, @@ -123,9 +122,10 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { throw new FileAlreadyExistsException("Not a directory: " + parent); } } + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); return create(path, permission, flags.contains(CreateFlag.OVERWRITE), bufferSize, - replication, blockSize, progress); + replication, seaweedBufferSize, progress); } @Override @@ -135,7 +135,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { path = qualify(path); try { - OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, ""); + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); + OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, ""); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex); @@ -144,7 +145,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } @Override - public boolean rename(Path src, Path dst) { + public boolean rename(Path src, Path dst) throws IOException { LOG.debug("rename path: {} => {}", src, dst); @@ -155,12 +156,13 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { if (src.equals(dst)) { return true; } - FileStatus dstFileStatus = getFileStatus(dst); + FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst); - String sourceFileName = src.getName(); Path adjustedDst = dst; - if (dstFileStatus != null) { + if (entry != null) { + FileStatus dstFileStatus = getFileStatus(dst); + String sourceFileName = src.getName(); if (!dstFileStatus.isDirectory()) { return false; } @@ -175,18 +177,20 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } @Override - public boolean delete(Path path, boolean recursive) { + public boolean delete(Path path, boolean recursive) throws IOException { LOG.debug("delete path: {} recursive:{}", path, recursive); path = qualify(path); - FileStatus fileStatus = getFileStatus(path); + FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path); - if (fileStatus == null) { + if (entry == null) { return true; } + FileStatus fileStatus = getFileStatus(path); + return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive); } @@ -222,9 +226,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { path = qualify(path); - FileStatus fileStatus = getFileStatus(path); + FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path); - if (fileStatus == null) { + if (entry == null) { UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); return seaweedFileSystemStore.createDirectory(path, currentUser, @@ -233,6 +237,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } + FileStatus fileStatus = getFileStatus(path); + if (fileStatus.isDirectory()) { return true; } else { @@ -241,7 +247,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } @Override - public FileStatus getFileStatus(Path path) { + public FileStatus getFileStatus(Path path) throws IOException { LOG.debug("getFileStatus path: {}", path); @@ -335,9 +341,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { @Override public void createSymlink(final Path target, final Path link, 
- final boolean createParent) throws AccessControlException, - FileAlreadyExistsException, FileNotFoundException, - ParentNotDirectoryException, UnsupportedFileSystemException, + final boolean createParent) throws IOException { // Supporting filesystems should override this method throw new UnsupportedOperationException( diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 9617a38be..23556a578 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -1,5 +1,7 @@ package seaweed.hdfs; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -7,30 +9,31 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import seaweedfs.client.FilerClient; -import seaweedfs.client.FilerGrpcClient; -import seaweedfs.client.FilerProto; -import seaweedfs.client.SeaweedRead; +import seaweedfs.client.*; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE; +import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE; + public class SeaweedFileSystemStore { private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); private FilerGrpcClient filerGrpcClient; private FilerClient filerClient; + private Configuration conf; - public SeaweedFileSystemStore(String host, int port) { + public SeaweedFileSystemStore(String host, int port, Configuration conf) { int grpcPort = 10000 + port; filerGrpcClient = new FilerGrpcClient(host, grpcPort); filerClient = new FilerClient(filerGrpcClient); + this.conf = conf; } public static String getParentDirectory(Path path) { @@ -61,7 +64,7 @@ public class SeaweedFileSystemStore { ); } - public FileStatus[] listEntries(final Path path) { + public FileStatus[] listEntries(final Path path) throws IOException { LOG.debug("listEntries path: {}", path); FileStatus pathStatus = getFileStatus(path); @@ -89,11 +92,11 @@ public class SeaweedFileSystemStore { } - public FileStatus getFileStatus(final Path path) { + public FileStatus getFileStatus(final Path path) throws IOException { FilerProto.Entry entry = lookupEntry(path); if (entry == null) { - return null; + throw new FileNotFoundException("File does not exist: " + path); } LOG.debug("doGetFileStatus path:{} entry:{}", path, entry); @@ -123,10 +126,10 @@ public class SeaweedFileSystemStore { private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) { FilerProto.FuseAttributes attributes = entry.getAttributes(); - long length = SeaweedRead.totalSize(entry.getChunksList()); + long length = SeaweedRead.fileSize(entry); boolean isDir = entry.getIsDirectory(); int block_replication = 1; - int blocksize = 512; + int blocksize = this.conf.getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); long modification_time = attributes.getMtime() * 1000; // milliseconds long access_time = 0; FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode()); @@ -136,7 +139,7 @@ 
public class SeaweedFileSystemStore { modification_time, access_time, permission, owner, group, null, path); } - private FilerProto.Entry lookupEntry(Path path) { + public FilerProto.Entry lookupEntry(Path path) { return filerClient.lookupEntry(getParentDirectory(path), path.getName()); @@ -184,7 +187,7 @@ public class SeaweedFileSystemStore { entry.mergeFrom(existingEntry); entry.getAttributesBuilder().setMtime(now); LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); - writePosition = SeaweedRead.totalSize(existingEntry.getChunksList()); + writePosition = SeaweedRead.fileSize(existingEntry); replication = existingEntry.getAttributes().getReplication(); } } @@ -201,18 +204,17 @@ public class SeaweedFileSystemStore { .clearGroupName() .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) ); + SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); } return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication); } - public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics, - int bufferSize) throws IOException { + public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException { - LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); + LOG.debug("openFileForRead path:{}", path); - int readAheadQueueDepth = 2; FilerProto.Entry entry = lookupEntry(path); if (entry == null) { @@ -222,9 +224,7 @@ public class SeaweedFileSystemStore { return new SeaweedInputStream(filerGrpcClient, statistics, path.toUri().getPath(), - entry, - bufferSize, - readAheadQueueDepth); + entry); } public void setOwner(Path path, String owner, String group) { diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 90c14c772..8bda2e092 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -2,7 +2,6 @@ package seaweed.hdfs; // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream -import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -26,36 +25,23 @@ public class SeaweedInputStream extends FSInputStream { private final FilerProto.Entry entry; private final List<SeaweedRead.VisibleInterval> visibleIntervalList; private final long contentLength; - private final int bufferSize; // default buffer size - private final int readAheadQueueDepth; // initialized in constructor - private final boolean readAheadEnabled; // whether enable readAhead; - private byte[] buffer = null; // will be initialized on first use + private long position = 0; // cursor of the file - private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server - private long fCursorAfterLastRead = -1; - private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer - private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 - // of valid bytes in buffer) private boolean closed = false; public SeaweedInputStream( - final FilerGrpcClient filerGrpcClient, - final Statistics statistics, - final String path, - final FilerProto.Entry entry, - final int bufferSize, - final int readAheadQueueDepth) { + 
final FilerGrpcClient filerGrpcClient, + final Statistics statistics, + final String path, + final FilerProto.Entry entry) throws IOException { this.filerGrpcClient = filerGrpcClient; this.statistics = statistics; this.path = path; this.entry = entry; - this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); - this.bufferSize = bufferSize; - this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); - this.readAheadEnabled = true; + this.contentLength = SeaweedRead.fileSize(entry); - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); @@ -78,122 +64,7 @@ public class SeaweedInputStream extends FSInputStream { @Override public synchronized int read(final byte[] b, final int off, final int len) throws IOException { - int currentOff = off; - int currentLen = len; - int lastReadBytes; - int totalReadBytes = 0; - do { - lastReadBytes = readOneBlock(b, currentOff, currentLen); - if (lastReadBytes > 0) { - currentOff += lastReadBytes; - currentLen -= lastReadBytes; - totalReadBytes += lastReadBytes; - } - if (currentLen <= 0 || currentLen > b.length - currentOff) { - break; - } - } while (lastReadBytes > 0); - return totalReadBytes > 0 ? totalReadBytes : lastReadBytes; - } - - private int readOneBlock(final byte[] b, final int off, final int len) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - - Preconditions.checkNotNull(b); - - if (len == 0) { - return 0; - } - - if (this.available() == 0) { - return -1; - } - - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - - //If buffer is empty, then fill the buffer. 
- if (bCursor == limit) { - //If EOF, then return -1 - if (fCursor >= contentLength) { - return -1; - } - - long bytesRead = 0; - //reset buffer to initial state - i.e., throw away existing data - bCursor = 0; - limit = 0; - if (buffer == null) { - buffer = new byte[bufferSize]; - } - - // Enable readAhead when reading sequentially - if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { - bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); - } else { - bytesRead = readInternal(fCursor, buffer, 0, b.length, true); - } - - if (bytesRead == -1) { - return -1; - } - limit += bytesRead; - fCursor += bytesRead; - fCursorAfterLastRead = fCursor; - } - - //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) - //(bytes returned may be less than requested) - int bytesRemaining = limit - bCursor; - int bytesToRead = Math.min(len, bytesRemaining); - System.arraycopy(buffer, bCursor, b, off, bytesToRead); - bCursor += bytesToRead; - if (statistics != null) { - statistics.incrementBytesRead(bytesToRead); - } - return bytesToRead; - } - - - private int readInternal(final long position, final byte[] b, final int offset, final int length, - final boolean bypassReadAhead) throws IOException { - if (readAheadEnabled && !bypassReadAhead) { - // try reading from read-ahead - if (offset != 0) { - throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets"); - } - int receivedBytes; - - // queue read-aheads - int numReadAheads = this.readAheadQueueDepth; - long nextSize; - long nextOffset = position; - while (numReadAheads > 0 && nextOffset < contentLength) { - nextSize = Math.min((long) bufferSize, contentLength - nextOffset); - ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); - nextOffset = nextOffset + nextSize; - numReadAheads--; - } - - // try reading from buffers first - receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); - if (receivedBytes > 0) { - return receivedBytes; - } - - // got nothing from read-ahead, do our own read now - receivedBytes = readRemote(position, b, offset, length); - return receivedBytes; - } else { - return readRemote(position, b, offset, length); - } - } - - int readRemote(long position, byte[] b, int offset, int length) throws IOException { if (position < 0) { throw new IllegalArgumentException("attempting to read from negative offset"); } @@ -203,21 +74,30 @@ public class SeaweedInputStream extends FSInputStream { if (b == null) { throw new IllegalArgumentException("null byte array passed in to read() method"); } - if (offset >= b.length) { + if (off >= b.length) { throw new IllegalArgumentException("offset greater than length of array"); } - if (length < 0) { + if (len < 0) { throw new IllegalArgumentException("requested read length is less than zero"); } - if (length > (b.length - offset)) { + if (len > (b.length - off)) { throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); } - long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length); + long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry)); if (bytesRead > Integer.MAX_VALUE) { throw new IOException("Unexpected Content-Length"); } + + if (bytesRead > 0) { + this.position += bytesRead; + if (statistics != null) { + 
statistics.incrementBytesRead(bytesRead); + } + } + return (int) bytesRead; + } /** @@ -239,17 +119,8 @@ public class SeaweedInputStream extends FSInputStream { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } - if (n >= fCursor - limit && n <= fCursor) { // within buffer - bCursor = (int) (n - (fCursor - limit)); - return; - } - - // next read will read from here - fCursor = n; + this.position = n; - //invalidate buffer - limit = 0; - bCursor = 0; } @Override @@ -257,20 +128,19 @@ public class SeaweedInputStream extends FSInputStream { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - long currentPos = getPos(); - if (currentPos == contentLength) { + if (this.position == contentLength) { if (n > 0) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } } - long newPos = currentPos + n; + long newPos = this.position + n; if (newPos < 0) { newPos = 0; - n = newPos - currentPos; + n = newPos - this.position; } if (newPos > contentLength) { newPos = contentLength; - n = newPos - currentPos; + n = newPos - this.position; } seek(newPos); return n; @@ -289,11 +159,11 @@ public class SeaweedInputStream extends FSInputStream { public synchronized int available() throws IOException { if (closed) { throw new IOException( - FSExceptionMessages.STREAM_IS_CLOSED); + FSExceptionMessages.STREAM_IS_CLOSED); } final long remaining = this.contentLength - this.getPos(); return remaining <= Integer.MAX_VALUE - ? (int) remaining : Integer.MAX_VALUE; + ? (int) remaining : Integer.MAX_VALUE; } /** @@ -321,7 +191,7 @@ public class SeaweedInputStream extends FSInputStream { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - return fCursor - limit + bCursor; + return position; } /** @@ -338,7 +208,6 @@ public class SeaweedInputStream extends FSInputStream { @Override public synchronized void close() throws IOException { closed = true; - buffer = null; // de-reference the buffer so it can be GC'ed sooner } /** diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index 7b488a5da..26290c46c 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -7,6 +7,7 @@ import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import seaweedfs.client.ByteBufferPool; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedWrite; @@ -14,6 +15,7 @@ import seaweedfs.client.SeaweedWrite; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.util.concurrent.*; import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; @@ -28,16 +30,16 @@ public class SeaweedOutputStream extends OutputStream { private final int maxConcurrentRequestCount; private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService<Void> completionService; - private FilerProto.Entry.Builder entry; + private final FilerProto.Entry.Builder entry; + private final boolean supportFlush = false; // true; + private final ConcurrentLinkedDeque<WriteOperation> writeOperations; private long position; private boolean closed; - private boolean supportFlush = true; private volatile IOException lastError; private long lastFlushOffset; private 
long lastTotalAppendOffset = 0; - private byte[] buffer; - private int bufferIndex; - private ConcurrentLinkedDeque<WriteOperation> writeOperations; + private ByteBuffer buffer; + private long outputIndex; private String replication = "000"; public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, @@ -50,18 +52,18 @@ public class SeaweedOutputStream extends OutputStream { this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; - this.buffer = new byte[bufferSize]; - this.bufferIndex = 0; + this.buffer = ByteBufferPool.request(bufferSize); + this.outputIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); - this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); + this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 10L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>()); + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.entry = entry; @@ -69,9 +71,6 @@ public class SeaweedOutputStream extends OutputStream { } private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { - - LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry); - try { SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); } catch (Exception ex) { @@ -87,7 +86,7 @@ public class SeaweedOutputStream extends OutputStream { @Override public synchronized void write(final byte[] data, final int off, final int length) - throws IOException { + throws IOException { maybeThrowLastError(); Preconditions.checkArgument(data != null, "null data"); @@ -96,25 +95,29 @@ public class SeaweedOutputStream extends OutputStream { throw new IndexOutOfBoundsException(); } + // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")"); + int currentOffset = off; - int writableBytes = bufferSize - bufferIndex; + int writableBytes = bufferSize - buffer.position(); int numberOfBytesToWrite = length; while (numberOfBytesToWrite > 0) { - if (writableBytes <= numberOfBytesToWrite) { - System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); - bufferIndex += writableBytes; - writeCurrentBufferToService(); - currentOffset += writableBytes; - numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; - } else { - System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite); - bufferIndex += numberOfBytesToWrite; - numberOfBytesToWrite = 0; + + if (numberOfBytesToWrite < writableBytes) { + buffer.put(data, currentOffset, numberOfBytesToWrite); + outputIndex += numberOfBytesToWrite; + break; } - writableBytes = bufferSize - bufferIndex; + // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); + buffer.put(data, currentOffset, writableBytes); + outputIndex += writableBytes; + currentOffset += writableBytes; + writeCurrentBufferToService(); + numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; + writableBytes = bufferSize - buffer.position(); } + } /** @@ -150,8 +153,9 @@ public class SeaweedOutputStream extends OutputStream { threadExecutor.shutdown(); } finally { lastError = 
new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + ByteBufferPool.release(buffer); buffer = null; - bufferIndex = 0; + outputIndex = 0; closed = true; writeOperations.clear(); if (!threadExecutor.isShutdown()) { @@ -161,35 +165,39 @@ public class SeaweedOutputStream extends OutputStream { } private synchronized void writeCurrentBufferToService() throws IOException { - if (bufferIndex == 0) { + if (buffer.position() == 0) { return; } - final byte[] bytes = buffer; - final int bytesLength = bufferIndex; + position += submitWriteBufferToService(buffer, position); + + buffer = ByteBufferPool.request(bufferSize); - buffer = new byte[bufferSize]; - bufferIndex = 0; - final long offset = position; - position += bytesLength; + } - if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException { + + bufferToWrite.flip(); + int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); + + if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) { waitForTaskToComplete(); } - - final Future<Void> job = completionService.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - // originally: client.append(path, offset, bytes, 0, bytesLength); - SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength); - return null; - } + final Future<Void> job = completionService.submit(() -> { + // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path.toUri().getPath()); + // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + ByteBufferPool.release(bufferToWrite); + return null; }); - writeOperations.add(new WriteOperation(job, offset, bytesLength)); + writeOperations.add(new WriteOperation(job, writePosition, bytesLength)); // Try to shrink the queue shrinkWriteOperationQueue(); + + return bytesLength; + } private void waitForTaskToComplete() throws IOException { diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index f2056b7b1..c4847a9b9 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -15,8 +15,8 @@ <plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>7</source>
- <target>7</target>
+ <source>8</source>
+ <target>8</target>
</configuration>
</plugin>
<plugin>
@@ -120,6 +120,188 @@ </plugin>
</plugins>
</build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>3.1.1</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>3.1.1</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>commons-cli</artifactId>
+ <groupId>commons-cli</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-math3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-net</artifactId>
+ <groupId>commons-net</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-collections</artifactId>
+ <groupId>commons-collections</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>javax.servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-server</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-util</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-servlet</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jetty-webapp</artifactId>
+ <groupId>org.eclipse.jetty</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsp-api</artifactId>
+ <groupId>javax.servlet.jsp</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-core</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-servlet</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-json</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jersey-server</artifactId>
+ <groupId>com.sun.jersey</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang</artifactId>
+ <groupId>commons-lang</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-beanutils</artifactId>
+ <groupId>commons-beanutils</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-configuration2</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-lang3</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>avro</artifactId>
+ <groupId>org.apache.avro</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>re2j</artifactId>
+ <groupId>com.google.re2j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-auth</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jsch</artifactId>
+ <groupId>com.jcraft</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-client</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>curator-recipes</artifactId>
+ <groupId>org.apache.curator</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>htrace-core4</artifactId>
+ <groupId>org.apache.htrace</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>zookeeper</artifactId>
+ <groupId>org.apache.zookeeper</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-compress</artifactId>
+ <groupId>org.apache.commons</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>kerb-simplekdc</artifactId>
+ <groupId>org.apache.kerby</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jackson-databind</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>stax2-api</artifactId>
+ <groupId>org.codehaus.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>woodstox-core</artifactId>
+ <groupId>com.fasterxml.woodstox</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-annotations</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
@@ -127,7 +309,7 @@ </snapshotRepository>
</distributionManagement>
<properties>
- <seaweedfs.client.version>1.2.8</seaweedfs.client.version>
+ <seaweedfs.client.version>1.5.2</seaweedfs.client.version>
<hadoop.version>3.1.1</hadoop.version>
</properties>
</project>
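With hadoop-client and hadoop-common now provided-scope, the plugin jar ships only the seaweedfs-client and shaded gRPC dependencies, and the filer endpoint and chunk size come from configuration instead of hard-coded constants. A minimal, hypothetical smoke-test sketch of those settings follows: the fs.seaweed.filer.host, fs.seaweed.filer.port, and fs.seaweed.buffer.size keys and their defaults (port 8888, 4 MB) appear in the SeaweedFileSystem diff above, while the fs.seaweedfs.impl and fs.AbstractFileSystem.seaweedfs.impl bindings follow the standard Hadoop fs.&lt;scheme&gt;.impl convention, and the localhost endpoint and file path are assumptions, not part of this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;
import java.nio.charset.StandardCharsets;

public class SeaweedFsSmokeTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Bind the seaweedfs:// scheme to the classes in this patch
        // (standard Hadoop fs.<scheme>.impl convention; assumed, not shown in the diff).
        conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
        conf.set("fs.AbstractFileSystem.seaweedfs.impl", "seaweed.hdfs.SeaweedAbstractFileSystem");
        // Keys defined in SeaweedFileSystem; 8888 is FS_SEAWEED_DEFAULT_PORT,
        // and SeaweedFileSystemStore derives the gRPC port as 10000 + filer port.
        conf.set("fs.seaweed.filer.host", "localhost");
        conf.setInt("fs.seaweed.filer.port", 8888);
        // New in this change: read/write chunk size, defaulting to 4 MB.
        conf.setInt("fs.seaweed.buffer.size", 4 * 1024 * 1024);

        FileSystem fs = FileSystem.get(URI.create("seaweedfs://localhost:8888/"), conf);
        Path p = new Path("/test/hello.txt");          // assumed test path
        try (FSDataOutputStream out = fs.create(p, true)) {
            out.write("hello seaweedfs".getBytes(StandardCharsets.UTF_8));
        }
        try (FSDataInputStream in = fs.open(p)) {
            byte[] buf = new byte[64];
            int n = in.read(buf);
            System.out.println(new String(buf, 0, n, StandardCharsets.UTF_8));
        }
    }
}

Because the Hadoop artifacts are provided-scope, a program like this must supply its own Hadoop jars on the classpath, for example by running inside an existing Hadoop installation.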
diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index 6ca210f78..9e668bba8 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ <modelVersion>4.0.0</modelVersion> <properties> - <seaweedfs.client.version>1.2.8</seaweedfs.client.version> + <seaweedfs.client.version>1.5.2</seaweedfs.client.version> <hadoop.version>3.1.1</hadoop.version> </properties> @@ -31,8 +31,8 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> - <source>7</source> - <target>7</target> + <source>8</source> + <target>8</target> </configuration> </plugin> <plugin> @@ -147,6 +147,7 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> + <scope>provided</scope> </dependency> <dependency> <groupId>com.github.chrislusf</groupId> @@ -157,6 +158,7 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> + <scope>provided</scope> </dependency> </dependencies> diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java deleted file mode 100644 index 926d0b83b..000000000 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBuffer.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package seaweed.hdfs; - -import java.util.concurrent.CountDownLatch; - -class ReadBuffer { - - private SeaweedInputStream stream; - private long offset; // offset within the file for the buffer - private int length; // actual length, set after the buffer is filled - private int requestedLength; // requested length of the read - private byte[] buffer; // the buffer itself - private int bufferindex = -1; // index in the buffers array in Buffer manager - private ReadBufferStatus status; // status of the buffer - private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client - // waiting on this buffer gets unblocked - - // fields to help with eviction logic - private long timeStamp = 0; // tick at which buffer became available to read - private boolean isFirstByteConsumed = false; - private boolean isLastByteConsumed = false; - private boolean isAnyByteConsumed = false; - - public SeaweedInputStream getStream() { - return stream; - } - - public void setStream(SeaweedInputStream stream) { - this.stream = stream; - } - - public long getOffset() { - return offset; - } - - public void setOffset(long offset) { - this.offset = offset; - } - - public int getLength() { - return length; - } - - public void setLength(int length) { - this.length = length; - } - - public int getRequestedLength() { - return requestedLength; - } - - public void setRequestedLength(int requestedLength) { - this.requestedLength = requestedLength; - } - - public byte[] getBuffer() { - return buffer; - } - - public void setBuffer(byte[] buffer) { - this.buffer = buffer; - } - - public int getBufferindex() { - return bufferindex; - } - - public void setBufferindex(int bufferindex) { - this.bufferindex = bufferindex; - } - - public ReadBufferStatus getStatus() { - return status; - } - - public void setStatus(ReadBufferStatus status) { - this.status = status; - } - - public CountDownLatch getLatch() { - return latch; - } - - public void setLatch(CountDownLatch latch) { - this.latch = latch; - } - - public long getTimeStamp() { - return timeStamp; - } - - public void setTimeStamp(long timeStamp) { - this.timeStamp = timeStamp; - } - - public boolean isFirstByteConsumed() { - return isFirstByteConsumed; - } - - public void setFirstByteConsumed(boolean isFirstByteConsumed) { - this.isFirstByteConsumed = isFirstByteConsumed; - } - - public boolean isLastByteConsumed() { - return isLastByteConsumed; - } - - public void setLastByteConsumed(boolean isLastByteConsumed) { - this.isLastByteConsumed = isLastByteConsumed; - } - - public boolean isAnyByteConsumed() { - return isAnyByteConsumed; - } - - public void setAnyByteConsumed(boolean isAnyByteConsumed) { - this.isAnyByteConsumed = isAnyByteConsumed; - } - -} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java deleted file mode 100644 index 5b1e21529..000000000 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferManager.java +++ /dev/null @@ -1,394 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package seaweed.hdfs; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Stack; -import java.util.concurrent.CountDownLatch; - -/** - * The Read Buffer Manager for Rest AbfsClient. - */ -final class ReadBufferManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class); - - private static final int NUM_BUFFERS = 16; - private static final int BLOCK_SIZE = 4 * 1024 * 1024; - private static final int NUM_THREADS = 8; - private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold - - private Thread[] threads = new Thread[NUM_THREADS]; - private byte[][] buffers; // array of byte[] buffers, to hold the data that is read - private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available - - private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet - private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads - private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading - private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block - - static { - BUFFER_MANAGER = new ReadBufferManager(); - BUFFER_MANAGER.init(); - } - - static ReadBufferManager getBufferManager() { - return BUFFER_MANAGER; - } - - private void init() { - buffers = new byte[NUM_BUFFERS][]; - for (int i = 0; i < NUM_BUFFERS; i++) { - buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC - freeList.add(i); - } - for (int i = 0; i < NUM_THREADS; i++) { - Thread t = new Thread(new ReadBufferWorker(i)); - t.setDaemon(true); - threads[i] = t; - t.setName("SeaweedFS-prefetch-" + i); - t.start(); - } - ReadBufferWorker.UNLEASH_WORKERS.countDown(); - } - - // hide instance constructor - private ReadBufferManager() { - } - - - /* - * - * SeaweedInputStream-facing methods - * - */ - - - /** - * {@link SeaweedInputStream} calls this method to queue read-aheads. 
- * - * @param stream The {@link SeaweedInputStream} for which to do the read-ahead - * @param requestedOffset The offset in the file which should be read - * @param requestedLength The length to read - */ - void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", - stream.getPath(), requestedOffset, requestedLength); - } - ReadBuffer buffer; - synchronized (this) { - if (isAlreadyQueued(stream, requestedOffset)) { - return; // already queued, do not queue again - } - if (freeList.isEmpty() && !tryEvict()) { - return; // no buffers available, cannot queue anything - } - - buffer = new ReadBuffer(); - buffer.setStream(stream); - buffer.setOffset(requestedOffset); - buffer.setLength(0); - buffer.setRequestedLength(requestedLength); - buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); - buffer.setLatch(new CountDownLatch(1)); - - Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already - - buffer.setBuffer(buffers[bufferIndex]); - buffer.setBufferindex(bufferIndex); - readAheadQueue.add(buffer); - notifyAll(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", - stream.getPath(), requestedOffset, buffer.getBufferindex()); - } - } - - - /** - * {@link SeaweedInputStream} calls this method to read any bytes already available in a buffer (thereby saving a - * remote read). This returns the bytes if the data already exists in the buffer. If there is a buffer that is reading - * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead - * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because, - * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own - * read to get the data faster (compared to the read waiting in the queue for an indeterminate amount of time). - * - * @param stream the file to read bytes for - * @param position the offset in the file to do a read for - * @param length the length to read - * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
- * @return the number of bytes read - */ - int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) { - // not synchronized, so have to be careful with locking - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("getBlock for file {} position {} thread {}", - stream.getPath(), position, Thread.currentThread().getName()); - } - - waitForProcess(stream, position); - - int bytesRead = 0; - synchronized (this) { - bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); - } - if (bytesRead > 0) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done read from Cache for {} position {} length {}", - stream.getPath(), position, bytesRead); - } - return bytesRead; - } - - // otherwise, just say we got nothing - calling thread can do its own read - return 0; - } - - /* - * - * Internal methods - * - */ - - private void waitForProcess(final SeaweedInputStream stream, final long position) { - ReadBuffer readBuf; - synchronized (this) { - clearFromReadAheadQueue(stream, position); - readBuf = getFromList(inProgressList, stream, position); - } - if (readBuf != null) { // if in the in-progress queue, then block for it - try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", - stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); - } - readBuf.getLatch().await(); // blocking wait on the caller stream's thread - // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread - // is done processing it (in doneReading). There, the latch is set after removing the buffer from - // inProgressList. So this latch is safe to be outside the synchronized block. - // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock - // while waiting, so no one will be able to change any state. If this becomes more complex in the future, - // then the latch can be removed and replaced with wait/notify whenever inProgressList is touched. - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("latch done for file {} buffer idx {} length {}", - stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); - } - } - } - - /** - * If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to the free list. - * The objective is to find just one buffer - there is no advantage to evicting more than one. - * - * @return whether the eviction succeeded - i.e., were we able to free up one buffer - */ - private synchronized boolean tryEvict() { - ReadBuffer nodeToEvict = null; - if (completedReadList.size() <= 0) { - return false; // there are no evict-able buffers - } - - // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) - for (ReadBuffer buf : completedReadList) { - if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { - nodeToEvict = buf; - break; - } - } - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try buffers where any bytes have been consumed (may be a bad idea?
have to experiment and see) - for (ReadBuffer buf : completedReadList) { - if (buf.isAnyByteConsumed()) { - nodeToEvict = buf; - break; - } - } - - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try any old nodes that have not been consumed - long earliestBirthday = Long.MAX_VALUE; - for (ReadBuffer buf : completedReadList) { - if (buf.getTimeStamp() < earliestBirthday) { - nodeToEvict = buf; - earliestBirthday = buf.getTimeStamp(); - } - } - if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) { - return evict(nodeToEvict); - } - - // nothing can be evicted - return false; - } - - private boolean evict(final ReadBuffer buf) { - freeList.push(buf.getBufferindex()); - completedReadList.remove(buf); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", - buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); - } - return true; - } - - private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) { - // returns true if any part of the buffer is already queued - return (isInList(readAheadQueue, stream, requestedOffset) - || isInList(inProgressList, stream, requestedOffset) - || isInList(completedReadList, stream, requestedOffset)); - } - - private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) { - return (getFromList(list, stream, requestedOffset) != null); - } - - private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) { - for (ReadBuffer buffer : list) { - if (buffer.getStream() == stream) { - if (buffer.getStatus() == ReadBufferStatus.AVAILABLE - && requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getLength()) { - return buffer; - } else if (requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { - return buffer; - } - } - } - return null; - } - - private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) { - ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); - if (buffer != null) { - readAheadQueue.remove(buffer); - notifyAll(); // lock is held in calling method - freeList.push(buffer.getBufferindex()); - } - } - - private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length, - final byte[] buffer) { - ReadBuffer buf = getFromList(completedReadList, stream, position); - if (buf == null || position >= buf.getOffset() + buf.getLength()) { - return 0; - } - int cursor = (int) (position - buf.getOffset()); - int availableLengthInBuffer = buf.getLength() - cursor; - int lengthToCopy = Math.min(length, availableLengthInBuffer); - System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); - if (cursor == 0) { - buf.setFirstByteConsumed(true); - } - if (cursor + lengthToCopy == buf.getLength()) { - buf.setLastByteConsumed(true); - } - buf.setAnyByteConsumed(true); - return lengthToCopy; - } - - /* - * - * ReadBufferWorker-thread-facing methods - * - */ - - /** - * ReadBufferWorker thread calls this to get the next buffer that it should work on. 
- * - * @return {@link ReadBuffer} - * @throws InterruptedException if thread is interrupted - */ - ReadBuffer getNextBlockToRead() throws InterruptedException { - ReadBuffer buffer = null; - synchronized (this) { - //buffer = readAheadQueue.take(); // blocking method - while (readAheadQueue.size() == 0) { - wait(); - } - buffer = readAheadQueue.remove(); - notifyAll(); - if (buffer == null) { - return null; // should never happen - } - buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); - inProgressList.add(buffer); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker picked file {} for offset {}", - buffer.getStream().getPath(), buffer.getOffset()); - } - return buffer; - } - - /** - * ReadBufferWorker thread calls this method to post completion. - * - * @param buffer the buffer whose read was completed - * @param result the {@link ReadBufferStatus} after the read operation in the worker thread - * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read - */ - void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}", - buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead); - } - synchronized (this) { - inProgressList.remove(buffer); - if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { - buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setTimeStamp(currentTimeMillis()); - buffer.setLength(bytesActuallyRead); - completedReadList.add(buffer); - } else { - freeList.push(buffer.getBufferindex()); - // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and be eligible for GC - } - } - //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results - buffer.getLatch().countDown(); // wake up waiting threads (if any) - } - - /** - * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). - * System.currentTimeMillis can go backwards when the system clock is changed (e.g., with NTP time synchronization), - * making it unsuitable for measuring time intervals. nanoTime is strictly monotonically increasing per CPU core. - * Note: it is not monotonic across sockets, and even within a CPU, it's only the - * more recent parts that share a clock across all cores. - * - * @return current time in milliseconds - */ - private long currentTimeMillis() { - return System.nanoTime() / 1000 / 1000; - } -} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java deleted file mode 100644 index 6ffbc4644..000000000 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/ReadBufferWorker.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -import java.util.concurrent.CountDownLatch; - -class ReadBufferWorker implements Runnable { - - protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); - private int id; - - ReadBufferWorker(final int id) { - this.id = id; - } - - /** - * Return the ID of this ReadBufferWorker. - */ - public int getId() { - return this.id; - } - - /** - * Waits until a buffer becomes available in ReadAheadQueue. - * Once a buffer becomes available, reads the file specified in it and then posts results back to the buffer manager. - * Rinse and repeat. Forever. - */ - public void run() { - try { - UNLEASH_WORKERS.await(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - ReadBuffer buffer; - while (true) { - try { - buffer = bufferManager.getNextBlockToRead(); // blocks until a buffer is available for this thread - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - return; - } - if (buffer != null) { - try { - // do the actual read from the file. - int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength()); - bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager - } catch (Exception ex) { - bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); - } - } - } - } -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java index d63674977..e021401aa 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java @@ -18,12 +18,18 @@ package seaweed.hdfs; -/** - * The ReadBufferStatus for Rest AbfsClient - */ -public enum ReadBufferStatus { - NOT_AVAILABLE, // buffers sitting in readaheadqueue have this status - READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList - AVAILABLE, // data is available in the buffer. It should be in completedList - READ_FAILED // read completed, but failed.
+import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DelegateToFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +public class SeaweedAbstractFileSystem extends DelegateToFileSystem { + + SeaweedAbstractFileSystem(final URI uri, final Configuration conf) + throws IOException, URISyntaxException { + super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false); + } + } diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index c12da8261..ca67c3874 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -5,31 +5,29 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import seaweedfs.client.FilerProto; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.util.EnumSet; import java.util.List; import java.util.Map; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; - public class SeaweedFileSystem extends FileSystem { - public static final int FS_SEAWEED_DEFAULT_PORT = 8888; public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; + public static final int FS_SEAWEED_DEFAULT_PORT = 8888; + public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size"; + public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); - private static int BUFFER_SIZE = 16 * 1024 * 1024; private URI uri; private Path workingDirectory = new Path("/"); @@ -60,12 +58,10 @@ public class SeaweedFileSystem extends FileSystem { port = (port == -1) ? 
FS_SEAWEED_DEFAULT_PORT : port; conf.setInt(FS_SEAWEED_FILER_PORT, port); - conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE); - setConf(conf); this.uri = uri; - seaweedFileSystemStore = new SeaweedFileSystemStore(host, port); + seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, conf); } @@ -77,8 +73,9 @@ public class SeaweedFileSystem extends FileSystem { path = qualify(path); try { - InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(inputStream); + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); + FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics); + return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; @@ -95,7 +92,8 @@ public class SeaweedFileSystem extends FileSystem { try { String replicaPlacement = String.format("%03d", replication - 1); - OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement); + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); + OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); @@ -105,8 +103,9 @@ public class SeaweedFileSystem extends FileSystem { /** * {@inheritDoc} + * * @throws FileNotFoundException if the parent directory is not present -or - * is not a directory. + * is not a directory. 
*/ @Override public FSDataOutputStream createNonRecursive(Path path, @@ -123,9 +122,10 @@ public class SeaweedFileSystem extends FileSystem { throw new FileAlreadyExistsException("Not a directory: " + parent); } } + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); return create(path, permission, flags.contains(CreateFlag.OVERWRITE), bufferSize, - replication, blockSize, progress); + replication, seaweedBufferSize, progress); } @Override @@ -135,7 +135,8 @@ public class SeaweedFileSystem extends FileSystem { path = qualify(path); try { - OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, ""); + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); + OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, ""); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex); @@ -144,7 +145,7 @@ public class SeaweedFileSystem extends FileSystem { } @Override - public boolean rename(Path src, Path dst) { + public boolean rename(Path src, Path dst) throws IOException { LOG.debug("rename path: {} => {}", src, dst); @@ -155,12 +156,13 @@ public class SeaweedFileSystem extends FileSystem { if (src.equals(dst)) { return true; } - FileStatus dstFileStatus = getFileStatus(dst); + FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst); - String sourceFileName = src.getName(); Path adjustedDst = dst; - if (dstFileStatus != null) { + if (entry != null) { + FileStatus dstFileStatus = getFileStatus(dst); + String sourceFileName = src.getName(); if (!dstFileStatus.isDirectory()) { return false; } @@ -175,18 +177,20 @@ public class SeaweedFileSystem extends FileSystem { } @Override - public boolean delete(Path path, boolean recursive) { + public boolean delete(Path path, boolean recursive) throws IOException { LOG.debug("delete path: {} recursive:{}", path, recursive); path = qualify(path); - FileStatus fileStatus = getFileStatus(path); + FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path); - if (fileStatus == null) { + if (entry == null) { return true; } + FileStatus fileStatus = getFileStatus(path); + return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive); } @@ -222,9 +226,9 @@ public class SeaweedFileSystem extends FileSystem { path = qualify(path); - FileStatus fileStatus = getFileStatus(path); + FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path); - if (fileStatus == null) { + if (entry == null) { UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); return seaweedFileSystemStore.createDirectory(path, currentUser, @@ -233,6 +237,8 @@ public class SeaweedFileSystem extends FileSystem { } + FileStatus fileStatus = getFileStatus(path); + if (fileStatus.isDirectory()) { return true; } else { @@ -241,7 +247,7 @@ public class SeaweedFileSystem extends FileSystem { } @Override - public FileStatus getFileStatus(Path path) { + public FileStatus getFileStatus(Path path) throws IOException { LOG.debug("getFileStatus path: {}", path); @@ -335,9 +341,7 @@ public class SeaweedFileSystem extends FileSystem { @Override public void createSymlink(final Path target, final Path link, - final boolean createParent) throws AccessControlException, - FileAlreadyExistsException, FileNotFoundException, - ParentNotDirectoryException, UnsupportedFileSystemException, + final 
boolean createParent) throws IOException { // Supporting filesystems should override this method throw new UnsupportedOperationException( diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 9617a38be..23556a578 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -1,5 +1,7 @@ package seaweed.hdfs; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -7,30 +9,31 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import seaweedfs.client.FilerClient; -import seaweedfs.client.FilerGrpcClient; -import seaweedfs.client.FilerProto; -import seaweedfs.client.SeaweedRead; +import seaweedfs.client.*; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_BUFFER_SIZE; +import static seaweed.hdfs.SeaweedFileSystem.FS_SEAWEED_DEFAULT_BUFFER_SIZE; + public class SeaweedFileSystemStore { private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); private FilerGrpcClient filerGrpcClient; private FilerClient filerClient; + private Configuration conf; - public SeaweedFileSystemStore(String host, int port) { + public SeaweedFileSystemStore(String host, int port, Configuration conf) { int grpcPort = 10000 + port; filerGrpcClient = new FilerGrpcClient(host, grpcPort); filerClient = new FilerClient(filerGrpcClient); + this.conf = conf; } public static String getParentDirectory(Path path) { @@ -61,7 +64,7 @@ public class SeaweedFileSystemStore { ); } - public FileStatus[] listEntries(final Path path) { + public FileStatus[] listEntries(final Path path) throws IOException { LOG.debug("listEntries path: {}", path); FileStatus pathStatus = getFileStatus(path); @@ -89,11 +92,11 @@ public class SeaweedFileSystemStore { } - public FileStatus getFileStatus(final Path path) { + public FileStatus getFileStatus(final Path path) throws IOException { FilerProto.Entry entry = lookupEntry(path); if (entry == null) { - return null; + throw new FileNotFoundException("File does not exist: " + path); } LOG.debug("doGetFileStatus path:{} entry:{}", path, entry); @@ -123,10 +126,10 @@ public class SeaweedFileSystemStore { private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) { FilerProto.FuseAttributes attributes = entry.getAttributes(); - long length = SeaweedRead.totalSize(entry.getChunksList()); + long length = SeaweedRead.fileSize(entry); boolean isDir = entry.getIsDirectory(); int block_replication = 1; - int blocksize = 512; + int blocksize = this.conf.getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); long modification_time = attributes.getMtime() * 1000; // milliseconds long access_time = 0; FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode()); @@ -136,7 +139,7 @@ public class SeaweedFileSystemStore { modification_time, access_time, permission, owner, group, null, path); } - private FilerProto.Entry lookupEntry(Path path) { + public 
FilerProto.Entry lookupEntry(Path path) { return filerClient.lookupEntry(getParentDirectory(path), path.getName()); @@ -184,7 +187,7 @@ public class SeaweedFileSystemStore { entry.mergeFrom(existingEntry); entry.getAttributesBuilder().setMtime(now); LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); - writePosition = SeaweedRead.totalSize(existingEntry.getChunksList()); + writePosition = SeaweedRead.fileSize(existingEntry); replication = existingEntry.getAttributes().getReplication(); } } @@ -201,18 +204,17 @@ public class SeaweedFileSystemStore { .clearGroupName() .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) ); + SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); } return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication); } - public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics, - int bufferSize) throws IOException { + public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException { - LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); + LOG.debug("openFileForRead path:{}", path); - int readAheadQueueDepth = 2; FilerProto.Entry entry = lookupEntry(path); if (entry == null) { @@ -222,9 +224,7 @@ public class SeaweedFileSystemStore { return new SeaweedInputStream(filerGrpcClient, statistics, path.toUri().getPath(), - entry, - bufferSize, - readAheadQueueDepth); + entry); } public void setOwner(Path path, String owner, String group) { diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 90c14c772..8bda2e092 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -2,7 +2,6 @@ package seaweed.hdfs; // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream -import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -26,36 +25,23 @@ public class SeaweedInputStream extends FSInputStream { private final FilerProto.Entry entry; private final List<SeaweedRead.VisibleInterval> visibleIntervalList; private final long contentLength; - private final int bufferSize; // default buffer size - private final int readAheadQueueDepth; // initialized in constructor - private final boolean readAheadEnabled; // whether enable readAhead; - private byte[] buffer = null; // will be initialized on first use + private long position = 0; // cursor of the file - private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server - private long fCursorAfterLastRead = -1; - private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer - private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 - // of valid bytes in buffer) private boolean closed = false; public SeaweedInputStream( - final FilerGrpcClient filerGrpcClient, - final Statistics statistics, - final String path, - final FilerProto.Entry entry, - final int bufferSize, - final int readAheadQueueDepth) { + final FilerGrpcClient filerGrpcClient, + final Statistics statistics, + final String path, + final FilerProto.Entry entry) throws IOException { this.filerGrpcClient = 
filerGrpcClient; this.statistics = statistics; this.path = path; this.entry = entry; - this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); - this.bufferSize = bufferSize; - this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); - this.readAheadEnabled = true; + this.contentLength = SeaweedRead.fileSize(entry); - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerGrpcClient, entry.getChunksList()); LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); @@ -78,122 +64,7 @@ public class SeaweedInputStream extends FSInputStream { @Override public synchronized int read(final byte[] b, final int off, final int len) throws IOException { - int currentOff = off; - int currentLen = len; - int lastReadBytes; - int totalReadBytes = 0; - do { - lastReadBytes = readOneBlock(b, currentOff, currentLen); - if (lastReadBytes > 0) { - currentOff += lastReadBytes; - currentLen -= lastReadBytes; - totalReadBytes += lastReadBytes; - } - if (currentLen <= 0 || currentLen > b.length - currentOff) { - break; - } - } while (lastReadBytes > 0); - return totalReadBytes > 0 ? totalReadBytes : lastReadBytes; - } - - private int readOneBlock(final byte[] b, final int off, final int len) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - - Preconditions.checkNotNull(b); - - if (len == 0) { - return 0; - } - - if (this.available() == 0) { - return -1; - } - - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - - //If buffer is empty, then fill the buffer. 
- if (bCursor == limit) { - //If EOF, then return -1 - if (fCursor >= contentLength) { - return -1; - } - - long bytesRead = 0; - //reset buffer to initial state - i.e., throw away existing data - bCursor = 0; - limit = 0; - if (buffer == null) { - buffer = new byte[bufferSize]; - } - - // Enable readAhead when reading sequentially - if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { - bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); - } else { - bytesRead = readInternal(fCursor, buffer, 0, b.length, true); - } - - if (bytesRead == -1) { - return -1; - } - limit += bytesRead; - fCursor += bytesRead; - fCursorAfterLastRead = fCursor; - } - - //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) - //(bytes returned may be less than requested) - int bytesRemaining = limit - bCursor; - int bytesToRead = Math.min(len, bytesRemaining); - System.arraycopy(buffer, bCursor, b, off, bytesToRead); - bCursor += bytesToRead; - if (statistics != null) { - statistics.incrementBytesRead(bytesToRead); - } - return bytesToRead; - } - - - private int readInternal(final long position, final byte[] b, final int offset, final int length, - final boolean bypassReadAhead) throws IOException { - if (readAheadEnabled && !bypassReadAhead) { - // try reading from read-ahead - if (offset != 0) { - throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets"); - } - int receivedBytes; - - // queue read-aheads - int numReadAheads = this.readAheadQueueDepth; - long nextSize; - long nextOffset = position; - while (numReadAheads > 0 && nextOffset < contentLength) { - nextSize = Math.min((long) bufferSize, contentLength - nextOffset); - ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); - nextOffset = nextOffset + nextSize; - numReadAheads--; - } - - // try reading from buffers first - receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); - if (receivedBytes > 0) { - return receivedBytes; - } - - // got nothing from read-ahead, do our own read now - receivedBytes = readRemote(position, b, offset, length); - return receivedBytes; - } else { - return readRemote(position, b, offset, length); - } - } - - int readRemote(long position, byte[] b, int offset, int length) throws IOException { if (position < 0) { throw new IllegalArgumentException("attempting to read from negative offset"); } @@ -203,21 +74,30 @@ public class SeaweedInputStream extends FSInputStream { if (b == null) { throw new IllegalArgumentException("null byte array passed in to read() method"); } - if (offset >= b.length) { + if (off >= b.length) { throw new IllegalArgumentException("offset greater than length of array"); } - if (length < 0) { + if (len < 0) { throw new IllegalArgumentException("requested read length is less than zero"); } - if (length > (b.length - offset)) { + if (len > (b.length - off)) { throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); } - long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length); + long bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry)); if (bytesRead > Integer.MAX_VALUE) { throw new IOException("Unexpected Content-Length"); } + + if (bytesRead > 0) { + this.position += bytesRead; + if (statistics != null) { + 
statistics.incrementBytesRead(bytesRead); + } + } + return (int) bytesRead; + } /** @@ -239,17 +119,8 @@ public class SeaweedInputStream extends FSInputStream { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } - if (n >= fCursor - limit && n <= fCursor) { // within buffer - bCursor = (int) (n - (fCursor - limit)); - return; - } - - // next read will read from here - fCursor = n; + this.position = n; - //invalidate buffer - limit = 0; - bCursor = 0; } @Override @@ -257,20 +128,19 @@ public class SeaweedInputStream extends FSInputStream { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - long currentPos = getPos(); - if (currentPos == contentLength) { + if (this.position == contentLength) { if (n > 0) { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } } - long newPos = currentPos + n; + long newPos = this.position + n; if (newPos < 0) { newPos = 0; - n = newPos - currentPos; + n = newPos - this.position; } if (newPos > contentLength) { newPos = contentLength; - n = newPos - currentPos; + n = newPos - this.position; } seek(newPos); return n; @@ -289,11 +159,11 @@ public class SeaweedInputStream extends FSInputStream { public synchronized int available() throws IOException { if (closed) { throw new IOException( - FSExceptionMessages.STREAM_IS_CLOSED); + FSExceptionMessages.STREAM_IS_CLOSED); } final long remaining = this.contentLength - this.getPos(); return remaining <= Integer.MAX_VALUE - ? (int) remaining : Integer.MAX_VALUE; + ? (int) remaining : Integer.MAX_VALUE; } /** @@ -321,7 +191,7 @@ public class SeaweedInputStream extends FSInputStream { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } - return fCursor - limit + bCursor; + return position; } /** @@ -338,7 +208,6 @@ public class SeaweedInputStream extends FSInputStream { @Override public synchronized void close() throws IOException { closed = true; - buffer = null; // de-reference the buffer so it can be GC'ed sooner } /** diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java index 4f307ff96..d4c967a06 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedOutputStream.java @@ -9,6 +9,7 @@ import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import seaweedfs.client.ByteBufferPool; import seaweedfs.client.FilerGrpcClient; import seaweedfs.client.FilerProto; import seaweedfs.client.SeaweedWrite; @@ -16,14 +17,9 @@ import seaweedfs.client.SeaweedWrite; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.util.Locale; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; @@ -37,16 +33,16 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea private final int maxConcurrentRequestCount; private final ThreadPoolExecutor threadExecutor; private final ExecutorCompletionService<Void> completionService; - 
private FilerProto.Entry.Builder entry; + private final FilerProto.Entry.Builder entry; + private final boolean supportFlush = false; // true; + private final ConcurrentLinkedDeque<WriteOperation> writeOperations; private long position; private boolean closed; - private boolean supportFlush = true; private volatile IOException lastError; private long lastFlushOffset; private long lastTotalAppendOffset = 0; - private byte[] buffer; - private int bufferIndex; - private ConcurrentLinkedDeque<WriteOperation> writeOperations; + private ByteBuffer buffer; + private long outputIndex; private String replication = "000"; public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, @@ -59,18 +55,18 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea this.lastError = null; this.lastFlushOffset = 0; this.bufferSize = bufferSize; - this.buffer = new byte[bufferSize]; - this.bufferIndex = 0; + this.buffer = ByteBufferPool.request(bufferSize); + this.outputIndex = 0; this.writeOperations = new ConcurrentLinkedDeque<>(); - this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); + this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 10L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>()); + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); this.completionService = new ExecutorCompletionService<>(this.threadExecutor); this.entry = entry; @@ -78,9 +74,6 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea } private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { - - LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry); - try { SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); } catch (Exception ex) { @@ -96,7 +89,7 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea @Override public synchronized void write(final byte[] data, final int off, final int length) - throws IOException { + throws IOException { maybeThrowLastError(); Preconditions.checkArgument(data != null, "null data"); @@ -105,25 +98,29 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea throw new IndexOutOfBoundsException(); } + // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")"); + int currentOffset = off; - int writableBytes = bufferSize - bufferIndex; + int writableBytes = bufferSize - buffer.position(); int numberOfBytesToWrite = length; while (numberOfBytesToWrite > 0) { - if (writableBytes <= numberOfBytesToWrite) { - System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); - bufferIndex += writableBytes; - writeCurrentBufferToService(); - currentOffset += writableBytes; - numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; - } else { - System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite); - bufferIndex += numberOfBytesToWrite; - numberOfBytesToWrite = 0; + + if (numberOfBytesToWrite < writableBytes) { + buffer.put(data, currentOffset, numberOfBytesToWrite); + outputIndex += numberOfBytesToWrite; + break; } - writableBytes = bufferSize - bufferIndex; + // System.out.println(path + " [" + (outputIndex + currentOffset) + "," 
+ ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); + buffer.put(data, currentOffset, writableBytes); + outputIndex += writableBytes; + currentOffset += writableBytes; + writeCurrentBufferToService(); + numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; + writableBytes = bufferSize - buffer.position(); } + } /** @@ -202,8 +199,9 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea threadExecutor.shutdown(); } finally { lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); + ByteBufferPool.release(buffer); buffer = null; - bufferIndex = 0; + outputIndex = 0; closed = true; writeOperations.clear(); if (!threadExecutor.isShutdown()) { @@ -213,35 +211,39 @@ public class SeaweedOutputStream extends OutputStream implements Syncable, Strea } private synchronized void writeCurrentBufferToService() throws IOException { - if (bufferIndex == 0) { + if (buffer.position() == 0) { return; } - final byte[] bytes = buffer; - final int bytesLength = bufferIndex; + position += submitWriteBufferToService(buffer, position); + + buffer = ByteBufferPool.request(bufferSize); - buffer = new byte[bufferSize]; - bufferIndex = 0; - final long offset = position; - position += bytesLength; + } - if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { + private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException { + + bufferToWrite.flip(); + int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); + + if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) { waitForTaskToComplete(); } - - final Future<Void> job = completionService.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - // originally: client.append(path, offset, bytes, 0, bytesLength); - SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength); - return null; - } + final Future<Void> job = completionService.submit(() -> { + // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + SeaweedWrite.writeData(entry, replication, filerGrpcClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path.toUri().getPath()); + // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + ByteBufferPool.release(bufferToWrite); + return null; }); - writeOperations.add(new WriteOperation(job, offset, bytesLength)); + writeOperations.add(new WriteOperation(job, writePosition, bytesLength)); // Try to shrink the queue shrinkWriteOperationQueue(); + + return bytesLength; + } private void waitForTaskToComplete() throws IOException { diff --git a/other/java/s3copier/pom.xml b/other/java/s3copier/pom.xml index f8cb9e91c..c3ff30932 100644 --- a/other/java/s3copier/pom.xml +++ b/other/java/s3copier/pom.xml @@ -28,7 +28,7 @@ <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> - <version>3.8.1</version> + <version>4.13.1</version> <scope>test</scope> </dependency> </dependencies> |
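Note on the removed read-ahead machinery: the deleted ReadBufferManager aged its completed buffers with a nanoTime-based clock rather than System.currentTimeMillis(), because the wall clock can jump backwards under NTP adjustment and would make the 3-second eviction threshold unreliable. A standalone sketch of that idea follows; MonotonicClock and the usage lines are illustrative names, not code from this repository.

import java.util.concurrent.TimeUnit;

final class MonotonicClock {

    private MonotonicClock() {
    }

    /** Milliseconds from a monotonic source; only differences between two readings are meaningful. */
    static long millis() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    }
}

// Usage, mirroring the eviction-threshold check in the removed manager:
// long birth = MonotonicClock.millis();            // stamped in doneReading()
// boolean oldEnough =
//         MonotonicClock.millis() - birth > 3000;  // THRESHOLD_AGE_MILLISECONDS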
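On the SeaweedFileSystem side, the hard-coded 16 MB buffer and the io.file.buffer.size override are gone; open(), create(), append(), and createNonRecursive() now read the new fs.seaweed.buffer.size property (default 4 MB). A sketch of how a client could tune it; the fs.seaweedfs.impl mapping, the host/port values, and the 8 MB size are assumptions for illustration, not taken from this commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class SeaweedBufferSizeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Keys as defined in SeaweedFileSystem; host, port, and the 8 MB value are sample inputs.
        conf.set("fs.seaweed.filer.host", "localhost");
        conf.setInt("fs.seaweed.filer.port", 8888);
        conf.setInt("fs.seaweed.buffer.size", 8 * 1024 * 1024); // default is 4 MB
        // Assumed scheme-to-class mapping (commonly placed in core-site.xml).
        conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");

        try (FileSystem fs = FileSystem.get(new URI("seaweedfs://localhost:8888/"), conf)) {
            // Reads are now wrapped in a BufferedFSInputStream of 4x this buffer size;
            // writes stage data in pooled ByteBuffers of exactly this size.
            fs.listStatus(new Path("/"));
        }
    }
}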
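SeaweedInputStream drops its internal buffer, the read-ahead queue, and the fCursor/bCursor/limit bookkeeping in favor of a single position cursor plus ranged remote reads (SeaweedRead.read now takes the absolute position and the file size). A minimal sketch of that pattern; RangedReader and PositionalInputStream are hypothetical stand-ins for SeaweedRead and the real stream class.

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Stand-in for SeaweedRead.read(...): fills b[off..off+len) starting at the
// absolute file offset `position` and returns how many bytes were copied.
interface RangedReader {
    int readAt(long position, byte[] b, int off, int len) throws IOException;
}

// Sketch of the post-commit design: no local buffer and no read-ahead queue,
// just a cursor that advances by whatever the ranged remote read returned.
class PositionalInputStream extends InputStream {

    private final RangedReader reader;
    private final long contentLength;
    private long position = 0; // cursor of the file

    PositionalInputStream(RangedReader reader, long contentLength) {
        this.reader = reader;
        this.contentLength = contentLength;
    }

    @Override
    public synchronized int read(byte[] b, int off, int len) throws IOException {
        if (b == null) {
            throw new IllegalArgumentException("null byte array passed in to read() method");
        }
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }
        if (position >= contentLength) {
            return -1; // EOF
        }
        int bytesRead = reader.readAt(position, b, off, len);
        if (bytesRead > 0) {
            position += bytesRead; // advance past what was actually read
        }
        return bytesRead;
    }

    @Override
    public int read() throws IOException {
        byte[] one = new byte[1];
        return read(one, 0, 1) <= 0 ? -1 : (one[0] & 0xFF);
    }

    public synchronized void seek(long n) throws IOException {
        if (n < 0 || n > contentLength) {
            throw new EOFException("cannot seek outside the file");
        }
        position = n; // nothing to invalidate: there is no buffer
    }
}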
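SeaweedOutputStream likewise swaps its byte[] staging buffer for ByteBuffers borrowed from seaweedfs.client.ByteBufferPool and submits each filled buffer to an ExecutorCompletionService, releasing the buffer back to the pool only after the background write finishes. A simplified sketch of that flow; SimplePool, PooledWriter, and upload() are stand-ins for the pool, the stream, and SeaweedWrite.writeData(), and the back-pressure line condenses the real waitForTaskToComplete().

import java.nio.ByteBuffer;
import java.util.concurrent.*;

// Stand-in for seaweedfs.client.ByteBufferPool: recycle heap buffers instead
// of allocating a fresh byte[] per flush.
class SimplePool {
    private static final ConcurrentLinkedQueue<ByteBuffer> FREE = new ConcurrentLinkedQueue<>();

    static ByteBuffer request(int size) {
        ByteBuffer b = FREE.poll();
        return (b != null && b.capacity() >= size) ? b : ByteBuffer.allocate(size);
    }

    static void release(ByteBuffer b) {
        b.clear();
        FREE.offer(b);
    }
}

class PooledWriter {
    private final ThreadPoolExecutor executor;
    private final ExecutorCompletionService<Void> completionService;
    private final int maxInFlight = Runtime.getRuntime().availableProcessors();

    PooledWriter() {
        // Same shape as the new SeaweedOutputStream pool: one thread per core,
        // 120-second keep-alive.
        executor = new ThreadPoolExecutor(maxInFlight, maxInFlight,
                120L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        completionService = new ExecutorCompletionService<>(executor);
    }

    // Placeholder for SeaweedWrite.writeData(entry, replication, client, offset, ...).
    void upload(long offset, byte[] data, int off, int len) { /* remote write */ }

    /** Hand one filled buffer to a background task; returns the bytes scheduled. */
    int submit(final ByteBuffer buffer, final long writePosition) throws Exception {
        buffer.flip(); // switch from filling to draining: limit = bytes written
        final int bytesLength = buffer.limit() - buffer.position();
        if (executor.getQueue().size() >= maxInFlight) {
            completionService.take().get(); // back-pressure: wait for one in-flight write
        }
        completionService.submit(() -> {
            upload(writePosition, buffer.array(), buffer.position(), bytesLength);
            SimplePool.release(buffer); // recycle only after the upload is done with it
            return null;
        });
        return bytesLength;
    }
}

A caller would mirror writeCurrentBufferToService(): position += writer.submit(buffer, position); buffer = SimplePool.request(bufferSize);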
