author    Chris Lu <chris.lu@gmail.com>  2019-08-29 23:29:10 -0700
committer Chris Lu <chris.lu@gmail.com>  2019-08-29 23:29:10 -0700
commit    170ee6ef0f9a94504580db5fa8c82e4ef6d50a99 (patch)
tree      ad4aa485ff8af6e4df38fd547763fb5609d5c176 /other/java/hdfs
parent    58168a8c527e8adb5c36566f63cf32a26dba3df4 (diff)
download  seaweedfs-170ee6ef0f9a94504580db5fa8c82e4ef6d50a99.tar.xz
          seaweedfs-170ee6ef0f9a94504580db5fa8c82e4ef6d50a99.zip
tmp
Diffstat (limited to 'other/java/hdfs')
-rw-r--r--  other/java/hdfs/pom.xml                                                   159
-rw-r--r--  other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java                137
-rw-r--r--  other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java         394
-rw-r--r--  other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java           29
-rw-r--r--  other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java           70
-rw-r--r--  other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java         611
-rw-r--r--  other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java    277
-rw-r--r--  other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java        371
-rw-r--r--  other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java       335
9 files changed, 0 insertions, 2383 deletions
diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml
deleted file mode 100644
index 6a1cd897f..000000000
--- a/other/java/hdfs/pom.xml
+++ /dev/null
@@ -1,159 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <properties>
- <seaweedfs.client.version>1.1.0</seaweedfs.client.version>
- <hadoop.version>3.1.1</hadoop.version>
- </properties>
-
- <groupId>com.github.chrislusf</groupId>
- <artifactId>seaweedfs-hadoop-client</artifactId>
- <version>${seaweedfs.client.version}</version>
-
- <parent>
- <groupId>org.sonatype.oss</groupId>
- <artifactId>oss-parent</artifactId>
- <version>9</version>
- </parent>
-
- <distributionManagement>
- <snapshotRepository>
- <id>ossrh</id>
- <url>https://oss.sonatype.org/content/repositories/snapshots</url>
- </snapshotRepository>
- </distributionManagement>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>7</source>
- <target>7</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.2.1</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- <configuration>
- <filters>
- <filter>
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- <exclude>org/slf4j/**</exclude>
- <exclude>META-INF/maven/org.slf4j/**</exclude>
- </excludes>
- </filter>
- </filters>
- <transformers>
- <transformer
- implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
- </transformers>
- <relocations>
- <relocation>
- <pattern>com.google</pattern>
- <shadedPattern>shaded.com.google</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.grpc.internal</pattern>
- <shadedPattern>shaded.io.grpc.internal</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.commons</pattern>
- <shadedPattern>shaded.org.apache.commons</shadedPattern>
- <excludes>
- <exclude>org.apache.hadoop</exclude>
- <exclude>org.apache.log4j</exclude>
- </excludes>
- </relocation>
- </relocations>
- </configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-gpg-plugin</artifactId>
- <version>1.5</version>
- <executions>
- <execution>
- <id>sign-artifacts</id>
- <phase>verify</phase>
- <goals>
- <goal>sign</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.sonatype.plugins</groupId>
- <artifactId>nexus-staging-maven-plugin</artifactId>
- <version>1.6.7</version>
- <extensions>true</extensions>
- <configuration>
- <serverId>ossrh</serverId>
- <nexusUrl>https://oss.sonatype.org/</nexusUrl>
- <autoReleaseAfterClose>true</autoReleaseAfterClose>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-source-plugin</artifactId>
- <version>2.2.1</version>
- <executions>
- <execution>
- <id>attach-sources</id>
- <goals>
- <goal>jar-no-fork</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <version>2.9.1</version>
- <executions>
- <execution>
- <id>attach-javadocs</id>
- <goals>
- <goal>jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>com.github.chrislusf</groupId>
- <artifactId>seaweedfs-client</artifactId>
- <version>${seaweedfs.client.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- </dependencies>
-
-</project>
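
A note on the build above: the shade plugin produces a fat jar and relocates com.google, io.grpc.internal, and org.apache.commons classes under a shaded. prefix, so the bundled gRPC and Guava copies cannot clash with the ones Hadoop already ships. A downstream project would consume the published artifact directly; a minimal sketch of that dependency, using the 1.1.0 version declared above:

    <dependency>
        <groupId>com.github.chrislusf</groupId>
        <artifactId>seaweedfs-hadoop-client</artifactId>
        <version>1.1.0</version>
    </dependency>
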
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java
deleted file mode 100644
index 926d0b83b..000000000
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import java.util.concurrent.CountDownLatch;
-
-class ReadBuffer {
-
- private SeaweedInputStream stream;
- private long offset; // offset within the file for the buffer
- private int length; // actual length, set after the buffer is filled
- private int requestedLength; // requested length of the read
- private byte[] buffer; // the buffer itself
- private int bufferindex = -1; // index in the buffers array in Buffer manager
- private ReadBufferStatus status; // status of the buffer
- private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client
- // waiting on this buffer gets unblocked
-
- // fields to help with eviction logic
- private long timeStamp = 0; // tick at which buffer became available to read
- private boolean isFirstByteConsumed = false;
- private boolean isLastByteConsumed = false;
- private boolean isAnyByteConsumed = false;
-
- public SeaweedInputStream getStream() {
- return stream;
- }
-
- public void setStream(SeaweedInputStream stream) {
- this.stream = stream;
- }
-
- public long getOffset() {
- return offset;
- }
-
- public void setOffset(long offset) {
- this.offset = offset;
- }
-
- public int getLength() {
- return length;
- }
-
- public void setLength(int length) {
- this.length = length;
- }
-
- public int getRequestedLength() {
- return requestedLength;
- }
-
- public void setRequestedLength(int requestedLength) {
- this.requestedLength = requestedLength;
- }
-
- public byte[] getBuffer() {
- return buffer;
- }
-
- public void setBuffer(byte[] buffer) {
- this.buffer = buffer;
- }
-
- public int getBufferindex() {
- return bufferindex;
- }
-
- public void setBufferindex(int bufferindex) {
- this.bufferindex = bufferindex;
- }
-
- public ReadBufferStatus getStatus() {
- return status;
- }
-
- public void setStatus(ReadBufferStatus status) {
- this.status = status;
- }
-
- public CountDownLatch getLatch() {
- return latch;
- }
-
- public void setLatch(CountDownLatch latch) {
- this.latch = latch;
- }
-
- public long getTimeStamp() {
- return timeStamp;
- }
-
- public void setTimeStamp(long timeStamp) {
- this.timeStamp = timeStamp;
- }
-
- public boolean isFirstByteConsumed() {
- return isFirstByteConsumed;
- }
-
- public void setFirstByteConsumed(boolean isFirstByteConsumed) {
- this.isFirstByteConsumed = isFirstByteConsumed;
- }
-
- public boolean isLastByteConsumed() {
- return isLastByteConsumed;
- }
-
- public void setLastByteConsumed(boolean isLastByteConsumed) {
- this.isLastByteConsumed = isLastByteConsumed;
- }
-
- public boolean isAnyByteConsumed() {
- return isAnyByteConsumed;
- }
-
- public void setAnyByteConsumed(boolean isAnyByteConsumed) {
- this.isAnyByteConsumed = isAnyByteConsumed;
- }
-
-}
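
ReadBuffer itself is a plain holder; its fields are populated by ReadBufferManager before a worker picks it up. A hedged sketch of that setup, mirroring the queueReadAhead method in the next file (stream, requestedOffset, and requestedLength stand for the caller's arguments):

    // How the manager prepares a ReadBuffer before queueing it for a worker:
    ReadBuffer buffer = new ReadBuffer();
    buffer.setStream(stream);                          // the SeaweedInputStream asking for read-ahead
    buffer.setOffset(requestedOffset);                 // file offset to prefetch
    buffer.setLength(0);                               // actual length is filled in after the read completes
    buffer.setRequestedLength(requestedLength);        // how many bytes were requested
    buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);  // not yet read
    buffer.setLatch(new CountDownLatch(1));            // readers block on this until the worker is done
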
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java
deleted file mode 100644
index 5b1e21529..000000000
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package seaweed.hdfs;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
-
-/**
- * The read buffer manager for SeaweedFS read-ahead (adapted from the ABFS client's ReadBufferManager).
- */
-final class ReadBufferManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
-
- private static final int NUM_BUFFERS = 16;
- private static final int BLOCK_SIZE = 4 * 1024 * 1024;
- private static final int NUM_THREADS = 8;
- private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
-
- private Thread[] threads = new Thread[NUM_THREADS];
- private byte[][] buffers; // array of byte[] buffers, to hold the data that is read
- private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available
-
- private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
- private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
- private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
- private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block
-
- static {
- BUFFER_MANAGER = new ReadBufferManager();
- BUFFER_MANAGER.init();
- }
-
- static ReadBufferManager getBufferManager() {
- return BUFFER_MANAGER;
- }
-
- private void init() {
- buffers = new byte[NUM_BUFFERS][];
- for (int i = 0; i < NUM_BUFFERS; i++) {
- buffers[i] = new byte[BLOCK_SIZE]; // the same buffers are reused; the byte arrays never become eligible for GC
- freeList.add(i);
- }
- for (int i = 0; i < NUM_THREADS; i++) {
- Thread t = new Thread(new ReadBufferWorker(i));
- t.setDaemon(true);
- threads[i] = t;
- t.setName("SeaweedFS-prefetch-" + i);
- t.start();
- }
- ReadBufferWorker.UNLEASH_WORKERS.countDown();
- }
-
- // hide instance constructor
- private ReadBufferManager() {
- }
-
-
- /*
- *
- * SeaweedInputStream-facing methods
- *
- */
-
-
- /**
- * {@link SeaweedInputStream} calls this method to queue read-aheads.
- *
- * @param stream The {@link SeaweedInputStream} for which to do the read-ahead
- * @param requestedOffset The offset in the file which should be read
- * @param requestedLength The length to read
- */
- void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
- stream.getPath(), requestedOffset, requestedLength);
- }
- ReadBuffer buffer;
- synchronized (this) {
- if (isAlreadyQueued(stream, requestedOffset)) {
- return; // already queued, do not queue again
- }
- if (freeList.isEmpty() && !tryEvict()) {
- return; // no buffers available, cannot queue anything
- }
-
- buffer = new ReadBuffer();
- buffer.setStream(stream);
- buffer.setOffset(requestedOffset);
- buffer.setLength(0);
- buffer.setRequestedLength(requestedLength);
- buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
- buffer.setLatch(new CountDownLatch(1));
-
- Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already
-
- buffer.setBuffer(buffers[bufferIndex]);
- buffer.setBufferindex(bufferIndex);
- readAheadQueue.add(buffer);
- notifyAll();
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
- stream.getPath(), requestedOffset, buffer.getBufferindex());
- }
- }
-
-
- /**
- * {@link SeaweedInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
- * remote read). It returns the bytes if the data already exists in a buffer. If there is a buffer that is reading
- * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
- * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because,
- * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
- * read to get the data faster (compared to the read waiting in the queue for an indeterminate amount of time).
- *
- * @param stream the file to read bytes for
- * @param position the offset in the file to do a read for
- * @param length the length to read
- * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
- * @return the number of bytes read
- */
- int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) {
- // not synchronized, so have to be careful with locking
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("getBlock for file {} position {} thread {}",
- stream.getPath(), position, Thread.currentThread().getName());
- }
-
- waitForProcess(stream, position);
-
- int bytesRead = 0;
- synchronized (this) {
- bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
- }
- if (bytesRead > 0) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done read from Cache for {} position {} length {}",
- stream.getPath(), position, bytesRead);
- }
- return bytesRead;
- }
-
- // otherwise, just say we got nothing - calling thread can do its own read
- return 0;
- }
-
- /*
- *
- * Internal methods
- *
- */
-
- private void waitForProcess(final SeaweedInputStream stream, final long position) {
- ReadBuffer readBuf;
- synchronized (this) {
- clearFromReadAheadQueue(stream, position);
- readBuf = getFromList(inProgressList, stream, position);
- }
- if (readBuf != null) { // if in in-progress queue, then block for it
- try {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
- stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
- }
- readBuf.getLatch().await(); // blocking wait on the caller stream's thread
- // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
- // is done processing it (in doneReading). There, the latch is set after removing the buffer from
- // inProgressList. So this latch is safe to be outside the synchronized block.
- // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
- // while waiting, so no one will be able to change any state. If this becomes more complex in the future,
- // then the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("latch done for file {} buffer idx {} length {}",
- stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
- }
- }
- }
-
- /**
- * If any buffer in the completed list can be reclaimed, then reclaim it and return the buffer to the free list.
- * The objective is to find just one buffer - there is no advantage to evicting more than one.
- *
- * @return whether the eviction succeeded - i.e., whether we were able to free up one buffer
- */
- private synchronized boolean tryEvict() {
- ReadBuffer nodeToEvict = null;
- if (completedReadList.size() <= 0) {
- return false; // there are no evict-able buffers
- }
-
- // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isAnyByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
-
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try any old nodes that have not been consumed
- long earliestBirthday = Long.MAX_VALUE;
- for (ReadBuffer buf : completedReadList) {
- if (buf.getTimeStamp() < earliestBirthday) {
- nodeToEvict = buf;
- earliestBirthday = buf.getTimeStamp();
- }
- }
- if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) {
- return evict(nodeToEvict);
- }
-
- // nothing can be evicted
- return false;
- }
-
- private boolean evict(final ReadBuffer buf) {
- freeList.push(buf.getBufferindex());
- completedReadList.remove(buf);
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
- buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
- }
- return true;
- }
-
- private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) {
- // returns true if any part of the buffer is already queued
- return (isInList(readAheadQueue, stream, requestedOffset)
- || isInList(inProgressList, stream, requestedOffset)
- || isInList(completedReadList, stream, requestedOffset));
- }
-
- private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
- return (getFromList(list, stream, requestedOffset) != null);
- }
-
- private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) {
- for (ReadBuffer buffer : list) {
- if (buffer.getStream() == stream) {
- if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
- && requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() + buffer.getLength()) {
- return buffer;
- } else if (requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
- return buffer;
- }
- }
- }
- return null;
- }
-
- private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) {
- ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
- if (buffer != null) {
- readAheadQueue.remove(buffer);
- notifyAll(); // lock is held in calling method
- freeList.push(buffer.getBufferindex());
- }
- }
-
- private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length,
- final byte[] buffer) {
- ReadBuffer buf = getFromList(completedReadList, stream, position);
- if (buf == null || position >= buf.getOffset() + buf.getLength()) {
- return 0;
- }
- int cursor = (int) (position - buf.getOffset());
- int availableLengthInBuffer = buf.getLength() - cursor;
- int lengthToCopy = Math.min(length, availableLengthInBuffer);
- System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
- if (cursor == 0) {
- buf.setFirstByteConsumed(true);
- }
- if (cursor + lengthToCopy == buf.getLength()) {
- buf.setLastByteConsumed(true);
- }
- buf.setAnyByteConsumed(true);
- return lengthToCopy;
- }
-
- /*
- *
- * ReadBufferWorker-thread-facing methods
- *
- */
-
- /**
- * ReadBufferWorker thread calls this to get the next buffer that it should work on.
- *
- * @return {@link ReadBuffer}
- * @throws InterruptedException if thread is interrupted
- */
- ReadBuffer getNextBlockToRead() throws InterruptedException {
- ReadBuffer buffer = null;
- synchronized (this) {
- //buffer = readAheadQueue.take(); // blocking method
- while (readAheadQueue.size() == 0) {
- wait();
- }
- buffer = readAheadQueue.remove();
- notifyAll();
- if (buffer == null) {
- return null; // should never happen
- }
- buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
- inProgressList.add(buffer);
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
- buffer.getStream().getPath(), buffer.getOffset());
- }
- return buffer;
- }
-
- /**
- * ReadBufferWorker thread calls this method to post completion.
- *
- * @param buffer the buffer whose read was completed
- * @param result the {@link ReadBufferStatus} after the read operation in the worker thread
- * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
- */
- void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}",
- buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead);
- }
- synchronized (this) {
- inProgressList.remove(buffer);
- if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
- buffer.setStatus(ReadBufferStatus.AVAILABLE);
- buffer.setTimeStamp(currentTimeMillis());
- buffer.setLength(bytesActuallyRead);
- completedReadList.add(buffer);
- } else {
- freeList.push(buffer.getBufferindex());
- // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC
- }
- }
- //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
- buffer.getLatch().countDown(); // wake up waiting threads (if any)
- }
-
- /**
- * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
- * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization),
- * making it unsuitable for measuring time intervals. nanoTime is strictly monotonically increasing per CPU core.
- * Note: it is not monotonic across sockets, and even within a CPU, it is only the
- * more recent parts which share a clock across all cores.
- *
- * @return current time in milliseconds
- */
- private long currentTimeMillis() {
- return System.nanoTime() / 1000 / 1000;
- }
-}
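
From a stream's point of view the manager is driven by two calls: queueReadAhead to schedule a prefetch and getBlock to consume one. A minimal caller-side sketch, assuming stream is the SeaweedInputStream and position/len/buf are the current read arguments (readRemote is the stream's own remote read, as used by ReadBufferWorker below):

    ReadBufferManager manager = ReadBufferManager.getBufferManager();
    manager.queueReadAhead(stream, position + len, len);  // schedule the next block
    int bytesRead = manager.getBlock(stream, position, len, buf);
    if (bytesRead == 0) {
        // cache miss: getBlock returns 0, so fall back to a direct remote read
        bytesRead = stream.readRemote(position, buf, 0, len);
    }
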
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java
deleted file mode 100644
index d63674977..000000000
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-/**
- * The status of a read buffer (adapted from the ABFS client's ReadBufferStatus).
- */
-public enum ReadBufferStatus {
- NOT_AVAILABLE, // buffers sitting in the readAheadQueue have this status
- READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList
- AVAILABLE, // data is available in buffer. It should be in completedList
- READ_FAILED // read completed, but failed.
-}
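
The lifecycle these values imply is NOT_AVAILABLE (sitting in the read-ahead queue), then READING_IN_PROGRESS (picked up by a worker), ending in either AVAILABLE or READ_FAILED. A small hypothetical helper, not part of the original code, just to make the terminal states explicit:

    static boolean isTerminal(ReadBufferStatus status) {
        switch (status) {
            case AVAILABLE:    // data ready in the completed list
            case READ_FAILED:  // read finished but failed; buffer index returned to the free list
                return true;
            default:           // NOT_AVAILABLE, READING_IN_PROGRESS
                return false;
        }
    }
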
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java
deleted file mode 100644
index 6ffbc4644..000000000
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package seaweed.hdfs;
-
-import java.util.concurrent.CountDownLatch;
-
-class ReadBufferWorker implements Runnable {
-
- protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
- private int id;
-
- ReadBufferWorker(final int id) {
- this.id = id;
- }
-
- /**
- * return the ID of ReadBufferWorker.
- */
- public int getId() {
- return this.id;
- }
-
- /**
- * Waits until a buffer becomes available in ReadAheadQueue.
- * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
- * Rinse and repeat. Forever.
- */
- public void run() {
- try {
- UNLEASH_WORKERS.await();
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
- ReadBuffer buffer;
- while (true) {
- try {
- buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- return;
- }
- if (buffer != null) {
- try {
- // do the actual read, from the file.
- int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength());
- bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager
- } catch (Exception ex) {
- bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
- }
- }
- }
- }
-}
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
deleted file mode 100644
index 453924cf7..000000000
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ /dev/null
@@ -1,611 +0,0 @@
-package seaweed.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.fs.XAttrSetFlag;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Progressable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-
-public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem {
-
- public static final int FS_SEAWEED_DEFAULT_PORT = 8888;
- public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host";
- public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port";
- public static final String FS_SEAWEED_GRPC_CA = "fs.seaweed.ca";
- public static final String FS_SEAWEED_GRPC_CLIENT_KEY = "fs.seaweed.client.key";
- public static final String FS_SEAWEED_GRPC_CLIENT_CERT = "fs.seaweed.client.cert";
-
- private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class);
- private static int BUFFER_SIZE = 16 * 1024 * 1024;
-
- private URI uri;
- private Path workingDirectory = new Path("/");
- private SeaweedFileSystemStore seaweedFileSystemStore;
-
- public URI getUri() {
- return uri;
- }
-
- public String getScheme() {
- return "seaweedfs";
- }
-
- @Override
- public void initialize(URI uri, Configuration conf) throws IOException {
- super.initialize(uri, conf);
-
- // get host information from uri (overrides info in conf)
- String host = uri.getHost();
- host = (host == null) ? conf.get(FS_SEAWEED_FILER_HOST, "localhost") : host;
- if (host == null) {
- throw new IOException("Invalid host specified");
- }
- conf.set(FS_SEAWEED_FILER_HOST, host);
-
- // get port information from uri, (overrides info in conf)
- int port = uri.getPort();
- port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port;
- conf.setInt(FS_SEAWEED_FILER_PORT, port);
-
- conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE);
-
- setConf(conf);
- this.uri = uri;
-
- if (conf.get(FS_SEAWEED_GRPC_CA) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CA).length() != 0
- && conf.get(FS_SEAWEED_GRPC_CLIENT_CERT) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_CERT).length() != 0
- && conf.get(FS_SEAWEED_GRPC_CLIENT_KEY) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_KEY).length() != 0) {
- seaweedFileSystemStore = new SeaweedFileSystemStore(host, port,
- conf.get(FS_SEAWEED_GRPC_CA),
- conf.get(FS_SEAWEED_GRPC_CLIENT_CERT),
- conf.get(FS_SEAWEED_GRPC_CLIENT_KEY));
- } else {
- seaweedFileSystemStore = new SeaweedFileSystemStore(host, port);
- }
-
- }
-
- @Override
- public FSDataInputStream open(Path path, int bufferSize) throws IOException {
-
- LOG.debug("open path: {} bufferSize:{}", path, bufferSize);
-
- path = qualify(path);
-
- try {
- InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize);
- return new FSDataInputStream(inputStream);
- } catch (Exception ex) {
- return null;
- }
- }
-
- @Override
- public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize,
- final short replication, final long blockSize, final Progressable progress) throws IOException {
-
- LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize);
-
- path = qualify(path);
-
- try {
- String replicaPlacement = String.format("%03d", replication - 1);
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement);
- return new FSDataOutputStream(outputStream, statistics);
- } catch (Exception ex) {
- return null;
- }
- }
-
- @Override
- public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException {
-
- LOG.debug("append path: {} bufferSize:{}", path, bufferSize);
-
- path = qualify(path);
- try {
- OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, "");
- return new FSDataOutputStream(outputStream, statistics);
- } catch (Exception ex) {
- return null;
- }
- }
-
- @Override
- public boolean rename(Path src, Path dst) {
-
- LOG.debug("rename path: {} => {}", src, dst);
-
- if (src.isRoot()) {
- return false;
- }
-
- if (src.equals(dst)) {
- return true;
- }
- FileStatus dstFileStatus = getFileStatus(dst);
-
- String sourceFileName = src.getName();
- Path adjustedDst = dst;
-
- if (dstFileStatus != null) {
- if (!dstFileStatus.isDirectory()) {
- return false;
- }
- adjustedDst = new Path(dst, sourceFileName);
- }
-
- Path qualifiedSrcPath = qualify(src);
- Path qualifiedDstPath = qualify(adjustedDst);
-
- seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath);
- return true;
- }
-
- @Override
- public boolean delete(Path path, boolean recursive) {
-
- LOG.debug("delete path: {} recursive:{}", path, recursive);
-
- path = qualify(path);
-
- FileStatus fileStatus = getFileStatus(path);
-
- if (fileStatus == null) {
- return true;
- }
-
- return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive);
-
- }
-
- @Override
- public FileStatus[] listStatus(Path path) throws IOException {
-
- LOG.debug("listStatus path: {}", path);
-
- path = qualify(path);
-
- return seaweedFileSystemStore.listEntries(path);
- }
-
- @Override
- public Path getWorkingDirectory() {
- return workingDirectory;
- }
-
- @Override
- public void setWorkingDirectory(Path path) {
- if (path.isAbsolute()) {
- workingDirectory = path;
- } else {
- workingDirectory = new Path(workingDirectory, path);
- }
- }
-
- @Override
- public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
-
- LOG.debug("mkdirs path: {}", path);
-
- path = qualify(path);
-
- FileStatus fileStatus = getFileStatus(path);
-
- if (fileStatus == null) {
-
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- return seaweedFileSystemStore.createDirectory(path, currentUser,
- fsPermission == null ? FsPermission.getDirDefault() : fsPermission,
- FsPermission.getUMask(getConf()));
-
- }
-
- if (fileStatus.isDirectory()) {
- return true;
- } else {
- throw new FileAlreadyExistsException("Path is a file: " + path);
- }
- }
-
- @Override
- public FileStatus getFileStatus(Path path) {
-
- LOG.debug("getFileStatus path: {}", path);
-
- path = qualify(path);
-
- return seaweedFileSystemStore.getFileStatus(path);
- }
-
- /**
- * Set owner of a path (i.e. a file or a directory).
- * The parameters owner and group cannot both be null.
- *
- * @param path The path
- * @param owner If it is null, the original username remains unchanged.
- * @param group If it is null, the original groupname remains unchanged.
- */
- @Override
- public void setOwner(Path path, final String owner, final String group)
- throws IOException {
- LOG.debug("setOwner path: {}", path);
- path = qualify(path);
-
- seaweedFileSystemStore.setOwner(path, owner, group);
- }
-
-
- /**
- * Set permission of a path.
- *
- * @param path The path
- * @param permission Access permission
- */
- @Override
- public void setPermission(Path path, final FsPermission permission) throws IOException {
- LOG.debug("setPermission path: {}", path);
-
- if (permission == null) {
- throw new IllegalArgumentException("The permission can't be null");
- }
-
- path = qualify(path);
-
- seaweedFileSystemStore.setPermission(path, permission);
- }
-
- Path qualify(Path path) {
- return path.makeQualified(uri, workingDirectory);
- }
-
- /**
- * Concat existing files together.
- *
- * @param trg the path to the target destination.
- * @param psrcs the paths to the sources to use for the concatenation.
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default).
- */
- @Override
- public void concat(final Path trg, final Path[] psrcs) throws IOException {
- throw new UnsupportedOperationException("Not implemented by the " +
- getClass().getSimpleName() + " FileSystem implementation");
- }
-
- /**
- * Truncate the file in the indicated path to the indicated size.
- * <ul>
- * <li>Fails if path is a directory.</li>
- * <li>Fails if path does not exist.</li>
- * <li>Fails if path is not closed.</li>
- * <li>Fails if new size is greater than current size.</li>
- * </ul>
- *
- * @param f The path to the file to be truncated
- * @param newLength The size the file is to be truncated to
- * @return <code>true</code> if the file has been truncated to the desired
- * <code>newLength</code> and is immediately available to be reused for
- * write operations such as <code>append</code>, or
- * <code>false</code> if a background process of adjusting the length of
- * the last block has been started, and clients should wait for it to
- * complete before proceeding with further file updates.
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default).
- */
- @Override
- public boolean truncate(Path f, long newLength) throws IOException {
- throw new UnsupportedOperationException("Not implemented by the " +
- getClass().getSimpleName() + " FileSystem implementation");
- }
-
- @Override
- public void createSymlink(final Path target, final Path link,
- final boolean createParent) throws AccessControlException,
- FileAlreadyExistsException, FileNotFoundException,
- ParentNotDirectoryException, UnsupportedFileSystemException,
- IOException {
- // Supporting filesystems should override this method
- throw new UnsupportedOperationException(
- "Filesystem does not support symlinks!");
- }
-
- public boolean supportsSymlinks() {
- return false;
- }
-
- /**
- * Create a snapshot.
- *
- * @param path The directory where snapshots will be taken.
- * @param snapshotName The name of the snapshot
- * @return the snapshot path.
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- */
- @Override
- public Path createSnapshot(Path path, String snapshotName)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support createSnapshot");
- }
-
- /**
- * Rename a snapshot.
- *
- * @param path The directory path where the snapshot was taken
- * @param snapshotOldName Old name of the snapshot
- * @param snapshotNewName New name of the snapshot
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void renameSnapshot(Path path, String snapshotOldName,
- String snapshotNewName) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support renameSnapshot");
- }
-
- /**
- * Delete a snapshot of a directory.
- *
- * @param path The directory that the to-be-deleted snapshot belongs to
- * @param snapshotName The name of the snapshot
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void deleteSnapshot(Path path, String snapshotName)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support deleteSnapshot");
- }
-
- /**
- * Modifies ACL entries of files and directories. This method can add new ACL
- * entries or modify the permissions on existing ACL entries. All existing
- * ACL entries that are not specified in this call are retained without
- * changes. (Modifications are merged into the current ACL.)
- *
- * @param path Path to modify
- * @param aclSpec List&lt;AclEntry&gt; describing modifications
- * @throws IOException if an ACL could not be modified
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support modifyAclEntries");
- }
-
- /**
- * Removes ACL entries from files and directories. Other ACL entries are
- * retained.
- *
- * @param path Path to modify
- * @param aclSpec List describing entries to remove
- * @throws IOException if an ACL could not be modified
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void removeAclEntries(Path path, List<AclEntry> aclSpec)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeAclEntries");
- }
-
- /**
- * Removes all default ACL entries from files and directories.
- *
- * @param path Path to modify
- * @throws IOException if an ACL could not be modified
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void removeDefaultAcl(Path path)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeDefaultAcl");
- }
-
- /**
- * Removes all but the base ACL entries of files and directories. The entries
- * for user, group, and others are retained for compatibility with permission
- * bits.
- *
- * @param path Path to modify
- * @throws IOException if an ACL could not be removed
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void removeAcl(Path path)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeAcl");
- }
-
- /**
- * Fully replaces ACL of files and directories, discarding all existing
- * entries.
- *
- * @param path Path to modify
- * @param aclSpec List describing modifications, which must include entries
- * for user, group, and others for compatibility with permission bits.
- * @throws IOException if an ACL could not be modified
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support setAcl");
- }
-
- /**
- * Gets the ACL of a file or directory.
- *
- * @param path Path to get
- * @return AclStatus describing the ACL of the file or directory
- * @throws IOException if an ACL could not be read
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public AclStatus getAclStatus(Path path) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getAclStatus");
- }
-
- /**
- * Set an xattr of a file or directory.
- * The name must be prefixed with the namespace followed by ".". For example,
- * "user.attr".
- * <p>
- * Refer to the HDFS extended attributes user documentation for details.
- *
- * @param path Path to modify
- * @param name xattr name.
- * @param value xattr value.
- * @param flag xattr set flag
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void setXAttr(Path path, String name, byte[] value,
- EnumSet<XAttrSetFlag> flag) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support setXAttr");
- }
-
- /**
- * Get an xattr name and value for a file or directory.
- * The name must be prefixed with the namespace followed by ".". For example,
- * "user.attr".
- * <p>
- * Refer to the HDFS extended attributes user documentation for details.
- *
- * @param path Path to get extended attribute
- * @param name xattr name.
- * @return byte[] xattr value.
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public byte[] getXAttr(Path path, String name) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getXAttr");
- }
-
- /**
- * Get all of the xattr name/value pairs for a file or directory.
- * Only those xattrs which the logged-in user has permissions to view
- * are returned.
- * <p>
- * Refer to the HDFS extended attributes user documentation for details.
- *
- * @param path Path to get extended attributes
- * @return Map describing the XAttrs of the file or directory
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public Map<String, byte[]> getXAttrs(Path path) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getXAttrs");
- }
-
- /**
- * Get all of the xattrs name/value pairs for a file or directory.
- * Only those xattrs which the logged-in user has permissions to view
- * are returned.
- * <p>
- * Refer to the HDFS extended attributes user documentation for details.
- *
- * @param path Path to get extended attributes
- * @param names XAttr names.
- * @return Map describing the XAttrs of the file or directory
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public Map<String, byte[]> getXAttrs(Path path, List<String> names)
- throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support getXAttrs");
- }
-
- /**
- * Get all of the xattr names for a file or directory.
- * Only those xattr names which the logged-in user has permissions to view
- * are returned.
- * <p>
- * Refer to the HDFS extended attributes user documentation for details.
- *
- * @param path Path to get extended attributes
- * @return List{@literal <String>} of the XAttr names of the file or directory
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public List<String> listXAttrs(Path path) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support listXAttrs");
- }
-
- /**
- * Remove an xattr of a file or directory.
- * The name must be prefixed with the namespace followed by ".". For example,
- * "user.attr".
- * <p>
- * Refer to the HDFS extended attributes user documentation for details.
- *
- * @param path Path to remove extended attribute
- * @param name xattr name
- * @throws IOException IO failure
- * @throws UnsupportedOperationException if the operation is unsupported
- * (default outcome).
- */
- @Override
- public void removeXAttr(Path path, String name) throws IOException {
- throw new UnsupportedOperationException(getClass().getSimpleName()
- + " doesn't support removeXAttr");
- }
-
-}
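
For client configuration, the filer host and port can come from the seaweedfs:// URI or from the fs.seaweed.filer.* keys defined at the top of this class. A hedged usage sketch; the fs.seaweedfs.impl mapping is an assumption about how the scheme would be registered in core-site.xml or in code, since it is not shown in this diff:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.net.URI;

    public class SeaweedFsExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem"); // assumed registration key
            conf.set("fs.seaweed.filer.host", "localhost"); // FS_SEAWEED_FILER_HOST
            conf.setInt("fs.seaweed.filer.port", 8888);     // FS_SEAWEED_DEFAULT_PORT
            FileSystem fs = FileSystem.get(URI.create("seaweedfs://localhost:8888/"), conf);
            for (FileStatus status : fs.listStatus(new Path("/"))) {
                System.out.println(status.getPath());
            }
            fs.close();
        }
    }
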
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
deleted file mode 100644
index 643467898..000000000
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ /dev/null
@@ -1,277 +0,0 @@
-package seaweed.hdfs;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerClient;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedRead;
-
-import javax.net.ssl.SSLException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-public class SeaweedFileSystemStore {
-
- private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class);
-
- private FilerGrpcClient filerGrpcClient;
- private FilerClient filerClient;
-
- public SeaweedFileSystemStore(String host, int port) {
- int grpcPort = 10000 + port;
- filerGrpcClient = new FilerGrpcClient(host, grpcPort);
- filerClient = new FilerClient(filerGrpcClient);
- }
-
- public SeaweedFileSystemStore(String host, int port,
- String caFile, String clientCertFile, String clientKeyFile) throws SSLException {
- int grpcPort = 10000 + port;
- filerGrpcClient = new FilerGrpcClient(host, grpcPort, caFile, clientCertFile, clientKeyFile);
- filerClient = new FilerClient(filerGrpcClient);
- }
-
- public static String getParentDirectory(Path path) {
- return path.isRoot() ? "/" : path.getParent().toUri().getPath();
- }
-
- static int permissionToMode(FsPermission permission, boolean isDirectory) {
- int p = permission.toShort();
- if (isDirectory) {
- p = p | 1 << 31;
- }
- return p;
- }
-
- public boolean createDirectory(final Path path, UserGroupInformation currentUser,
- final FsPermission permission, final FsPermission umask) {
-
- LOG.debug("createDirectory path: {} permission: {} umask: {}",
- path,
- permission,
- umask);
-
- return filerClient.mkdirs(
- path.toUri().getPath(),
- permissionToMode(permission, true),
- currentUser.getUserName(),
- currentUser.getGroupNames()
- );
- }
-
- public FileStatus[] listEntries(final Path path) {
- LOG.debug("listEntries path: {}", path);
-
- List<FileStatus> fileStatuses = new ArrayList<FileStatus>();
-
- List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
-
- for (FilerProto.Entry entry : entries) {
-
- FileStatus fileStatus = doGetFileStatus(new Path(path, entry.getName()), entry);
-
- fileStatuses.add(fileStatus);
- }
- return fileStatuses.toArray(new FileStatus[0]);
- }
-
- public FileStatus getFileStatus(final Path path) {
-
- FilerProto.Entry entry = lookupEntry(path);
- if (entry == null) {
- return null;
- }
- LOG.debug("doGetFileStatus path:{} entry:{}", path, entry);
-
- FileStatus fileStatus = doGetFileStatus(path, entry);
- return fileStatus;
- }
-
- public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) {
- LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}",
- path,
- String.valueOf(isDirectory),
- String.valueOf(recursive));
-
- if (path.isRoot()) {
- return true;
- }
-
- if (recursive && isDirectory) {
- List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath());
- for (FilerProto.Entry entry : entries) {
- deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), true);
- }
- }
-
- return filerClient.deleteEntry(getParentDirectory(path), path.getName(), true, recursive);
- }
-
- private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) {
- FilerProto.FuseAttributes attributes = entry.getAttributes();
- long length = SeaweedRead.totalSize(entry.getChunksList());
- boolean isDir = entry.getIsDirectory();
- int block_replication = 1;
- int blocksize = 512;
- long modification_time = attributes.getMtime() * 1000; // milliseconds
- long access_time = 0;
- FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode());
- String owner = attributes.getUserName();
- String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : "";
- return new FileStatus(length, isDir, block_replication, blocksize,
- modification_time, access_time, permission, owner, group, null, path);
- }
-
- private FilerProto.Entry lookupEntry(Path path) {
-
- return filerClient.lookupEntry(getParentDirectory(path), path.getName());
-
- }
-
- public void rename(Path source, Path destination) {
-
- LOG.debug("rename source: {} destination:{}", source, destination);
-
- if (source.isRoot()) {
- return;
- }
- LOG.warn("rename lookupEntry source: {}", source);
- FilerProto.Entry entry = lookupEntry(source);
- if (entry == null) {
- LOG.warn("rename non-existing source: {}", source);
- return;
- }
- filerClient.mv(source.toUri().getPath(), destination.toUri().getPath());
- }
-
- public OutputStream createFile(final Path path,
- final boolean overwrite,
- FsPermission permission,
- int bufferSize,
- String replication) throws IOException {
-
- permission = permission == null ? FsPermission.getFileDefault() : permission;
-
- LOG.debug("createFile path: {} overwrite: {} permission: {}",
- path,
- overwrite,
- permission.toString());
-
- UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
- long now = System.currentTimeMillis() / 1000L;
-
- FilerProto.Entry.Builder entry = null;
- long writePosition = 0;
- if (!overwrite) {
- FilerProto.Entry existingEntry = lookupEntry(path);
- LOG.debug("createFile merged entry path:{} existingEntry:{}", path, existingEntry);
- if (existingEntry != null) {
- entry = FilerProto.Entry.newBuilder();
- entry.mergeFrom(existingEntry);
- entry.getAttributesBuilder().setMtime(now);
- // read the existing size and replication inside the null check, to avoid an NPE when the file does not exist
- writePosition = SeaweedRead.totalSize(existingEntry.getChunksList());
- replication = existingEntry.getAttributes().getReplication();
- }
- LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry);
- }
- if (entry == null) {
- entry = FilerProto.Entry.newBuilder()
- .setName(path.getName())
- .setIsDirectory(false)
- .setAttributes(FilerProto.FuseAttributes.newBuilder()
- .setFileMode(permissionToMode(permission, false))
- .setReplication(replication)
- .setCrtime(now)
- .setMtime(now)
- .setUserName(userGroupInformation.getUserName())
- .clearGroupName()
- .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames()))
- );
- }
-
- return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication);
-
- }
-
- public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics,
- int bufferSize) throws IOException {
-
- LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize);
-
- int readAheadQueueDepth = 2;
- FilerProto.Entry entry = lookupEntry(path);
-
- if (entry == null) {
- throw new FileNotFoundException("read non-existent file " + path);
- }
-
- return new SeaweedInputStream(filerGrpcClient,
- statistics,
- path.toUri().getPath(),
- entry,
- bufferSize,
- readAheadQueueDepth);
- }
-
- public void setOwner(Path path, String owner, String group) {
-
- LOG.debug("setOwner path:{} owner:{} group:{}", path, owner, group);
-
- FilerProto.Entry entry = lookupEntry(path);
- if (entry == null) {
- LOG.debug("setOwner path:{} entry:{}", path, entry);
- return;
- }
-
- FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
- FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder();
-
- if (owner != null) {
- attributesBuilder.setUserName(owner);
- }
- if (group != null) {
- attributesBuilder.clearGroupName();
- attributesBuilder.addGroupName(group);
- }
-
- entryBuilder.setAttributes(attributesBuilder);
-
- LOG.debug("setOwner path:{} entry:{}", path, entryBuilder);
-
- filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
-
- }
-
- public void setPermission(Path path, FsPermission permission) {
-
- LOG.debug("setPermission path:{} permission:{}", path, permission);
-
- FilerProto.Entry entry = lookupEntry(path);
- if (entry == null) {
- LOG.debug("setPermission path:{} entry:{}", path, entry);
- return;
- }
-
- FilerProto.Entry.Builder entryBuilder = entry.toBuilder();
- FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder();
-
- attributesBuilder.setFileMode(permissionToMode(permission, entry.getIsDirectory()));
-
- entryBuilder.setAttributes(attributesBuilder);
-
- LOG.debug("setPermission path:{} entry:{}", path, entryBuilder);
-
- filerClient.updateEntry(getParentDirectory(path), entryBuilder.build());
-
- }
-}
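Both setters above follow the same read-modify-update pattern: look up the entry, rebuild its attributes, and write the entry back through the filer client. A hedged usage sketch (the helper name and the owner/mode values are illustrative):

    // Hedged sketch: POSIX-style chown + chmod against the store.
    static void chownAndChmod(SeaweedFileSystemStore store, Path p) {
        store.setOwner(p, "alice", "engineering");               // user and single group
        store.setPermission(p, new FsPermission((short) 0644));  // rw-r--r--
    }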
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
deleted file mode 100644
index 90c14c772..000000000
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ /dev/null
@@ -1,371 +0,0 @@
-package seaweed.hdfs;
-
-// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedRead;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.List;
-
-public class SeaweedInputStream extends FSInputStream {
-
- private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
-
- private final FilerGrpcClient filerGrpcClient;
- private final Statistics statistics;
- private final String path;
- private final FilerProto.Entry entry;
- private final List<SeaweedRead.VisibleInterval> visibleIntervalList;
- private final long contentLength;
- private final int bufferSize; // default buffer size
- private final int readAheadQueueDepth; // initialized in constructor
- private final boolean readAheadEnabled; // whether read-ahead is enabled
-
- private byte[] buffer = null; // will be initialized on first use
-
- private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server
- private long fCursorAfterLastRead = -1;
- private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer
- private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1
- // of valid bytes in buffer)
- private boolean closed = false;
-
- public SeaweedInputStream(
- final FilerGrpcClient filerGrpcClient,
- final Statistics statistics,
- final String path,
- final FilerProto.Entry entry,
- final int bufferSize,
- final int readAheadQueueDepth) {
- this.filerGrpcClient = filerGrpcClient;
- this.statistics = statistics;
- this.path = path;
- this.entry = entry;
- this.contentLength = SeaweedRead.totalSize(entry.getChunksList());
- this.bufferSize = bufferSize;
- this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
- this.readAheadEnabled = true;
-
- this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList());
-
- LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList);
-
- }
-
- public String getPath() {
- return path;
- }
-
- @Override
- public int read() throws IOException {
- byte[] b = new byte[1];
- int numberOfBytesRead = read(b, 0, 1);
- if (numberOfBytesRead < 0) {
- return -1;
- } else {
- return (b[0] & 0xFF);
- }
- }
-
- @Override
- public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
- int currentOff = off;
- int currentLen = len;
- int lastReadBytes;
- int totalReadBytes = 0;
- do {
- lastReadBytes = readOneBlock(b, currentOff, currentLen);
- if (lastReadBytes > 0) {
- currentOff += lastReadBytes;
- currentLen -= lastReadBytes;
- totalReadBytes += lastReadBytes;
- }
- if (currentLen <= 0 || currentLen > b.length - currentOff) {
- break;
- }
- } while (lastReadBytes > 0);
- return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
- }
-
- private int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
-
- Preconditions.checkNotNull(b);
-
- if (len == 0) {
- return 0;
- }
-
- if (this.available() == 0) {
- return -1;
- }
-
- if (off < 0 || len < 0 || len > b.length - off) {
- throw new IndexOutOfBoundsException();
- }
-
- //If buffer is empty, then fill the buffer.
- if (bCursor == limit) {
- //If EOF, then return -1
- if (fCursor >= contentLength) {
- return -1;
- }
-
- long bytesRead = 0;
- //reset buffer to initial state - i.e., throw away existing data
- bCursor = 0;
- limit = 0;
- if (buffer == null) {
- buffer = new byte[bufferSize];
- }
-
- // use read-ahead for the first read, sequential reads, or large destination buffers; otherwise bypass it
- if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) {
- bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false);
- } else {
- bytesRead = readInternal(fCursor, buffer, 0, b.length, true);
- }
-
- if (bytesRead == -1) {
- return -1;
- }
-
- limit += bytesRead;
- fCursor += bytesRead;
- fCursorAfterLastRead = fCursor;
- }
-
- //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer)
- //(bytes returned may be less than requested)
- int bytesRemaining = limit - bCursor;
- int bytesToRead = Math.min(len, bytesRemaining);
- System.arraycopy(buffer, bCursor, b, off, bytesToRead);
- bCursor += bytesToRead;
- if (statistics != null) {
- statistics.incrementBytesRead(bytesToRead);
- }
- return bytesToRead;
- }
-
-
- private int readInternal(final long position, final byte[] b, final int offset, final int length,
- final boolean bypassReadAhead) throws IOException {
- if (readAheadEnabled && !bypassReadAhead) {
- // try reading from read-ahead
- if (offset != 0) {
- throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
- }
- int receivedBytes;
-
- // queue read-aheads
- int numReadAheads = this.readAheadQueueDepth;
- long nextSize;
- long nextOffset = position;
- while (numReadAheads > 0 && nextOffset < contentLength) {
- nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
- ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
- nextOffset = nextOffset + nextSize;
- numReadAheads--;
- }
-
- // try reading from buffers first
- receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
- if (receivedBytes > 0) {
- return receivedBytes;
- }
-
- // got nothing from read-ahead, do our own read now
- receivedBytes = readRemote(position, b, offset, length);
- return receivedBytes;
- } else {
- return readRemote(position, b, offset, length);
- }
- }
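The queuing loop walks forward in bufferSize steps, clamping the last window to the end of the file. A standalone illustration of that arithmetic only, with no I/O performed (the method name is an assumption):

    // With bufferSize = 4 MiB, depth = 2, position = 0, contentLength = 10 MiB,
    // this prints [0, 4 MiB) and [4 MiB, 8 MiB); the third window is left for
    // the next call.
    static void printReadAheadWindows(long position, long contentLength,
                                      int bufferSize, int depth) {
        long nextOffset = position;
        while (depth > 0 && nextOffset < contentLength) {
            long nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
            System.out.println("queue [" + nextOffset + ", " + (nextOffset + nextSize) + ")");
            nextOffset += nextSize;
            depth--;
        }
    }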
-
- int readRemote(long position, byte[] b, int offset, int length) throws IOException {
- if (position < 0) {
- throw new IllegalArgumentException("attempting to read from negative offset");
- }
- if (position >= contentLength) {
- return -1; // Hadoop prefers -1 to EOFException
- }
- if (b == null) {
- throw new IllegalArgumentException("null byte array passed in to read() method");
- }
- if (offset >= b.length) {
- throw new IllegalArgumentException("offset greater than length of array");
- }
- if (length < 0) {
- throw new IllegalArgumentException("requested read length is less than zero");
- }
- if (length > (b.length - offset)) {
- throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
- }
-
- long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length);
- if (bytesRead > Integer.MAX_VALUE) {
- throw new IOException("Unexpected Content-Length");
- }
- return (int) bytesRead;
- }
-
- /**
- * Seek to given position in stream.
- *
- * @param n position to seek to
- * @throws IOException if there is an error
- * @throws EOFException if attempting to seek past end of file
- */
- @Override
- public synchronized void seek(long n) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- if (n < 0) {
- throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
- }
- if (n > contentLength) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
- }
-
- if (n >= fCursor - limit && n <= fCursor) { // within buffer
- bCursor = (int) (n - (fCursor - limit));
- return;
- }
-
- // next read will read from here
- fCursor = n;
-
- //invalidate buffer
- limit = 0;
- bCursor = 0;
- }
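A worked example of the in-buffer fast path above, with illustrative numbers and assuming the file is large enough:

    // Suppose the buffer currently holds file bytes [8192, 12288), so
    // fCursor == 12288 and limit == 4096. Then seek(9000) stays in the buffer:
    //     bCursor = 9000 - (12288 - 4096) = 808
    // and nothing besides bCursor changes. seek(20000) instead falls outside
    // [fCursor - limit, fCursor], so the buffer is invalidated and
    // fCursor = 20000 is used by the next readOneBlock().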
-
- @Override
- public synchronized long skip(long n) throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- long currentPos = getPos();
- if (currentPos == contentLength) {
- if (n > 0) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
- }
- }
- long newPos = currentPos + n;
- if (newPos < 0) {
- newPos = 0;
- n = newPos - currentPos;
- }
- if (newPos > contentLength) {
- newPos = contentLength;
- n = newPos - currentPos;
- }
- seek(newPos);
- return n;
- }
-
- /**
- * Return the size of the remaining available bytes
- * if the size is less than or equal to {@link Integer#MAX_VALUE},
- * otherwise, return {@link Integer#MAX_VALUE}.
- * <p>
- * This is to match the behavior of DFSInputStream.available(),
- * which some clients may rely on (HBase write-ahead log reading in
- * particular).
- */
- @Override
- public synchronized int available() throws IOException {
- if (closed) {
- throw new IOException(
- FSExceptionMessages.STREAM_IS_CLOSED);
- }
- final long remaining = this.contentLength - this.getPos();
- return remaining <= Integer.MAX_VALUE
- ? (int) remaining : Integer.MAX_VALUE;
- }
-
- /**
- * Returns the length of the file that this stream refers to. Note that the length returned is the length
- * as of the time the stream was opened. Specifically, if there have been subsequent appends to the file,
- * they won't be reflected in the returned length.
- *
- * @return length of the file.
- * @throws IOException if the stream is closed
- */
- public long length() throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- return contentLength;
- }
-
- /**
- * Return the current offset from the start of the file
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public synchronized long getPos() throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- return fCursor - limit + bCursor;
- }
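Continuing the numbers from the seek example above, the position formula checks out:

    // fCursor == 12288, limit == 4096, bCursor == 808
    // getPos() == 12288 - 4096 + 808 == 9000, the file offset of the next
    // byte a read() call would return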
-
- /**
- * Seeks a different copy of the data. Returns true if
- * found a new source, false otherwise.
- *
- * @throws IOException throws {@link IOException} if there is an error
- */
- @Override
- public boolean seekToNewSource(long l) throws IOException {
- return false;
- }
-
- @Override
- public synchronized void close() throws IOException {
- closed = true;
- buffer = null; // drop the reference so the buffer can be GC'ed sooner
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- *
- * @param readlimit ignored
- */
- @Override
- public synchronized void mark(int readlimit) {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * Not supported by this stream. Throws {@link UnsupportedOperationException}
- */
- @Override
- public synchronized void reset() throws IOException {
- throw new UnsupportedOperationException("mark()/reset() not supported on this stream");
- }
-
- /**
- * Gets whether mark and reset are supported by {@code SeaweedInputStream}. Always returns false.
- *
- * @return always {@code false}
- */
- @Override
- public boolean markSupported() {
- return false;
- }
-}
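From a caller's point of view the class behaves like any positioned InputStream; a hedged sketch of draining one (the helper class is an assumption, and the stream would normally come from SeaweedFileSystemStore.openFileForRead):

    import java.io.IOException;
    import java.io.InputStream;

    class Drain {
        // Counts the bytes in the stream; read() may return fewer bytes than
        // requested, so loop until EOF (-1).
        static long drain(InputStream in) throws IOException {
            byte[] chunk = new byte[8192];
            long total = 0;
            int n;
            while ((n = in.read(chunk, 0, chunk.length)) > 0) {
                total += n;
            }
            return total;
        }
    }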
diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
deleted file mode 100644
index 4f307ff96..000000000
--- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java
+++ /dev/null
@@ -1,335 +0,0 @@
-package seaweed.hdfs;
-
-// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StreamCapabilities;
-import org.apache.hadoop.fs.Syncable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import seaweedfs.client.FilerGrpcClient;
-import seaweedfs.client.FilerProto;
-import seaweedfs.client.SeaweedWrite;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.io.OutputStream;
-import java.util.Locale;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory;
-
-public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities {
-
- private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class);
-
- private final FilerGrpcClient filerGrpcClient;
- private final Path path;
- private final int bufferSize;
- private final int maxConcurrentRequestCount;
- private final ThreadPoolExecutor threadExecutor;
- private final ExecutorCompletionService<Void> completionService;
- private FilerProto.Entry.Builder entry;
- private long position;
- private boolean closed;
- private boolean supportFlush = true;
- private volatile IOException lastError;
- private long lastFlushOffset;
- private long lastTotalAppendOffset = 0;
- private byte[] buffer;
- private int bufferIndex;
- private ConcurrentLinkedDeque<WriteOperation> writeOperations;
- private String replication = "000";
-
- public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry,
- final long position, final int bufferSize, final String replication) {
- this.filerGrpcClient = filerGrpcClient;
- this.replication = replication;
- this.path = path;
- this.position = position;
- this.closed = false;
- this.lastError = null;
- this.lastFlushOffset = 0;
- this.bufferSize = bufferSize;
- this.buffer = new byte[bufferSize];
- this.bufferIndex = 0;
- this.writeOperations = new ConcurrentLinkedDeque<>();
-
- this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors();
-
- this.threadExecutor
- = new ThreadPoolExecutor(maxConcurrentRequestCount,
- maxConcurrentRequestCount,
- 10L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
- this.completionService = new ExecutorCompletionService<>(this.threadExecutor);
-
- this.entry = entry;
-
- }
-
- private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
-
- LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry);
-
- try {
- SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
- } catch (Exception ex) {
- throw new IOException(ex);
- }
- this.lastFlushOffset = offset;
- }
-
- @Override
- public void write(final int byteVal) throws IOException {
- write(new byte[]{(byte) (byteVal & 0xFF)});
- }
-
- @Override
- public synchronized void write(final byte[] data, final int off, final int length)
- throws IOException {
- maybeThrowLastError();
-
- Preconditions.checkArgument(data != null, "null data");
-
- if (off < 0 || length < 0 || length > data.length - off) {
- throw new IndexOutOfBoundsException();
- }
-
- int currentOffset = off;
- int writableBytes = bufferSize - bufferIndex;
- int numberOfBytesToWrite = length;
-
- while (numberOfBytesToWrite > 0) {
- if (writableBytes <= numberOfBytesToWrite) {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
- bufferIndex += writableBytes;
- writeCurrentBufferToService();
- currentOffset += writableBytes;
- numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
- } else {
- System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
- bufferIndex += numberOfBytesToWrite;
- numberOfBytesToWrite = 0;
- }
-
- writableBytes = bufferSize - bufferIndex;
- }
- }
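A worked pass through the buffering loop above, with small illustrative numbers:

    // bufferSize == 8, bufferIndex == 5, then write(data, 0, 10):
    //   iteration 1: writableBytes == 3 <= 10 -> copy 3 bytes, buffer is full,
    //                writeCurrentBufferToService() uploads it, bufferIndex == 0
    //   iteration 2: writableBytes == 8 > 7  -> copy the remaining 7 bytes,
    //                bufferIndex == 7; nothing is uploaded until the buffer
    //                fills again or a flush/close forces it out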
-
- /**
- * Flushes this output stream and forces any buffered output bytes to be
- * written out. If any data remains in the payload it is committed to the
- * service. Data is queued for writing and forced out to the service
- * before the call returns.
- */
- @Override
- public void flush() throws IOException {
- if (supportFlush) {
- flushInternalAsync();
- }
- }
-
- /**
- * Similar to posix fsync, flush out the data in client's user buffer
- * all the way to the disk device (but the disk may have it in its cache).
- *
- * @throws IOException if error occurs
- */
- @Override
- public void hsync() throws IOException {
- if (supportFlush) {
- flushInternal();
- }
- }
-
- /**
- * Flush out the data in client's user buffer. After the return of
- * this call, new readers will see the data.
- *
- * @throws IOException if any error occurs
- */
- @Override
- public void hflush() throws IOException {
- if (supportFlush) {
- flushInternal();
- }
- }
-
- /**
- * Query the stream for a specific capability.
- *
- * @param capability string to query the stream support for.
- * @return true for hsync and hflush.
- */
- @Override
- public boolean hasCapability(String capability) {
- switch (capability.toLowerCase(Locale.ENGLISH)) {
- case StreamCapabilities.HSYNC:
- case StreamCapabilities.HFLUSH:
- return supportFlush;
- default:
- return false;
- }
- }
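Callers can use the capability probe rather than assuming durability semantics; a hedged sketch (the helper name is hypothetical):

    // Force a durable sync only when the stream advertises support for it.
    static void syncIfSupported(SeaweedOutputStream out) throws IOException {
        if (out.hasCapability(StreamCapabilities.HSYNC)) {
            out.hsync();  // blocks until data and metadata reach the filer
        } else {
            out.flush();  // falls back to the asynchronous flush path
        }
    }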
-
- /**
- * Force all data in the output stream to be written to the SeaweedFS filer.
- * Wait to return until this is complete. Close the access to the stream and
- * shut down the upload thread pool.
- * Any error caught in a worker thread and stored will be rethrown here
- * after cleanup.
- */
- @Override
- public synchronized void close() throws IOException {
- if (closed) {
- return;
- }
-
- LOG.debug("close path: {}", path);
- try {
- flushInternal();
- threadExecutor.shutdown();
- } finally {
- lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- buffer = null;
- bufferIndex = 0;
- closed = true;
- writeOperations.clear();
- if (!threadExecutor.isShutdown()) {
- threadExecutor.shutdownNow();
- }
- }
- }
-
- private synchronized void writeCurrentBufferToService() throws IOException {
- if (bufferIndex == 0) {
- return;
- }
-
- final byte[] bytes = buffer;
- final int bytesLength = bufferIndex;
-
- buffer = new byte[bufferSize];
- bufferIndex = 0;
- final long offset = position;
- position += bytesLength;
-
- if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
- waitForTaskToComplete();
- }
-
- final Future<Void> job = completionService.submit(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- // originally: client.append(path, offset, bytes, 0, bytesLength);
- SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
- return null;
- }
- });
-
- writeOperations.add(new WriteOperation(job, offset, bytesLength));
-
- // Try to shrink the queue
- shrinkWriteOperationQueue();
- }
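The queue-size check above is the stream's backpressure valve; with illustrative numbers:

    // With 8 available processors, maxConcurrentRequestCount == 4 * 8 == 32.
    // Once the executor queue holds 2 * 32 == 64 pending uploads, the writer
    // blocks in waitForTaskToComplete() until at least one finishes, bounding
    // the number of in-flight byte[] buffers (and hence memory) per stream.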
-
- private void waitForTaskToComplete() throws IOException {
- boolean completed;
- for (completed = false; completionService.poll() != null; completed = true) {
- // keep polling until there is no data
- }
-
- if (!completed) {
- try {
- completionService.take();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt(); // restore the interrupt status before surfacing the error
- lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
- throw lastError;
- }
- }
- }
-
- private void maybeThrowLastError() throws IOException {
- if (lastError != null) {
- throw lastError;
- }
- }
-
- /**
- * Try to remove the completed write operations from the beginning of write
- * operation FIFO queue.
- */
- private synchronized void shrinkWriteOperationQueue() throws IOException {
- try {
- while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
- writeOperations.peek().task.get();
- lastTotalAppendOffset += writeOperations.peek().length;
- writeOperations.remove();
- }
- } catch (Exception e) {
- lastError = new IOException(e);
- throw lastError;
- }
- }
-
- private synchronized void flushInternal() throws IOException {
- maybeThrowLastError();
- writeCurrentBufferToService();
- flushWrittenBytesToService();
- }
-
- private synchronized void flushInternalAsync() throws IOException {
- maybeThrowLastError();
- writeCurrentBufferToService();
- flushWrittenBytesToServiceAsync();
- }
-
- private synchronized void flushWrittenBytesToService() throws IOException {
- for (WriteOperation writeOperation : writeOperations) {
- try {
- writeOperation.task.get();
- } catch (Exception ex) {
- lastError = new IOException(ex);
- throw lastError;
- }
- }
- LOG.debug("flushWrittenBytesToService: {} position:{}", path, position);
- flushWrittenBytesToServiceInternal(position);
- }
-
- private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
- shrinkWriteOperationQueue();
-
- if (this.lastTotalAppendOffset > this.lastFlushOffset) {
- this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset);
- }
- }
-
- private static class WriteOperation {
- private final Future<Void> task;
- private final long startOffset;
- private final long length;
-
- WriteOperation(final Future<Void> task, final long startOffset, final long length) {
- Preconditions.checkNotNull(task, "task");
- Preconditions.checkArgument(startOffset >= 0, "startOffset");
- Preconditions.checkArgument(length >= 0, "length");
-
- this.task = task;
- this.startOffset = startOffset;
- this.length = length;
- }
- }
-
-}
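End to end, the write path composes as in this hedged sketch (the helper and the 64 KB copy buffer are assumptions, not part of this code):

    // Copies src into SeaweedFS through the stream above, then makes the data
    // durable and visible before closing.
    static void copyIntoSeaweed(java.io.InputStream src, SeaweedOutputStream dst)
            throws java.io.IOException {
        byte[] buf = new byte[64 * 1024];
        int n;
        while ((n = src.read(buf)) > 0) {
            dst.write(buf, 0, n);  // chunks upload asynchronously as buffers fill
        }
        dst.hsync();  // flush buffered bytes and commit the entry metadata
        dst.close();  // waits for outstanding uploads, then shuts the pool down
    }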