Diffstat (limited to 'other/java/client')
19 files changed, 1940 insertions, 188 deletions
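Among the changes below, FilerClient gains static helpers toFileId/toFileIdObject that convert between the protobuf FileId message and its canonical string form "volumeId,fileKeyHex" followed by the cookie as eight hex digits. A minimal standalone sketch of that round trip, reusing the exact format string and substring logic that appears in the FilerClient.java hunk below (the sample values are invented for illustration):

    public class FileIdFormatDemo {
        public static void main(String[] args) {
            int volumeId = 3;
            long fileKey = 0x1234L;
            int cookie = 0xabcd0001;

            // Same format as FilerClient.toFileId: "%d,%x%08x"
            String fileIdStr = String.format("%d,%x%08x", volumeId, fileKey, cookie);
            System.out.println(fileIdStr); // prints: 3,1234abcd0001

            // Same parsing as FilerClient.toFileIdObject: split at the last comma;
            // the trailing 8 hex digits are the cookie, the middle part is the file key.
            int commaIndex = fileIdStr.lastIndexOf(',');
            int parsedVolumeId = Integer.parseInt(fileIdStr.substring(0, commaIndex));
            long parsedFileKey = Long.parseLong(fileIdStr.substring(commaIndex + 1, fileIdStr.length() - 8), 16);
            int parsedCookie = (int) Long.parseLong(fileIdStr.substring(fileIdStr.length() - 8), 16);

            System.out.println(parsedVolumeId == volumeId
                    && parsedFileKey == fileKey
                    && parsedCookie == cookie); // prints: true
        }
    }
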
diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 0c585a941..f4e522a3e 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -1,10 +1,11 @@ <?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.github.chrislusf</groupId> <artifactId>seaweedfs-client</artifactId> - <version>1.2.4</version> + <version>1.6.4</version> <parent> <groupId>org.sonatype.oss</groupId> @@ -16,7 +17,7 @@ <protobuf.version>3.9.1</protobuf.version> <!-- follow https://github.com/grpc/grpc-java --> <grpc.version>1.23.0</grpc.version> - <guava.version>28.0-jre</guava.version> + <guava.version>30.0-jre</guava.version> </properties> <dependencies> @@ -64,9 +65,14 @@ <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> - <version>4.12</version> + <version>4.13.1</version> <scope>test</scope> </dependency> + <dependency> + <groupId>javax.annotation</groupId> + <artifactId>javax.annotation-api</artifactId> + <version>1.3.2</version> + </dependency> </dependencies> <distributionManagement> @@ -88,8 +94,8 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> - <source>7</source> - <target>7</target> + <source>8</source> + <target>8</target> </configuration> </plugin> <plugin> @@ -97,9 +103,11 @@ <artifactId>protobuf-maven-plugin</artifactId> <version>0.6.1</version> <configuration> - <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact> + <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + </protocArtifact> <pluginId>grpc-java</pluginId> - <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + </pluginArtifact> </configuration> <executions> <execution> diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy new file mode 100644 index 000000000..9c8c4f45e --- /dev/null +++ b/other/java/client/pom.xml.deploy @@ -0,0 +1,170 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.github.chrislusf</groupId> + <artifactId>seaweedfs-client</artifactId> + <version>1.6.4</version> + + <parent> + <groupId>org.sonatype.oss</groupId> + <artifactId>oss-parent</artifactId> + <version>9</version> + </parent> + + <properties> + <protobuf.version>3.9.1</protobuf.version> + <!-- follow https://github.com/grpc/grpc-java --> + <grpc.version>1.23.0</grpc.version> + <guava.version>28.0-jre</guava.version> + </properties> + + <dependencies> + <dependency> + <groupId>com.moandjiezana.toml</groupId> + <artifactId>toml4j</artifactId> + <version>0.7.2</version> + </dependency> + <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java --> + <dependency> + 
<groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${protobuf.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty-shaded</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.25</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpmime</artifactId> + <version>4.5.6</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.12</version> + <scope>test</scope> + </dependency> + </dependencies> + + <distributionManagement> + <snapshotRepository> + <id>ossrh</id> + <url>https://oss.sonatype.org/content/repositories/snapshots</url> + </snapshotRepository> + </distributionManagement> + <build> + <extensions> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.6.2</version> + </extension> + </extensions> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>8</source> + <target>8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.6.1</version> + <configuration> + <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + </protocArtifact> + <pluginId>grpc-java</pluginId> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + </pluginArtifact> + </configuration> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>compile-custom</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-gpg-plugin</artifactId> + <version>1.5</version> + <executions> + <execution> + <id>sign-artifacts</id> + <phase>verify</phase> + <goals> + <goal>sign</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.sonatype.plugins</groupId> + <artifactId>nexus-staging-maven-plugin</artifactId> + <version>1.6.7</version> + <extensions>true</extensions> + <configuration> + <serverId>ossrh</serverId> + <nexusUrl>https://oss.sonatype.org/</nexusUrl> + <autoReleaseAfterClose>true</autoReleaseAfterClose> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <version>2.2.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar-no-fork</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.9.1</version> + <executions> + <execution> + <id>attach-javadocs</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/other/java/client/pom_debug.xml 
b/other/java/client/pom_debug.xml new file mode 100644 index 000000000..12ea860c2 --- /dev/null +++ b/other/java/client/pom_debug.xml @@ -0,0 +1,144 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.github.chrislusf</groupId> + <artifactId>seaweedfs-client</artifactId> + <version>1.6.4</version> + + <parent> + <groupId>org.sonatype.oss</groupId> + <artifactId>oss-parent</artifactId> + <version>9</version> + </parent> + + <properties> + <protobuf.version>3.9.1</protobuf.version> + <!-- follow https://github.com/grpc/grpc-java --> + <grpc.version>1.23.0</grpc.version> + <guava.version>28.0-jre</guava.version> + </properties> + + <dependencies> + <dependency> + <groupId>com.moandjiezana.toml</groupId> + <artifactId>toml4j</artifactId> + <version>0.7.2</version> + </dependency> + <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java --> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${protobuf.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty-shaded</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.25</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpmime</artifactId> + <version>4.5.6</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.13.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>javax.annotation</groupId> + <artifactId>javax.annotation-api</artifactId> + <version>1.3.2</version> + </dependency> + </dependencies> + + <build> + <extensions> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.6.2</version> + </extension> + </extensions> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>8</source> + <target>8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.6.1</version> + <configuration> + <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + </protocArtifact> + <pluginId>grpc-java</pluginId> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} + </pluginArtifact> + </configuration> + <executions> + <execution> + <goals> + <goal>compile</goal> + <goal>compile-custom</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <version>2.2.1</version> + <executions> + <execution> + <id>attach-sources</id> + 
<goals> + <goal>jar-no-fork</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.9.1</version> + <executions> + <execution> + <id>attach-javadocs</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java new file mode 100644 index 000000000..51053becd --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ByteBufferPool.java @@ -0,0 +1,42 @@ +package seaweedfs.client; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class ByteBufferPool { + + private static final int MIN_BUFFER_SIZE = 8 * 1024 * 1024; + private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class); + + private static final List<ByteBuffer> bufferList = new ArrayList<>(); + + public static synchronized ByteBuffer request(int bufferSize) { + if (bufferSize < MIN_BUFFER_SIZE) { + bufferSize = MIN_BUFFER_SIZE; + } + LOG.debug("requested new buffer {}", bufferSize); + if (bufferList.isEmpty()) { + return ByteBuffer.allocate(bufferSize); + } + ByteBuffer buffer = bufferList.remove(bufferList.size() - 1); + if (buffer.capacity() >= bufferSize) { + return buffer; + } + + LOG.info("add new buffer from {} to {}", buffer.capacity(), bufferSize); + bufferList.add(0, buffer); + return ByteBuffer.allocate(bufferSize); + + } + + public static synchronized void release(ByteBuffer obj) { + ((Buffer)obj).clear(); + bufferList.add(0, obj); + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java new file mode 100644 index 000000000..58870d742 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/ChunkCache.java @@ -0,0 +1,36 @@ +package seaweedfs.client; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.util.concurrent.TimeUnit; + +public class ChunkCache { + + private Cache<String, byte[]> cache = null; + + public ChunkCache(int maxEntries) { + if (maxEntries == 0) { + return; + } + this.cache = CacheBuilder.newBuilder() + .maximumSize(maxEntries) + .expireAfterAccess(1, TimeUnit.HOURS) + .build(); + } + + public byte[] getChunk(String fileId) { + if (this.cache == null) { + return null; + } + return this.cache.getIfPresent(fileId); + } + + public void setChunk(String fileId, byte[] data) { + if (this.cache == null) { + return; + } + this.cache.put(fileId, data); + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java new file mode 100644 index 000000000..9b6ba5dfc --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/FileChunkManifest.java @@ -0,0 +1,140 @@ +package seaweedfs.client; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class FileChunkManifest { + + private static final Logger LOG = LoggerFactory.getLogger(FileChunkManifest.class); + + private static final int mergeFactor = 1000; + + public static boolean 
hasChunkManifest(List<FilerProto.FileChunk> chunks) { + for (FilerProto.FileChunk chunk : chunks) { + if (chunk.getIsChunkManifest()) { + return true; + } + } + return false; + } + + public static List<FilerProto.FileChunk> resolveChunkManifest( + final FilerClient filerClient, List<FilerProto.FileChunk> chunks) throws IOException { + + List<FilerProto.FileChunk> dataChunks = new ArrayList<>(); + + for (FilerProto.FileChunk chunk : chunks) { + if (!chunk.getIsChunkManifest()) { + dataChunks.add(chunk); + continue; + } + + // IsChunkManifest + LOG.debug("fetching chunk manifest:{}", chunk); + byte[] data = fetchChunk(filerClient, chunk); + FilerProto.FileChunkManifest m = FilerProto.FileChunkManifest.newBuilder().mergeFrom(data).build(); + List<FilerProto.FileChunk> resolvedChunks = new ArrayList<>(); + for (FilerProto.FileChunk t : m.getChunksList()) { + // avoid deprecated chunk.getFileId() + resolvedChunks.add(t.toBuilder().setFileId(FilerClient.toFileId(t.getFid())).build()); + } + dataChunks.addAll(resolveChunkManifest(filerClient, resolvedChunks)); + } + + return dataChunks; + } + + private static byte[] fetchChunk(final FilerClient filerClient, FilerProto.FileChunk chunk) throws IOException { + + String vid = "" + chunk.getFid().getVolumeId(); + FilerProto.Locations locations = filerClient.vidLocations.get(vid); + if (locations == null) { + FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); + lookupRequest.addVolumeIds(vid); + FilerProto.LookupVolumeResponse lookupResponse = filerClient + .getBlockingStub().lookupVolume(lookupRequest.build()); + locations = lookupResponse.getLocationsMapMap().get(vid); + filerClient.vidLocations.put(vid, locations); + LOG.debug("fetchChunk vid:{} locations:{}", vid, locations); + } + + SeaweedRead.ChunkView chunkView = new SeaweedRead.ChunkView( + FilerClient.toFileId(chunk.getFid()), // avoid deprecated chunk.getFileId() + 0, + -1, + 0, + true, + chunk.getCipherKey().toByteArray(), + chunk.getIsCompressed()); + + byte[] chunkData = SeaweedRead.chunkCache.getChunk(chunkView.fileId); + if (chunkData == null) { + LOG.debug("doFetchFullChunkData:{}", chunkView); + chunkData = SeaweedRead.doFetchFullChunkData(filerClient, chunkView, locations); + } + if (chunk.getIsChunkManifest()){ + LOG.debug("chunk {} size {}", chunkView.fileId, chunkData.length); + SeaweedRead.chunkCache.setChunk(chunkView.fileId, chunkData); + } + + return chunkData; + + } + + public static List<FilerProto.FileChunk> maybeManifestize( + final FilerClient filerClient, List<FilerProto.FileChunk> inputChunks, String parentDirectory) throws IOException { + // the return variable + List<FilerProto.FileChunk> chunks = new ArrayList<>(); + + List<FilerProto.FileChunk> dataChunks = new ArrayList<>(); + for (FilerProto.FileChunk chunk : inputChunks) { + if (!chunk.getIsChunkManifest()) { + dataChunks.add(chunk); + } else { + chunks.add(chunk); + } + } + + int remaining = dataChunks.size(); + for (int i = 0; i + mergeFactor < dataChunks.size(); i += mergeFactor) { + FilerProto.FileChunk chunk = mergeIntoManifest(filerClient, dataChunks.subList(i, i + mergeFactor), parentDirectory); + chunks.add(chunk); + remaining -= mergeFactor; + } + + // remaining + for (int i = dataChunks.size() - remaining; i < dataChunks.size(); i++) { + chunks.add(dataChunks.get(i)); + } + return chunks; + } + + private static FilerProto.FileChunk mergeIntoManifest(final FilerClient filerClient, List<FilerProto.FileChunk> dataChunks, String parentDirectory) throws 
IOException { + // create and serialize the manifest + dataChunks = FilerClient.beforeEntrySerialization(dataChunks); + FilerProto.FileChunkManifest.Builder m = FilerProto.FileChunkManifest.newBuilder().addAllChunks(dataChunks); + byte[] data = m.build().toByteArray(); + + long minOffset = Long.MAX_VALUE; + long maxOffset = -1; + for (FilerProto.FileChunk chunk : dataChunks) { + minOffset = Math.min(minOffset, chunk.getOffset()); + maxOffset = Math.max(maxOffset, chunk.getSize() + chunk.getOffset()); + } + + FilerProto.FileChunk.Builder manifestChunk = SeaweedWrite.writeChunk( + filerClient.getReplication(), + filerClient, + minOffset, + data, 0, data.length, parentDirectory); + manifestChunk.setIsChunkManifest(true); + manifestChunk.setSize(maxOffset - minOffset); + return manifestChunk.build(); + + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java index 84aa26ad9..257a9873d 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerClient.java @@ -1,27 +1,82 @@ package seaweedfs.client; +import com.google.common.base.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.file.Path; -import java.nio.file.Paths; +import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; -public class FilerClient { +public class FilerClient extends FilerGrpcClient { private static final Logger LOG = LoggerFactory.getLogger(FilerClient.class); - private FilerGrpcClient filerGrpcClient; - public FilerClient(String host, int grpcPort) { - filerGrpcClient = new FilerGrpcClient(host, grpcPort); + super(host, grpcPort); + } + + public static String toFileId(FilerProto.FileId fid) { + if (fid == null) { + return null; + } + return String.format("%d,%x%08x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie()); + } + + public static FilerProto.FileId toFileIdObject(String fileIdStr) { + if (fileIdStr == null || fileIdStr.length() == 0) { + return null; + } + int commaIndex = fileIdStr.lastIndexOf(','); + String volumeIdStr = fileIdStr.substring(0, commaIndex); + String fileKeyStr = fileIdStr.substring(commaIndex + 1, fileIdStr.length() - 8); + String cookieStr = fileIdStr.substring(fileIdStr.length() - 8); + + return FilerProto.FileId.newBuilder() + .setVolumeId(Integer.parseInt(volumeIdStr)) + .setFileKey(Long.parseLong(fileKeyStr, 16)) + .setCookie((int) Long.parseLong(cookieStr, 16)) + .build(); + } + + public static List<FilerProto.FileChunk> beforeEntrySerialization(List<FilerProto.FileChunk> chunks) { + List<FilerProto.FileChunk> cleanedChunks = new ArrayList<>(); + for (FilerProto.FileChunk chunk : chunks) { + FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder(); + chunkBuilder.clearFileId(); + chunkBuilder.clearSourceFileId(); + chunkBuilder.setFid(toFileIdObject(chunk.getFileId())); + FilerProto.FileId sourceFid = toFileIdObject(chunk.getSourceFileId()); + if (sourceFid != null) { + chunkBuilder.setSourceFid(sourceFid); + } + cleanedChunks.add(chunkBuilder.build()); + } + return cleanedChunks; } - public FilerClient(FilerGrpcClient filerGrpcClient) { - this.filerGrpcClient = filerGrpcClient; + public static FilerProto.Entry afterEntryDeserialization(FilerProto.Entry entry) { + if (entry.getChunksList().size() <= 0) { + return entry; + } + String fileId = entry.getChunks(0).getFileId(); + if (fileId != null && 
fileId.length() != 0) { + return entry; + } + FilerProto.Entry.Builder entryBuilder = entry.toBuilder(); + entryBuilder.clearChunks(); + for (FilerProto.FileChunk chunk : entry.getChunksList()) { + FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder(); + chunkBuilder.setFileId(toFileId(chunk.getFid())); + String sourceFileId = toFileId(chunk.getSourceFid()); + if (sourceFileId != null) { + chunkBuilder.setSourceFileId(sourceFileId); + } + entryBuilder.addChunks(chunkBuilder); + } + return entryBuilder.build(); } public boolean mkdirs(String path, int mode) { @@ -38,9 +93,9 @@ public class FilerClient { if ("/".equals(path)) { return true; } - Path pathObject = Paths.get(path); - String parent = pathObject.getParent().toString(); - String name = pathObject.getFileName().toString(); + File pathFile = new File(path); + String parent = pathFile.getParent().replace('\\','/'); + String name = pathFile.getName(); mkdirs(parent, mode, uid, gid, userName, groupNames); @@ -59,13 +114,13 @@ public class FilerClient { public boolean mv(String oldPath, String newPath) { - Path oldPathObject = Paths.get(oldPath); - String oldParent = oldPathObject.getParent().toString(); - String oldName = oldPathObject.getFileName().toString(); + File oldPathFile = new File(oldPath); + String oldParent = oldPathFile.getParent().replace('\\','/'); + String oldName = oldPathFile.getName(); - Path newPathObject = Paths.get(newPath); - String newParent = newPathObject.getParent().toString(); - String newName = newPathObject.getFileName().toString(); + File newPathFile = new File(newPath); + String newParent = newPathFile.getParent().replace('\\','/'); + String newName = newPathFile.getName(); return atomicRenameEntry(oldParent, oldName, newParent, newName); @@ -73,9 +128,9 @@ public class FilerClient { public boolean rm(String path, boolean isRecursive, boolean ignoreRecusiveError) { - Path pathObject = Paths.get(path); - String parent = pathObject.getParent().toString(); - String name = pathObject.getFileName().toString(); + File pathFile = new File(path); + String parent = pathFile.getParent().replace('\\','/'); + String name = pathFile.getName(); return deleteEntry( parent, @@ -92,9 +147,9 @@ public class FilerClient { public boolean touch(String path, int mode, int uid, int gid, String userName, String[] groupNames) { - Path pathObject = Paths.get(path); - String parent = pathObject.getParent().toString(); - String name = pathObject.getFileName().toString(); + File pathFile = new File(path); + String parent = pathFile.getParent().replace('\\','/'); + String name = pathFile.getName(); FilerProto.Entry entry = lookupEntry(parent, name); if (entry == null) { @@ -156,7 +211,7 @@ public class FilerClient { List<FilerProto.Entry> results = new ArrayList<FilerProto.Entry>(); String lastFileName = ""; for (int limit = Integer.MAX_VALUE; limit > 0; ) { - List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024); + List<FilerProto.Entry> t = listEntries(path, "", lastFileName, 1024, false); if (t == null) { break; } @@ -173,31 +228,35 @@ public class FilerClient { return results; } - public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit) { - Iterator<FilerProto.ListEntriesResponse> iter = filerGrpcClient.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() + public List<FilerProto.Entry> listEntries(String path, String entryPrefix, String lastEntryName, int limit, boolean includeLastEntry) { + Iterator<FilerProto.ListEntriesResponse> iter 
= this.getBlockingStub().listEntries(FilerProto.ListEntriesRequest.newBuilder() .setDirectory(path) .setPrefix(entryPrefix) .setStartFromFileName(lastEntryName) + .setInclusiveStartFrom(includeLastEntry) .setLimit(limit) .build()); List<FilerProto.Entry> entries = new ArrayList<>(); - while (iter.hasNext()){ + while (iter.hasNext()) { FilerProto.ListEntriesResponse resp = iter.next(); - entries.add(fixEntryAfterReading(resp.getEntry())); + entries.add(afterEntryDeserialization(resp.getEntry())); } return entries; } public FilerProto.Entry lookupEntry(String directory, String entryName) { try { - FilerProto.Entry entry = filerGrpcClient.getBlockingStub().lookupDirectoryEntry( + FilerProto.Entry entry = this.getBlockingStub().lookupDirectoryEntry( FilerProto.LookupDirectoryEntryRequest.newBuilder() .setDirectory(directory) .setName(entryName) .build()).getEntry(); - return fixEntryAfterReading(entry); + if (entry == null) { + return null; + } + return afterEntryDeserialization(entry); } catch (Exception e) { - if (e.getMessage().indexOf("filer: no entry is found in filer store")>0){ + if (e.getMessage().indexOf("filer: no entry is found in filer store") > 0) { return null; } LOG.warn("lookupEntry {}/{}: {}", directory, entryName, e); @@ -205,28 +264,32 @@ public class FilerClient { } } - public boolean createEntry(String parent, FilerProto.Entry entry) { try { - filerGrpcClient.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(parent) - .setEntry(entry) - .build()); + FilerProto.CreateEntryResponse createEntryResponse = + this.getBlockingStub().createEntry(FilerProto.CreateEntryRequest.newBuilder() + .setDirectory(parent) + .setEntry(entry) + .build()); + if (Strings.isNullOrEmpty(createEntryResponse.getError())) { + return true; + } + LOG.warn("createEntry {}/{} error: {}", parent, entry.getName(), createEntryResponse.getError()); + return false; } catch (Exception e) { LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e); return false; } - return true; } public boolean updateEntry(String parent, FilerProto.Entry entry) { try { - filerGrpcClient.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() + this.getBlockingStub().updateEntry(FilerProto.UpdateEntryRequest.newBuilder() .setDirectory(parent) .setEntry(entry) .build()); } catch (Exception e) { - LOG.warn("createEntry {}/{}: {}", parent, entry.getName(), e); + LOG.warn("updateEntry {}/{}: {}", parent, entry.getName(), e); return false; } return true; @@ -234,7 +297,7 @@ public class FilerClient { public boolean deleteEntry(String parent, String entryName, boolean isDeleteFileChunk, boolean isRecursive, boolean ignoreRecusiveError) { try { - filerGrpcClient.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() + this.getBlockingStub().deleteEntry(FilerProto.DeleteEntryRequest.newBuilder() .setDirectory(parent) .setName(entryName) .setIsDeleteData(isDeleteFileChunk) @@ -250,7 +313,7 @@ public class FilerClient { public boolean atomicRenameEntry(String oldParent, String oldName, String newParent, String newName) { try { - filerGrpcClient.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder() + this.getBlockingStub().atomicRenameEntry(FilerProto.AtomicRenameEntryRequest.newBuilder() .setOldDirectory(oldParent) .setOldName(oldName) .setNewDirectory(newParent) @@ -263,24 +326,13 @@ public class FilerClient { return true; } - private FilerProto.Entry fixEntryAfterReading(FilerProto.Entry entry) { - if 
(entry.getChunksList().size() <= 0) { - return entry; - } - String fileId = entry.getChunks(0).getFileId(); - if (fileId != null && fileId.length() != 0) { - return entry; - } - FilerProto.Entry.Builder entryBuilder = entry.toBuilder(); - entryBuilder.clearChunks(); - for (FilerProto.FileChunk chunk : entry.getChunksList()) { - FilerProto.FileChunk.Builder chunkBuilder = chunk.toBuilder(); - FilerProto.FileId fid = chunk.getFid(); - fileId = String.format("%d,%d%x", fid.getVolumeId(), fid.getFileKey(), fid.getCookie()); - chunkBuilder.setFileId(fileId); - entryBuilder.addChunks(chunkBuilder); - } - return entryBuilder.build(); + public Iterator<FilerProto.SubscribeMetadataResponse> watch(String prefix, String clientName, long sinceNs) { + return this.getBlockingStub().subscribeMetadata(FilerProto.SubscribeMetadataRequest.newBuilder() + .setPathPrefix(prefix) + .setClientName(clientName) + .setSinceNs(sinceNs) + .build() + ); } } diff --git a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java index 3626c76de..6c57e2e0d 100644 --- a/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java +++ b/other/java/client/src/main/java/seaweedfs/client/FilerGrpcClient.java @@ -9,17 +9,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.net.ssl.SSLException; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; public class FilerGrpcClient { private static final Logger logger = LoggerFactory.getLogger(FilerGrpcClient.class); - - private final ManagedChannel channel; - private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; - private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; - private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; - static SslContext sslContext; static { @@ -30,6 +26,20 @@ public class FilerGrpcClient { } } + public final int VOLUME_SERVER_ACCESS_DIRECT = 0; + public final int VOLUME_SERVER_ACCESS_PUBLIC_URL = 1; + public final int VOLUME_SERVER_ACCESS_FILER_PROXY = 2; + public final Map<String, FilerProto.Locations> vidLocations = new HashMap<>(); + private final ManagedChannel channel; + private final SeaweedFilerGrpc.SeaweedFilerBlockingStub blockingStub; + private final SeaweedFilerGrpc.SeaweedFilerStub asyncStub; + private final SeaweedFilerGrpc.SeaweedFilerFutureStub futureStub; + private boolean cipher = false; + private String collection = ""; + private String replication = ""; + private int volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT; + private String filerAddress; + public FilerGrpcClient(String host, int grpcPort) { this(host, grpcPort, sslContext); } @@ -37,20 +47,43 @@ public class FilerGrpcClient { public FilerGrpcClient(String host, int grpcPort, SslContext sslContext) { this(sslContext == null ? 
- ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() : + ManagedChannelBuilder.forAddress(host, grpcPort).usePlaintext() + .maxInboundMessageSize(1024 * 1024 * 1024) : NettyChannelBuilder.forAddress(host, grpcPort) + .maxInboundMessageSize(1024 * 1024 * 1024) .negotiationType(NegotiationType.TLS) .sslContext(sslContext)); + filerAddress = String.format("%s:%d", host, grpcPort - 10000); + + FilerProto.GetFilerConfigurationResponse filerConfigurationResponse = + this.getBlockingStub().getFilerConfiguration( + FilerProto.GetFilerConfigurationRequest.newBuilder().build()); + cipher = filerConfigurationResponse.getCipher(); + collection = filerConfigurationResponse.getCollection(); + replication = filerConfigurationResponse.getReplication(); + } - public FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) { + private FilerGrpcClient(ManagedChannelBuilder<?> channelBuilder) { channel = channelBuilder.build(); blockingStub = SeaweedFilerGrpc.newBlockingStub(channel); asyncStub = SeaweedFilerGrpc.newStub(channel); futureStub = SeaweedFilerGrpc.newFutureStub(channel); } + public boolean isCipher() { + return cipher; + } + + public String getCollection() { + return collection; + } + + public String getReplication() { + return replication; + } + public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } @@ -67,4 +100,39 @@ public class FilerGrpcClient { return futureStub; } + public void setAccessVolumeServerDirectly() { + this.volumeServerAccess = VOLUME_SERVER_ACCESS_DIRECT; + } + + public boolean isAccessVolumeServerDirectly() { + return this.volumeServerAccess == VOLUME_SERVER_ACCESS_DIRECT; + } + + public void setAccessVolumeServerByPublicUrl() { + this.volumeServerAccess = VOLUME_SERVER_ACCESS_PUBLIC_URL; + } + + public boolean isAccessVolumeServerByPublicUrl() { + return this.volumeServerAccess == VOLUME_SERVER_ACCESS_PUBLIC_URL; + } + + public void setAccessVolumeServerByFilerProxy() { + this.volumeServerAccess = VOLUME_SERVER_ACCESS_FILER_PROXY; + } + + public boolean isAccessVolumeServerByFilerProxy() { + return this.volumeServerAccess == VOLUME_SERVER_ACCESS_FILER_PROXY; + } + + public String getChunkUrl(String chunkId, String url, String publicUrl) { + switch (this.volumeServerAccess) { + case VOLUME_SERVER_ACCESS_PUBLIC_URL: + return String.format("http://%s/%s", publicUrl, chunkId); + case VOLUME_SERVER_ACCESS_FILER_PROXY: + return String.format("http://%s/?proxyChunkId=%s", this.filerAddress, chunkId); + default: + return String.format("http://%s/%s", url, chunkId); + } + } + } diff --git a/other/java/client/src/main/java/seaweedfs/client/Gzip.java b/other/java/client/src/main/java/seaweedfs/client/Gzip.java new file mode 100644 index 000000000..4909094f5 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/Gzip.java @@ -0,0 +1,41 @@ +package seaweedfs.client; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class Gzip { + public static byte[] compress(byte[] data) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length); + GZIPOutputStream gzip = new GZIPOutputStream(bos); + gzip.write(data); + gzip.close(); + byte[] compressed = bos.toByteArray(); + bos.close(); + return compressed; + } + + public static byte[] decompress(byte[] compressed) { + try { + ByteArrayInputStream bis = new 
ByteArrayInputStream(compressed); + GZIPInputStream gis = new GZIPInputStream(bis); + return readAll(gis); + } catch (Exception e) { + return compressed; + } + } + + private static byte[] readAll(InputStream input) throws IOException { + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { + byte[] buffer = new byte[4096]; + int n; + while (-1 != (n = input.read(buffer))) { + output.write(buffer, 0, n); + } + return output.toByteArray(); + } + } +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java new file mode 100644 index 000000000..8d0ebd755 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedCipher.java @@ -0,0 +1,55 @@ +package seaweedfs.client; + +import javax.crypto.Cipher; +import javax.crypto.spec.GCMParameterSpec; +import javax.crypto.spec.SecretKeySpec; +import java.security.SecureRandom; + +public class SeaweedCipher { + // AES-GCM parameters + public static final int AES_KEY_SIZE = 256; // in bits + public static final int GCM_NONCE_LENGTH = 12; // in bytes + public static final int GCM_TAG_LENGTH = 16; // in bytes + + private static SecureRandom random = new SecureRandom(); + + public static byte[] genCipherKey() throws Exception { + byte[] key = new byte[AES_KEY_SIZE / 8]; + random.nextBytes(key); + return key; + } + + public static byte[] encrypt(byte[] clearTextbytes, byte[] cipherKey) throws Exception { + return encrypt(clearTextbytes, 0, clearTextbytes.length, cipherKey); + } + + public static byte[] encrypt(byte[] clearTextbytes, int offset, int length, byte[] cipherKey) throws Exception { + + final byte[] nonce = new byte[GCM_NONCE_LENGTH]; + random.nextBytes(nonce); + GCMParameterSpec spec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, nonce); + SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES"); + + Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding"); + AES_cipherInstance.init(Cipher.ENCRYPT_MODE, keySpec, spec); + + byte[] encryptedText = AES_cipherInstance.doFinal(clearTextbytes, offset, length); + + byte[] iv = AES_cipherInstance.getIV(); + byte[] message = new byte[GCM_NONCE_LENGTH + clearTextbytes.length + GCM_TAG_LENGTH]; + System.arraycopy(iv, 0, message, 0, GCM_NONCE_LENGTH); + System.arraycopy(encryptedText, 0, message, GCM_NONCE_LENGTH, encryptedText.length); + + return message; + } + + public static byte[] decrypt(byte[] encryptedText, byte[] cipherKey) throws Exception { + final Cipher AES_cipherInstance = Cipher.getInstance("AES/GCM/NoPadding"); + GCMParameterSpec params = new GCMParameterSpec(GCM_TAG_LENGTH * 8, encryptedText, 0, GCM_NONCE_LENGTH); + SecretKeySpec keySpec = new SecretKeySpec(cipherKey, "AES"); + AES_cipherInstance.init(Cipher.DECRYPT_MODE, keySpec, params); + byte[] decryptedText = AES_cipherInstance.doFinal(encryptedText, GCM_NONCE_LENGTH, encryptedText.length - GCM_NONCE_LENGTH); + return decryptedText; + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java new file mode 100644 index 000000000..4e40ce1b6 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedInputStream.java @@ -0,0 +1,208 @@ +package seaweedfs.client; + +// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import 
java.nio.ByteBuffer; +import java.util.List; + +public class SeaweedInputStream extends InputStream { + + private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); + private static final IOException EXCEPTION_STREAM_IS_CLOSED = new IOException("Stream is closed!"); + + private final FilerClient filerClient; + private final String path; + private final FilerProto.Entry entry; + private final List<SeaweedRead.VisibleInterval> visibleIntervalList; + private final long contentLength; + + private long position = 0; // cursor of the file + + private boolean closed = false; + + public SeaweedInputStream( + final FilerClient filerClient, + final String fullpath) throws IOException { + this.path = fullpath; + this.filerClient = filerClient; + this.entry = filerClient.lookupEntry( + SeaweedOutputStream.getParentDirectory(fullpath), + SeaweedOutputStream.getFileName(fullpath)); + this.contentLength = SeaweedRead.fileSize(entry); + + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); + + LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); + + } + + public SeaweedInputStream( + final FilerClient filerClient, + final String path, + final FilerProto.Entry entry) throws IOException { + this.filerClient = filerClient; + this.path = path; + this.entry = entry; + this.contentLength = SeaweedRead.fileSize(entry); + + this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(filerClient, entry.getChunksList()); + + LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); + + } + + public String getPath() { + return path; + } + + @Override + public int read() throws IOException { + byte[] b = new byte[1]; + int numberOfBytesRead = read(b, 0, 1); + if (numberOfBytesRead < 0) { + return -1; + } else { + return (b[0] & 0xFF); + } + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + + if (b == null) { + throw new IllegalArgumentException("null byte array passed in to read() method"); + } + if (off >= b.length) { + throw new IllegalArgumentException("offset greater than length of array"); + } + if (len < 0) { + throw new IllegalArgumentException("requested read length is less than zero"); + } + if (len > (b.length - off)) { + throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); + } + + ByteBuffer buf = ByteBuffer.wrap(b, off, len); + return read(buf); + + } + + // implement ByteBufferReadable + public synchronized int read(ByteBuffer buf) throws IOException { + + if (position < 0) { + throw new IllegalArgumentException("attempting to read from negative offset"); + } + if (position >= contentLength) { + return -1; // Hadoop prefers -1 to EOFException + } + + long bytesRead = 0; + int len = buf.remaining(); + int start = (int) this.position; + if (start+len <= entry.getContent().size()) { + entry.getContent().substring(start, start+len).copyTo(buf); + } else { + bytesRead = SeaweedRead.read(this.filerClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); + } + + if (bytesRead > Integer.MAX_VALUE) { + throw new IOException("Unexpected Content-Length"); + } + + if (bytesRead > 0) { + this.position += bytesRead; + } + + return (int) bytesRead; + } + + public synchronized void seek(long n) throws IOException { + if (closed) { + throw EXCEPTION_STREAM_IS_CLOSED; + } + if (n < 0) { + throw new EOFException("Cannot 
seek to a negative offset"); + } + if (n > contentLength) { + throw new EOFException("Attempted to seek or read past the end of the file"); + } + this.position = n; + } + + @Override + public synchronized long skip(long n) throws IOException { + if (closed) { + throw EXCEPTION_STREAM_IS_CLOSED; + } + if (this.position == contentLength) { + if (n > 0) { + throw new EOFException("Attempted to seek or read past the end of the file"); + } + } + long newPos = this.position + n; + if (newPos < 0) { + newPos = 0; + n = newPos - this.position; + } + if (newPos > contentLength) { + newPos = contentLength; + n = newPos - this.position; + } + seek(newPos); + return n; + } + + /** + * Return the size of the remaining available bytes + * if the size is less than or equal to {@link Integer#MAX_VALUE}, + * otherwise, return {@link Integer#MAX_VALUE}. + * <p> + * This is to match the behavior of DFSInputStream.available(), + * which some clients may rely on (HBase write-ahead log reading in + * particular). + */ + @Override + public synchronized int available() throws IOException { + if (closed) { + throw EXCEPTION_STREAM_IS_CLOSED; + } + final long remaining = this.contentLength - this.position; + return remaining <= Integer.MAX_VALUE + ? (int) remaining : Integer.MAX_VALUE; + } + + /** + * Returns the length of the file that this stream refers to. Note that the length returned is the length + * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, + * they wont be reflected in the returned length. + * + * @return length of the file. + * @throws IOException if the stream is closed + */ + public long length() throws IOException { + if (closed) { + throw EXCEPTION_STREAM_IS_CLOSED; + } + return contentLength; + } + + public synchronized long getPos() throws IOException { + if (closed) { + throw EXCEPTION_STREAM_IS_CLOSED; + } + return position; + } + + @Override + public synchronized void close() throws IOException { + closed = true; + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java new file mode 100644 index 000000000..ba298a713 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedOutputStream.java @@ -0,0 +1,328 @@ +package seaweedfs.client; + +// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.util.concurrent.*; + +public class SeaweedOutputStream extends OutputStream { + + private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); + protected final boolean supportFlush = true; + private final FilerClient filerClient; + private final String path; + private final int bufferSize; + private final int maxConcurrentRequestCount; + private final ThreadPoolExecutor threadExecutor; + private final ExecutorCompletionService<Void> completionService; + private final ConcurrentLinkedDeque<WriteOperation> writeOperations; + private final boolean shouldSaveMetadata = false; + private FilerProto.Entry.Builder entry; + private long position; + private boolean closed; + private volatile IOException lastError; + private long lastFlushOffset; + private long lastTotalAppendOffset = 0; + private ByteBuffer buffer; + private long outputIndex; + private String 
replication = "000"; + + public SeaweedOutputStream(FilerClient filerClient, final String fullpath) { + this(filerClient, fullpath, "000"); + } + + public SeaweedOutputStream(FilerClient filerClient, final String fullpath, final String replication) { + this(filerClient, fullpath, null, 0, 8 * 1024 * 1024, "000"); + } + + public SeaweedOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, + final long position, final int bufferSize, final String replication) { + this.filerClient = filerClient; + this.replication = replication; + this.path = path; + this.position = position; + this.closed = false; + this.lastError = null; + this.lastFlushOffset = 0; + this.bufferSize = bufferSize; + this.buffer = ByteBufferPool.request(bufferSize); + this.outputIndex = 0; + this.writeOperations = new ConcurrentLinkedDeque<>(); + + this.maxConcurrentRequestCount = Runtime.getRuntime().availableProcessors(); + + this.threadExecutor + = new ThreadPoolExecutor(maxConcurrentRequestCount, + maxConcurrentRequestCount, + 120L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); + this.completionService = new ExecutorCompletionService<>(this.threadExecutor); + + this.entry = entry; + if (this.entry == null) { + long now = System.currentTimeMillis() / 1000L; + + this.entry = FilerProto.Entry.newBuilder() + .setName(getFileName(path)) + .setIsDirectory(false) + .setAttributes(FilerProto.FuseAttributes.newBuilder() + .setFileMode(0755) + .setReplication(replication) + .setCrtime(now) + .setMtime(now) + .clearGroupName() + ); + } + + } + + public static String getParentDirectory(String path) { + int protoIndex = path.indexOf("://"); + if (protoIndex >= 0) { + int pathStart = path.indexOf("/", protoIndex+3); + path = path.substring(pathStart); + } + if (path.equals("/")) { + return path; + } + int lastSlashIndex = path.lastIndexOf("/"); + if (lastSlashIndex == 0) { + return "/"; + } + return path.substring(0, lastSlashIndex); + } + + public static String getFileName(String path) { + if (path.indexOf("/") < 0) { + return path; + } + int lastSlashIndex = path.lastIndexOf("/"); + return path.substring(lastSlashIndex + 1); + } + + private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { + try { + SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); + } catch (Exception ex) { + throw new IOException(ex); + } + this.lastFlushOffset = offset; + } + + @Override + public void write(final int byteVal) throws IOException { + write(new byte[]{(byte) (byteVal & 0xFF)}); + } + + @Override + public synchronized void write(final byte[] data, final int off, final int length) + throws IOException { + maybeThrowLastError(); + + if (data == null) { + return; + } + + if (off < 0 || length < 0 || length > data.length - off) { + throw new IndexOutOfBoundsException(); + } + + // System.out.println(path + " write [" + (outputIndex + off) + "," + ((outputIndex + off) + length) + ")"); + + int currentOffset = off; + int writableBytes = bufferSize - buffer.position(); + int numberOfBytesToWrite = length; + + while (numberOfBytesToWrite > 0) { + + if (numberOfBytesToWrite < writableBytes) { + buffer.put(data, currentOffset, numberOfBytesToWrite); + outputIndex += numberOfBytesToWrite; + break; + } + + // System.out.println(path + " [" + (outputIndex + currentOffset) + "," + ((outputIndex + currentOffset) + writableBytes) + ") " + buffer.capacity()); + buffer.put(data, currentOffset, writableBytes); + outputIndex += writableBytes; + currentOffset += 
writableBytes; + writeCurrentBufferToService(); + numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; + writableBytes = bufferSize - buffer.position(); + } + + } + + /** + * Flushes this output stream and forces any buffered output bytes to be + * written out. If any data remains in the payload it is committed to the + * service. Data is queued for writing and forced out to the service + * before the call returns. + */ + @Override + public void flush() throws IOException { + if (supportFlush) { + flushInternalAsync(); + } + } + + /** + * Force all data in the output stream to be written to Azure storage. + * Wait to return until this is complete. Close the access to the stream and + * shutdown the upload thread pool. + * If the blob was created, its lease will be released. + * Any error encountered caught in threads and stored will be rethrown here + * after cleanup. + */ + @Override + public synchronized void close() throws IOException { + if (closed) { + return; + } + + LOG.debug("close path: {}", path); + try { + flushInternal(); + threadExecutor.shutdown(); + } finally { + lastError = new IOException("Stream is closed!"); + ByteBufferPool.release(buffer); + buffer = null; + outputIndex = 0; + closed = true; + writeOperations.clear(); + if (!threadExecutor.isShutdown()) { + threadExecutor.shutdownNow(); + } + } + + } + + private synchronized void writeCurrentBufferToService() throws IOException { + if (buffer.position() == 0) { + return; + } + + position += submitWriteBufferToService(buffer, position); + + buffer = ByteBufferPool.request(bufferSize); + + } + + private synchronized int submitWriteBufferToService(final ByteBuffer bufferToWrite, final long writePosition) throws IOException { + + ((Buffer)bufferToWrite).flip(); + int bytesLength = bufferToWrite.limit() - bufferToWrite.position(); + + if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount) { + waitForTaskToComplete(); + } + final Future<Void> job = completionService.submit(() -> { + // System.out.println(path + " is going to save [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + SeaweedWrite.writeData(entry, replication, filerClient, writePosition, bufferToWrite.array(), bufferToWrite.position(), bufferToWrite.limit(), path); + // System.out.println(path + " saved [" + (writePosition) + "," + ((writePosition) + bytesLength) + ")"); + ByteBufferPool.release(bufferToWrite); + return null; + }); + + writeOperations.add(new WriteOperation(job, writePosition, bytesLength)); + + // Try to shrink the queue + shrinkWriteOperationQueue(); + + return bytesLength; + + } + + private void waitForTaskToComplete() throws IOException { + boolean completed; + for (completed = false; completionService.poll() != null; completed = true) { + // keep polling until there is no data + } + + if (!completed) { + try { + completionService.take(); + } catch (InterruptedException e) { + lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e); + throw lastError; + } + } + } + + private void maybeThrowLastError() throws IOException { + if (lastError != null) { + throw lastError; + } + } + + /** + * Try to remove the completed write operations from the beginning of write + * operation FIFO queue. 
+ */ + private synchronized void shrinkWriteOperationQueue() throws IOException { + try { + while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) { + writeOperations.peek().task.get(); + lastTotalAppendOffset += writeOperations.peek().length; + writeOperations.remove(); + } + } catch (Exception e) { + lastError = new IOException(e); + throw lastError; + } + } + + protected synchronized void flushInternal() throws IOException { + maybeThrowLastError(); + writeCurrentBufferToService(); + flushWrittenBytesToService(); + } + + protected synchronized void flushInternalAsync() throws IOException { + maybeThrowLastError(); + writeCurrentBufferToService(); + flushWrittenBytesToServiceAsync(); + } + + private synchronized void flushWrittenBytesToService() throws IOException { + for (WriteOperation writeOperation : writeOperations) { + try { + writeOperation.task.get(); + } catch (Exception ex) { + lastError = new IOException(ex); + throw lastError; + } + } + LOG.debug("flushWrittenBytesToService: {} position:{}", path, position); + flushWrittenBytesToServiceInternal(position); + } + + private synchronized void flushWrittenBytesToServiceAsync() throws IOException { + shrinkWriteOperationQueue(); + + if (this.lastTotalAppendOffset > this.lastFlushOffset) { + this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset); + } + } + + private static class WriteOperation { + private final Future<Void> task; + private final long startOffset; + private final long length; + + WriteOperation(final Future<Void> task, final long startOffset, final long length) { + this.task = task; + this.startOffset = startOffset; + this.length = length; + } + } + +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 2efa64580..384636601 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -1,88 +1,200 @@ package seaweedfs.client; +import org.apache.http.Header; +import org.apache.http.HeaderElement; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; +import org.apache.http.client.entity.GzipDecompressingEntity; +import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; -import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.*; public class SeaweedRead { - // private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); + private static final Logger LOG = LoggerFactory.getLogger(SeaweedRead.class); + + static ChunkCache chunkCache = new ChunkCache(4); + static VolumeIdCache volumeIdCache = new VolumeIdCache(4 * 1024); // returns bytesRead - public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, - final long position, final byte[] buffer, final int bufferOffset, - final int bufferLength) throws IOException { + public static long read(FilerClient filerClient, List<VisibleInterval> visibleIntervals, + final long position, final ByteBuffer buf, final long fileSize) throws IOException { + + List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, 
buf.remaining()); - List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength); + Map<String, FilerProto.Locations> knownLocations = new HashMap<>(); FilerProto.LookupVolumeRequest.Builder lookupRequest = FilerProto.LookupVolumeRequest.newBuilder(); for (ChunkView chunkView : chunkViews) { String vid = parseVolumeId(chunkView.fileId); - lookupRequest.addVolumeIds(vid); + FilerProto.Locations locations = volumeIdCache.getLocations(vid); + if (locations == null) { + lookupRequest.addVolumeIds(vid); + } else { + knownLocations.put(vid, locations); + } } - FilerProto.LookupVolumeResponse lookupResponse = filerGrpcClient - .getBlockingStub().lookupVolume(lookupRequest.build()); - - Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); + if (lookupRequest.getVolumeIdsCount() > 0) { + FilerProto.LookupVolumeResponse lookupResponse = filerClient + .getBlockingStub().lookupVolume(lookupRequest.build()); + Map<String, FilerProto.Locations> vid2Locations = lookupResponse.getLocationsMapMap(); + for (Map.Entry<String, FilerProto.Locations> entry : vid2Locations.entrySet()) { + volumeIdCache.setLocations(entry.getKey(), entry.getValue()); + knownLocations.put(entry.getKey(), entry.getValue()); + } + } //TODO parallel this long readCount = 0; - int startOffset = bufferOffset; + long startOffset = position; for (ChunkView chunkView : chunkViews) { - FilerProto.Locations locations = vid2Locations.get(parseVolumeId(chunkView.fileId)); - if (locations.getLocationsCount() == 0) { + + if (startOffset < chunkView.logicOffset) { + long gap = chunkView.logicOffset - startOffset; + LOG.debug("zero [{},{})", startOffset, startOffset + gap); + buf.position(buf.position()+ (int)gap); + readCount += gap; + startOffset += gap; + } + + FilerProto.Locations locations = knownLocations.get(parseVolumeId(chunkView.fileId)); + if (locations == null || locations.getLocationsCount() == 0) { + LOG.error("failed to locate {}", chunkView.fileId); // log here! 
return 0; } - int len = readChunkView(position, buffer, startOffset, chunkView, locations); + int len = readChunkView(filerClient, startOffset, buf, chunkView, locations); + + LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); readCount += len; startOffset += len; } + long limit = Math.min(buf.limit(), fileSize); + + if (startOffset < limit) { + long gap = limit - startOffset; + LOG.debug("zero2 [{},{})", startOffset, startOffset + gap); + buf.position(buf.position()+ (int)gap); + readCount += gap; + startOffset += gap; + } + return readCount; } - private static int readChunkView(long position, byte[] buffer, int startOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { - HttpClient client = new DefaultHttpClient(); - HttpGet request = new HttpGet( - String.format("http://%s/%s", locations.getLocations(0).getUrl(), chunkView.fileId)); + private static int readChunkView(FilerClient filerClient, long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + + byte[] chunkData = chunkCache.getChunk(chunkView.fileId); + + if (chunkData == null) { + chunkData = doFetchFullChunkData(filerClient, chunkView, locations); + chunkCache.setChunk(chunkView.fileId, chunkData); + } + + int len = (int) chunkView.size; + LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}", + chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset); + buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len); + + return len; + } + + public static byte[] doFetchFullChunkData(FilerClient filerClient, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + + byte[] data = null; + IOException lastException = null; + for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) { + for (FilerProto.Location location : locations.getLocationsList()) { + String url = filerClient.getChunkUrl(chunkView.fileId, location.getUrl(), location.getPublicUrl()); + try { + data = doFetchOneFullChunkData(chunkView, url); + lastException = null; + break; + } catch (IOException ioe) { + LOG.debug("doFetchFullChunkData {} :{}", url, ioe); + lastException = ioe; + } + } + if (data != null) { + break; + } + try { + Thread.sleep(waitTime); + } catch (InterruptedException e) { + } + } - if (!chunkView.isFullChunk) { - request.setHeader(HttpHeaders.ACCEPT_ENCODING, ""); - request.setHeader(HttpHeaders.RANGE, - String.format("bytes=%d-%d", chunkView.offset, chunkView.offset + chunkView.size)); + if (lastException != null) { + throw lastException; } + LOG.debug("doFetchFullChunkData fid:{} chunkData.length:{}", chunkView.fileId, data.length); + + return data; + + } + + private static byte[] doFetchOneFullChunkData(ChunkView chunkView, String url) throws IOException { + + HttpGet request = new HttpGet(url); + + request.setHeader(HttpHeaders.ACCEPT_ENCODING, "gzip"); + + byte[] data = null; + + CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(request); + try { - HttpResponse response = client.execute(request); HttpEntity entity = response.getEntity(); - int len = (int) (chunkView.logicOffset - position + chunkView.size); - OutputStream outputStream = new ByteBufferOutputStream(ByteBuffer.wrap(buffer, startOffset, len)); - entity.writeTo(outputStream); - // LOG.debug("* read chunkView:{} startOffset:{} length:{}", 
chunkView, startOffset, len); + Header contentEncodingHeader = entity.getContentEncoding(); + + if (contentEncodingHeader != null) { + HeaderElement[] encodings = contentEncodingHeader.getElements(); + for (int i = 0; i < encodings.length; i++) { + if (encodings[i].getName().equalsIgnoreCase("gzip")) { + entity = new GzipDecompressingEntity(entity); + break; + } + } + } + + data = EntityUtils.toByteArray(entity); - return len; + EntityUtils.consume(entity); } finally { - if (client instanceof Closeable) { - Closeable t = (Closeable) client; - t.close(); + response.close(); + request.releaseConnection(); + } + + if (chunkView.cipherKey != null && chunkView.cipherKey.length != 0) { + try { + data = SeaweedCipher.decrypt(data, chunkView.cipherKey); + } catch (Exception e) { + throw new IOException("fail to decrypt", e); } } + + if (chunkView.isCompressed) { + data = Gzip.decompress(data); + } + + LOG.debug("doFetchOneFullChunkData url:{} chunkData.length:{}", url, data.length); + + return data; + } protected static List<ChunkView> viewFromVisibles(List<VisibleInterval> visibleIntervals, long offset, long size) { @@ -90,27 +202,40 @@ public class SeaweedRead { long stop = offset + size; for (VisibleInterval chunk : visibleIntervals) { - if (chunk.start <= offset && offset < chunk.stop && offset < stop) { + long chunkStart = Math.max(offset, chunk.start); + long chunkStop = Math.min(stop, chunk.stop); + if (chunkStart < chunkStop) { boolean isFullChunk = chunk.isFullChunk && chunk.start == offset && chunk.stop <= stop; views.add(new ChunkView( - chunk.fileId, - offset - chunk.start, - Math.min(chunk.stop, stop) - offset, - offset, - isFullChunk + chunk.fileId, + chunkStart - chunk.start + chunk.chunkOffset, + chunkStop - chunkStart, + chunkStart, + isFullChunk, + chunk.cipherKey, + chunk.isCompressed )); - offset = Math.min(chunk.stop, stop); } } return views; } - public static List<VisibleInterval> nonOverlappingVisibleIntervals(List<FilerProto.FileChunk> chunkList) { + public static List<VisibleInterval> nonOverlappingVisibleIntervals( + final FilerClient filerClient, List<FilerProto.FileChunk> chunkList) throws IOException { + + chunkList = FileChunkManifest.resolveChunkManifest(filerClient, chunkList); + FilerProto.FileChunk[] chunks = chunkList.toArray(new FilerProto.FileChunk[0]); Arrays.sort(chunks, new Comparator<FilerProto.FileChunk>() { @Override public int compare(FilerProto.FileChunk a, FilerProto.FileChunk b) { - return (int) (a.getMtime() - b.getMtime()); + // if just a.getMtime() - b.getMtime(), it will overflow! 
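+                // example: mtimes 4_000_000_000L and 0 differ by more than
+                // Integer.MAX_VALUE, so the old (int) cast wraps negative and
+                // reverses the ordering; Long.compare(a.getMtime(), b.getMtime())
+                // would be an equivalent safe alternative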
+ if (a.getMtime() < b.getMtime()) { + return -1; + } else if (a.getMtime() > b.getMtime()) { + return 1; + } + return 0; } }); @@ -127,11 +252,14 @@ public class SeaweedRead { List<VisibleInterval> newVisibles, FilerProto.FileChunk chunk) { VisibleInterval newV = new VisibleInterval( - chunk.getOffset(), - chunk.getOffset() + chunk.getSize(), - chunk.getFileId(), - chunk.getMtime(), - true + chunk.getOffset(), + chunk.getOffset() + chunk.getSize(), + chunk.getFileId(), + chunk.getMtime(), + 0, + true, + chunk.getCipherKey().toByteArray(), + chunk.getIsCompressed() ); // easy cases to speed up @@ -147,21 +275,27 @@ public class SeaweedRead { for (VisibleInterval v : visibles) { if (v.start < chunk.getOffset() && chunk.getOffset() < v.stop) { newVisibles.add(new VisibleInterval( - v.start, - chunk.getOffset(), - v.fileId, - v.modifiedTime, - false + v.start, + chunk.getOffset(), + v.fileId, + v.modifiedTime, + v.chunkOffset, + false, + v.cipherKey, + v.isCompressed )); } long chunkStop = chunk.getOffset() + chunk.getSize(); if (v.start < chunkStop && chunkStop < v.stop) { newVisibles.add(new VisibleInterval( - chunkStop, - v.stop, - v.fileId, - v.modifiedTime, - false + chunkStop, + v.stop, + v.fileId, + v.modifiedTime, + v.chunkOffset + (chunkStop - v.start), + false, + v.cipherKey, + v.isCompressed )); } if (chunkStop <= v.start || v.stop <= chunk.getOffset()) { @@ -191,6 +325,10 @@ public class SeaweedRead { return fileId; } + public static long fileSize(FilerProto.Entry entry) { + return Math.max(totalSize(entry.getChunksList()), entry.getAttributes().getFileSize()); + } + public static long totalSize(List<FilerProto.FileChunk> chunksList) { long size = 0; for (FilerProto.FileChunk chunk : chunksList) { @@ -207,25 +345,33 @@ public class SeaweedRead { public final long stop; public final long modifiedTime; public final String fileId; + public final long chunkOffset; public final boolean isFullChunk; + public final byte[] cipherKey; + public final boolean isCompressed; - public VisibleInterval(long start, long stop, String fileId, long modifiedTime, boolean isFullChunk) { + public VisibleInterval(long start, long stop, String fileId, long modifiedTime, long chunkOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { this.start = start; this.stop = stop; this.modifiedTime = modifiedTime; this.fileId = fileId; + this.chunkOffset = chunkOffset; this.isFullChunk = isFullChunk; + this.cipherKey = cipherKey; + this.isCompressed = isCompressed; } @Override public String toString() { return "VisibleInterval{" + - "start=" + start + - ", stop=" + stop + - ", modifiedTime=" + modifiedTime + - ", fileId='" + fileId + '\'' + - ", isFullChunk=" + isFullChunk + - '}'; + "start=" + start + + ", stop=" + stop + + ", modifiedTime=" + modifiedTime + + ", fileId='" + fileId + '\'' + + ", isFullChunk=" + isFullChunk + + ", cipherKey=" + Arrays.toString(cipherKey) + + ", isCompressed=" + isCompressed + + '}'; } } @@ -235,24 +381,30 @@ public class SeaweedRead { public final long size; public final long logicOffset; public final boolean isFullChunk; + public final byte[] cipherKey; + public final boolean isCompressed; - public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk) { + public ChunkView(String fileId, long offset, long size, long logicOffset, boolean isFullChunk, byte[] cipherKey, boolean isCompressed) { this.fileId = fileId; this.offset = offset; this.size = size; this.logicOffset = logicOffset; this.isFullChunk = isFullChunk; + this.cipherKey = 
cipherKey; + this.isCompressed = isCompressed; } @Override public String toString() { return "ChunkView{" + - "fileId='" + fileId + '\'' + - ", offset=" + offset + - ", size=" + size + - ", logicOffset=" + logicOffset + - ", isFullChunk=" + isFullChunk + - '}'; + "fileId='" + fileId + '\'' + + ", offset=" + offset + + ", size=" + size + + ", logicOffset=" + logicOffset + + ", isFullChunk=" + isFullChunk + + ", cipherKey=" + Arrays.toString(cipherKey) + + ", isCompressed=" + isCompressed + + '}'; } } diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java new file mode 100644 index 000000000..c465d935f --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedUtil.java @@ -0,0 +1,30 @@ +package seaweedfs.client; + +import org.apache.http.impl.DefaultConnectionReuseStrategy; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; + +public class SeaweedUtil { + + static PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); + static CloseableHttpClient httpClient; + + static { + // Increase max total connection to 200 + cm.setMaxTotal(200); + // Increase default max connection per route to 20 + cm.setDefaultMaxPerRoute(20); + + httpClient = HttpClientBuilder.create() + .setConnectionManager(cm) + .setConnectionReuseStrategy(DefaultConnectionReuseStrategy.INSTANCE) + .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE) + .build(); + } + + public static CloseableHttpClient getClosableHttpClient() { + return httpClient; + } +} diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java index 0663e8d98..f8c0c76b6 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedWrite.java @@ -1,68 +1,114 @@ package seaweedfs.client; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; +import com.google.protobuf.ByteString; +import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.mime.HttpMultipartMode; import org.apache.http.entity.mime.MultipartEntityBuilder; -import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; -import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.security.SecureRandom; +import java.util.List; public class SeaweedWrite { + private static final Logger LOG = LoggerFactory.getLogger(SeaweedWrite.class); + + private static final SecureRandom random = new SecureRandom(); + public static void writeData(FilerProto.Entry.Builder entry, final String replication, - final FilerGrpcClient filerGrpcClient, + final FilerClient filerClient, final long offset, final byte[] bytes, - final long bytesOffset, final long bytesLength) throws IOException { - FilerProto.AssignVolumeResponse response = filerGrpcClient.getBlockingStub().assignVolume( + final long bytesOffset, final long bytesLength, + final String path) throws IOException { + FilerProto.FileChunk.Builder chunkBuilder = writeChunk( + replication, 
filerClient, offset, bytes, bytesOffset, bytesLength, path); + synchronized (entry) { + entry.addChunks(chunkBuilder); + } + } + + public static FilerProto.FileChunk.Builder writeChunk(final String replication, + final FilerClient filerClient, + final long offset, + final byte[] bytes, + final long bytesOffset, + final long bytesLength, + final String path) throws IOException { + FilerProto.AssignVolumeResponse response = filerClient.getBlockingStub().assignVolume( FilerProto.AssignVolumeRequest.newBuilder() - .setCollection("") - .setReplication(replication) + .setCollection(filerClient.getCollection()) + .setReplication(replication == null ? filerClient.getReplication() : replication) .setDataCenter("") - .setReplication("") .setTtlSec(0) + .setPath(path) .build()); String fileId = response.getFileId(); - String url = response.getUrl(); String auth = response.getAuth(); - String targetUrl = String.format("http://%s/%s", url, fileId); - String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength); + String targetUrl = filerClient.getChunkUrl(fileId, response.getUrl(), response.getPublicUrl()); - entry.addChunks(FilerProto.FileChunk.newBuilder() + ByteString cipherKeyString = com.google.protobuf.ByteString.EMPTY; + byte[] cipherKey = null; + if (filerClient.isCipher()) { + cipherKey = genCipherKey(); + cipherKeyString = ByteString.copyFrom(cipherKey); + } + + String etag = multipartUpload(targetUrl, auth, bytes, bytesOffset, bytesLength, cipherKey); + + LOG.debug("write file chunk {} size {}", targetUrl, bytesLength); + + return FilerProto.FileChunk.newBuilder() .setFileId(fileId) .setOffset(offset) .setSize(bytesLength) .setMtime(System.currentTimeMillis() / 10000L) .setETag(etag) - ); - + .setCipherKey(cipherKeyString); } - public static void writeMeta(final FilerGrpcClient filerGrpcClient, - final String parentDirectory, final FilerProto.Entry.Builder entry) { - filerGrpcClient.getBlockingStub().createEntry( - FilerProto.CreateEntryRequest.newBuilder() - .setDirectory(parentDirectory) - .setEntry(entry) - .build() - ); + public static void writeMeta(final FilerClient filerClient, + final String parentDirectory, + final FilerProto.Entry.Builder entry) throws IOException { + + synchronized (entry) { + List<FilerProto.FileChunk> chunks = FileChunkManifest.maybeManifestize(filerClient, entry.getChunksList(), parentDirectory); + entry.clearChunks(); + entry.addAllChunks(chunks); + filerClient.getBlockingStub().createEntry( + FilerProto.CreateEntryRequest.newBuilder() + .setDirectory(parentDirectory) + .setEntry(entry) + .build() + ); + } } private static String multipartUpload(String targetUrl, String auth, final byte[] bytes, - final long bytesOffset, final long bytesLength) throws IOException { - - HttpClient client = new DefaultHttpClient(); - - InputStream inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); + final long bytesOffset, final long bytesLength, + byte[] cipherKey) throws IOException { + + InputStream inputStream = null; + if (cipherKey == null || cipherKey.length == 0) { + inputStream = new ByteArrayInputStream(bytes, (int) bytesOffset, (int) bytesLength); + } else { + try { + byte[] encryptedBytes = SeaweedCipher.encrypt(bytes, (int) bytesOffset, (int) bytesLength, cipherKey); + inputStream = new ByteArrayInputStream(encryptedBytes, 0, encryptedBytes.length); + } catch (Exception e) { + throw new IOException("fail to encrypt data", e); + } + } HttpPost post = new HttpPost(targetUrl); if (auth != null && auth.length() != 0) 
{ @@ -74,8 +120,9 @@ public class SeaweedWrite { .addBinaryBody("upload", inputStream) .build()); + CloseableHttpResponse response = SeaweedUtil.getClosableHttpClient().execute(post); + try { - HttpResponse response = client.execute(post); String etag = response.getLastHeader("ETag").getValue(); @@ -83,13 +130,19 @@ public class SeaweedWrite { etag = etag.substring(1, etag.length() - 1); } + EntityUtils.consume(response.getEntity()); + return etag; } finally { - if (client instanceof Closeable) { - Closeable t = (Closeable) client; - t.close(); - } + response.close(); + post.releaseConnection(); } } + + private static byte[] genCipherKey() { + byte[] b = new byte[32]; + random.nextBytes(b); + return b; + } } diff --git a/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java new file mode 100644 index 000000000..fd2649cc2 --- /dev/null +++ b/other/java/client/src/main/java/seaweedfs/client/VolumeIdCache.java @@ -0,0 +1,36 @@ +package seaweedfs.client; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import java.util.concurrent.TimeUnit; + +public class VolumeIdCache { + + private Cache<String, FilerProto.Locations> cache = null; + + public VolumeIdCache(int maxEntries) { + if (maxEntries == 0) { + return; + } + this.cache = CacheBuilder.newBuilder() + .maximumSize(maxEntries) + .expireAfterAccess(5, TimeUnit.MINUTES) + .build(); + } + + public FilerProto.Locations getLocations(String volumeId) { + if (this.cache == null) { + return null; + } + return this.cache.getIfPresent(volumeId); + } + + public void setLocations(String volumeId, FilerProto.Locations locations) { + if (this.cache == null) { + return; + } + this.cache.put(volumeId, locations); + } + +} diff --git a/other/java/client/src/main/proto/filer.proto b/other/java/client/src/main/proto/filer.proto index ef847cbe7..ac4c9a0e7 100644 --- a/other/java/client/src/main/proto/filer.proto +++ b/other/java/client/src/main/proto/filer.proto @@ -2,6 +2,7 @@ syntax = "proto3"; package filer_pb; +option go_package = "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"; option java_package = "seaweedfs.client"; option java_outer_classname = "FilerProto"; @@ -21,6 +22,9 @@ service SeaweedFiler { rpc UpdateEntry (UpdateEntryRequest) returns (UpdateEntryResponse) { } + rpc AppendToEntry (AppendToEntryRequest) returns (AppendToEntryResponse) { + } + rpc DeleteEntry (DeleteEntryRequest) returns (DeleteEntryResponse) { } @@ -33,6 +37,9 @@ service SeaweedFiler { rpc LookupVolume (LookupVolumeRequest) returns (LookupVolumeResponse) { } + rpc CollectionList (CollectionListRequest) returns (CollectionListResponse) { + } + rpc DeleteCollection (DeleteCollectionRequest) returns (DeleteCollectionResponse) { } @@ -42,6 +49,24 @@ service SeaweedFiler { rpc GetFilerConfiguration (GetFilerConfigurationRequest) returns (GetFilerConfigurationResponse) { } + rpc SubscribeMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) { + } + + rpc SubscribeLocalMetadata (SubscribeMetadataRequest) returns (stream SubscribeMetadataResponse) { + } + + rpc KeepConnected (stream KeepConnectedRequest) returns (stream KeepConnectedResponse) { + } + + rpc LocateBroker (LocateBrokerRequest) returns (LocateBrokerResponse) { + } + + rpc KvGet (KvGetRequest) returns (KvGetResponse) { + } + + rpc KvPut (KvPutRequest) returns (KvPutResponse) { + } + } ////////////////////////////////////////////////// @@ -73,6 +98,9 @@ message Entry { 
repeated FileChunk chunks = 3; FuseAttributes attributes = 4; map<string, bytes> extended = 5; + bytes hard_link_id = 7; + int32 hard_link_counter = 8; // only exists in hard link meta data + bytes content = 9; // if not empty, the file content } message FullEntry { @@ -85,6 +113,8 @@ message EventNotification { Entry new_entry = 2; bool delete_chunks = 3; string new_parent_path = 4; + bool is_from_other_cluster = 5; + repeated int32 signatures = 6; } message FileChunk { @@ -96,6 +126,13 @@ message FileChunk { string source_file_id = 6; // to be deprecated FileId fid = 7; FileId source_fid = 8; + bytes cipher_key = 9; + bool is_compressed = 10; + bool is_chunk_manifest = 11; // content is a list of FileChunks +} + +message FileChunkManifest { + repeated FileChunk chunks = 1; } message FileId { @@ -118,23 +155,39 @@ message FuseAttributes { string user_name = 11; // for hdfs repeated string group_name = 12; // for hdfs string symlink_target = 13; + bytes md5 = 14; + string disk_type = 15; } message CreateEntryRequest { string directory = 1; Entry entry = 2; + bool o_excl = 3; + bool is_from_other_cluster = 4; + repeated int32 signatures = 5; } message CreateEntryResponse { + string error = 1; } message UpdateEntryRequest { string directory = 1; Entry entry = 2; + bool is_from_other_cluster = 3; + repeated int32 signatures = 4; } message UpdateEntryResponse { } +message AppendToEntryRequest { + string directory = 1; + string entry_name = 2; + repeated FileChunk chunks = 3; +} +message AppendToEntryResponse { +} + message DeleteEntryRequest { string directory = 1; string name = 2; @@ -142,9 +195,12 @@ message DeleteEntryRequest { bool is_delete_data = 4; bool is_recursive = 5; bool ignore_recursive_error = 6; + bool is_from_other_cluster = 7; + repeated int32 signatures = 8; } message DeleteEntryResponse { + string error = 1; } message AtomicRenameEntryRequest { @@ -163,6 +219,9 @@ message AssignVolumeRequest { string replication = 3; int32 ttl_sec = 4; string data_center = 5; + string path = 6; + string rack = 7; + string disk_type = 8; } message AssignVolumeResponse { @@ -171,6 +230,9 @@ message AssignVolumeResponse { string public_url = 3; int32 count = 4; string auth = 5; + string collection = 6; + string replication = 7; + string error = 8; } message LookupVolumeRequest { @@ -189,6 +251,16 @@ message LookupVolumeResponse { map<string, Locations> locations_map = 1; } +message Collection { + string name = 1; +} +message CollectionListRequest { + bool include_normal_volumes = 1; + bool include_ec_volumes = 2; +} +message CollectionListResponse { + repeated Collection collections = 1; +} message DeleteCollectionRequest { string collection = 1; } @@ -200,11 +272,9 @@ message StatisticsRequest { string replication = 1; string collection = 2; string ttl = 3; + string disk_type = 4; } message StatisticsResponse { - string replication = 1; - string collection = 2; - string ttl = 3; uint64 total_size = 4; uint64 used_size = 5; uint64 file_count = 6; @@ -217,4 +287,80 @@ message GetFilerConfigurationResponse { string replication = 2; string collection = 3; uint32 max_mb = 4; + string dir_buckets = 5; + bool cipher = 7; + int32 signature = 8; + string metrics_address = 9; + int32 metrics_interval_sec = 10; +} + +message SubscribeMetadataRequest { + string client_name = 1; + string path_prefix = 2; + int64 since_ns = 3; + int32 signature = 4; +} +message SubscribeMetadataResponse { + string directory = 1; + EventNotification event_notification = 2; + int64 ts_ns = 3; +} + +message LogEntry { + int64 
ts_ns = 1; + int32 partition_key_hash = 2; + bytes data = 3; +} + +message KeepConnectedRequest { + string name = 1; + uint32 grpc_port = 2; + repeated string resources = 3; +} +message KeepConnectedResponse { +} + +message LocateBrokerRequest { + string resource = 1; +} +message LocateBrokerResponse { + bool found = 1; + // if found, send the exact address + // if not found, send the full list of existing brokers + message Resource { + string grpc_addresses = 1; + int32 resource_count = 2; + } + repeated Resource resources = 2; +} + +// Key-Value operations +message KvGetRequest { + bytes key = 1; +} +message KvGetResponse { + bytes value = 1; + string error = 2; +} +message KvPutRequest { + bytes key = 1; + bytes value = 2; +} +message KvPutResponse { + string error = 1; +} + +// path-based configurations +message FilerConf { + int32 version = 1; + message PathConf { + string location_prefix = 1; + string collection = 2; + string replication = 3; + string ttl = 4; + string disk_type = 5; + bool fsync = 6; + uint32 volume_growth_count = 7; + } + repeated PathConf locations = 2; } diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java new file mode 100644 index 000000000..7b5e53e19 --- /dev/null +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedCipherTest.java @@ -0,0 +1,42 @@ +package seaweedfs.client; + +import org.junit.Test; + +import java.util.Base64; + +import static seaweedfs.client.SeaweedCipher.decrypt; +import static seaweedfs.client.SeaweedCipher.encrypt; + +public class SeaweedCipherTest { + + @Test + public void testSameAsGoImplemnetation() throws Exception { + byte[] secretKey = "256-bit key for AES 256 GCM encr".getBytes(); + + String plainText = "Now we need to generate a 256-bit key for AES 256 GCM"; + + System.out.println("Original Text : " + plainText); + + byte[] cipherText = encrypt(plainText.getBytes(), secretKey); + System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText)); + + byte[] decryptedText = decrypt(cipherText, secretKey); + System.out.println("DeCrypted Text : " + new String(decryptedText)); + } + + @Test + public void testEncryptDecrypt() throws Exception { + byte[] secretKey = SeaweedCipher.genCipherKey(); + + String plainText = "Now we need to generate a 256-bit key for AES 256 GCM"; + + System.out.println("Original Text : " + plainText); + + byte[] cipherText = encrypt(plainText.getBytes(), secretKey); + System.out.println("Encrypted Text : " + Base64.getEncoder().encodeToString(cipherText)); + + byte[] decryptedText = decrypt(cipherText, secretKey); + System.out.println("DeCrypted Text : " + new String(decryptedText)); + } + +} diff --git a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java index ccfcdb117..44b833c90 100644 --- a/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java +++ b/other/java/client/src/test/java/seaweedfs/client/SeaweedReadTest.java @@ -3,13 +3,14 @@ package seaweedfs.client; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; import java.util.ArrayList; import java.util.List; public class SeaweedReadTest { @Test - public void testNonOverlappingVisibleIntervals() { + public void testNonOverlappingVisibleIntervals() throws IOException { List<FilerProto.FileChunk> chunks = new ArrayList<>(); chunks.add(FilerProto.FileChunk.newBuilder() .setFileId("aaa") @@ 
-24,7 +25,7 @@ public class SeaweedReadTest { .setMtime(2000) .build()); - List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(chunks); + List<SeaweedRead.VisibleInterval> visibleIntervals = SeaweedRead.nonOverlappingVisibleIntervals(null, chunks); for (SeaweedRead.VisibleInterval visibleInterval : visibleIntervals) { System.out.println("visible:" + visibleInterval); }
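Note on the retry loop in the new SeaweedRead.doFetchFullChunkData above: each round tries every known location and then sleeps; the wait starts at 1000 ms, grows by half of itself per round, and the loop gives up once the wait would reach 10 s. A minimal standalone sketch of that schedule, using the same loop bounds (the class name BackoffDemo is illustrative, not part of the client):

    public class BackoffDemo {
        public static void main(String[] args) {
            // same bounds as the loop in SeaweedRead.doFetchFullChunkData
            for (long waitTime = 1000L; waitTime < 10 * 1000; waitTime += waitTime / 2) {
                System.out.println("retry round would sleep " + waitTime + " ms");
            }
            // prints 1000, 1500, 2250, 3375, 5062, 7593: six rounds in total
        }
    }

So a chunk fetch is attempted over at most six rounds before the last IOException is rethrown.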

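The new VolumeIdCache bounds the number of cached volume-id lookups and expires entries five minutes after last access, so the read path only issues a LookupVolume RPC on a miss. A short usage sketch, assuming only the class shown in this diff plus the generated FilerProto builders (the volume id and location values here are stand-ins):

    import seaweedfs.client.FilerProto;
    import seaweedfs.client.VolumeIdCache;

    public class VolumeIdCacheDemo {
        public static void main(String[] args) {
            VolumeIdCache cache = new VolumeIdCache(4 * 1024);

            // a miss returns null; SeaweedRead then falls back to a LookupVolume RPC
            System.out.println("cached? " + (cache.getLocations("3") != null));

            // remember the filer's answer; it expires 5 minutes after the last access
            FilerProto.Locations locations = FilerProto.Locations.newBuilder()
                    .addLocations(FilerProto.Location.newBuilder()
                            .setUrl("127.0.0.1:8080")
                            .setPublicUrl("127.0.0.1:8080"))
                    .build();
            cache.setLocations("3", locations);

            System.out.println("cached? " + (cache.getLocations("3") != null));
        }
    }

Passing 0 to the constructor disables caching entirely, which keeps the null-check fallback path in SeaweedRead.read exercised.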