diff options
Diffstat (limited to 'other/java/hdfs')
9 files changed, 0 insertions, 2383 deletions
diff --git a/other/java/hdfs/pom.xml b/other/java/hdfs/pom.xml deleted file mode 100644 index 6a1cd897f..000000000 --- a/other/java/hdfs/pom.xml +++ /dev/null @@ -1,159 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <properties> - <seaweedfs.client.version>1.1.0</seaweedfs.client.version> - <hadoop.version>3.1.1</hadoop.version> - </properties> - - <groupId>com.github.chrislusf</groupId> - <artifactId>seaweedfs-hadoop-client</artifactId> - <version>${seaweedfs.client.version}</version> - - <parent> - <groupId>org.sonatype.oss</groupId> - <artifactId>oss-parent</artifactId> - <version>9</version> - </parent> - - <distributionManagement> - <snapshotRepository> - <id>ossrh</id> - <url>https://oss.sonatype.org/content/repositories/snapshots</url> - </snapshotRepository> - </distributionManagement> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>7</source> - <target>7</target> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>3.2.1</version> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <filters> - <filter> - <artifact>*:*</artifact> - <excludes> - <exclude>META-INF/*.SF</exclude> - <exclude>META-INF/*.DSA</exclude> - <exclude>META-INF/*.RSA</exclude> - <exclude>org/slf4j/**</exclude> - <exclude>META-INF/maven/org.slf4j/**</exclude> - </excludes> - </filter> - </filters> - <transformers> - <transformer - implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> - </transformers> - <relocations> - <relocation> - <pattern>com.google</pattern> - <shadedPattern>shaded.com.google</shadedPattern> - </relocation> - <relocation> - <pattern>io.grpc.internal</pattern> - <shadedPattern>shaded.io.grpc.internal</shadedPattern> - </relocation> - <relocation> - <pattern>org.apache.commons</pattern> - <shadedPattern>shaded.org.apache.commons</shadedPattern> - <excludes> - <exclude>org.apache.hadoop</exclude> - <exclude>org.apache.log4j</exclude> - </excludes> - </relocation> - </relocations> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-gpg-plugin</artifactId> - <version>1.5</version> - <executions> - <execution> - <id>sign-artifacts</id> - <phase>verify</phase> - <goals> - <goal>sign</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.sonatype.plugins</groupId> - <artifactId>nexus-staging-maven-plugin</artifactId> - <version>1.6.7</version> - <extensions>true</extensions> - <configuration> - <serverId>ossrh</serverId> - <nexusUrl>https://oss.sonatype.org/</nexusUrl> - <autoReleaseAfterClose>true</autoReleaseAfterClose> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-source-plugin</artifactId> - <version>2.2.1</version> - <executions> - <execution> - <id>attach-sources</id> - <goals> - <goal>jar-no-fork</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <version>2.9.1</version> - <executions> - <execution> - <id>attach-javadocs</id> - <goals> - <goal>jar</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> - - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>com.github.chrislusf</groupId> - <artifactId>seaweedfs-client</artifactId> - <version>${seaweedfs.client.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - </dependencies> - -</project> diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java deleted file mode 100644 index 926d0b83b..000000000 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBuffer.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -import java.util.concurrent.CountDownLatch; - -class ReadBuffer { - - private SeaweedInputStream stream; - private long offset; // offset within the file for the buffer - private int length; // actual length, set after the buffer is filles - private int requestedLength; // requested length of the read - private byte[] buffer; // the buffer itself - private int bufferindex = -1; // index in the buffers array in Buffer manager - private ReadBufferStatus status; // status of the buffer - private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client - // waiting on this buffer gets unblocked - - // fields to help with eviction logic - private long timeStamp = 0; // tick at which buffer became available to read - private boolean isFirstByteConsumed = false; - private boolean isLastByteConsumed = false; - private boolean isAnyByteConsumed = false; - - public SeaweedInputStream getStream() { - return stream; - } - - public void setStream(SeaweedInputStream stream) { - this.stream = stream; - } - - public long getOffset() { - return offset; - } - - public void setOffset(long offset) { - this.offset = offset; - } - - public int getLength() { - return length; - } - - public void setLength(int length) { - this.length = length; - } - - public int getRequestedLength() { - return requestedLength; - } - - public void setRequestedLength(int requestedLength) { - this.requestedLength = requestedLength; - } - - public byte[] getBuffer() { - return buffer; - } - - public void setBuffer(byte[] buffer) { - this.buffer = buffer; - } - - public int getBufferindex() { - return bufferindex; - } - - public void setBufferindex(int bufferindex) { - this.bufferindex = bufferindex; - } - - public ReadBufferStatus getStatus() { - return status; - } - - public void setStatus(ReadBufferStatus status) { - this.status = status; - } - - public CountDownLatch getLatch() { - return latch; - } - - public void setLatch(CountDownLatch latch) { - this.latch = latch; - } - - public long getTimeStamp() { - return timeStamp; - } - - public void setTimeStamp(long timeStamp) { - this.timeStamp = timeStamp; - } - - public boolean isFirstByteConsumed() { - return isFirstByteConsumed; - } - - public void setFirstByteConsumed(boolean isFirstByteConsumed) { - this.isFirstByteConsumed = isFirstByteConsumed; - } - - public boolean isLastByteConsumed() { - return isLastByteConsumed; - } - - public void setLastByteConsumed(boolean isLastByteConsumed) { - this.isLastByteConsumed = isLastByteConsumed; - } - - public boolean isAnyByteConsumed() { - return isAnyByteConsumed; - } - - public void setAnyByteConsumed(boolean isAnyByteConsumed) { - this.isAnyByteConsumed = isAnyByteConsumed; - } - -} diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java deleted file mode 100644 index 5b1e21529..000000000 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferManager.java +++ /dev/null @@ -1,394 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package seaweed.hdfs; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Stack; -import java.util.concurrent.CountDownLatch; - -/** - * The Read Buffer Manager for Rest AbfsClient. - */ -final class ReadBufferManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class); - - private static final int NUM_BUFFERS = 16; - private static final int BLOCK_SIZE = 4 * 1024 * 1024; - private static final int NUM_THREADS = 8; - private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold - - private Thread[] threads = new Thread[NUM_THREADS]; - private byte[][] buffers; // array of byte[] buffers, to hold the data that is read - private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available - - private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet - private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads - private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading - private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block - - static { - BUFFER_MANAGER = new ReadBufferManager(); - BUFFER_MANAGER.init(); - } - - static ReadBufferManager getBufferManager() { - return BUFFER_MANAGER; - } - - private void init() { - buffers = new byte[NUM_BUFFERS][]; - for (int i = 0; i < NUM_BUFFERS; i++) { - buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC - freeList.add(i); - } - for (int i = 0; i < NUM_THREADS; i++) { - Thread t = new Thread(new ReadBufferWorker(i)); - t.setDaemon(true); - threads[i] = t; - t.setName("SeaweedFS-prefetch-" + i); - t.start(); - } - ReadBufferWorker.UNLEASH_WORKERS.countDown(); - } - - // hide instance constructor - private ReadBufferManager() { - } - - - /* - * - * SeaweedInputStream-facing methods - * - */ - - - /** - * {@link SeaweedInputStream} calls this method to queue read-aheads. - * - * @param stream The {@link SeaweedInputStream} for which to do the read-ahead - * @param requestedOffset The offset in the file which shoukd be read - * @param requestedLength The length to read - */ - void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", - stream.getPath(), requestedOffset, requestedLength); - } - ReadBuffer buffer; - synchronized (this) { - if (isAlreadyQueued(stream, requestedOffset)) { - return; // already queued, do not queue again - } - if (freeList.isEmpty() && !tryEvict()) { - return; // no buffers available, cannot queue anything - } - - buffer = new ReadBuffer(); - buffer.setStream(stream); - buffer.setOffset(requestedOffset); - buffer.setLength(0); - buffer.setRequestedLength(requestedLength); - buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); - buffer.setLatch(new CountDownLatch(1)); - - Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already - - buffer.setBuffer(buffers[bufferIndex]); - buffer.setBufferindex(bufferIndex); - readAheadQueue.add(buffer); - notifyAll(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", - stream.getPath(), requestedOffset, buffer.getBufferindex()); - } - } - - - /** - * {@link SeaweedInputStream} calls this method read any bytes already available in a buffer (thereby saving a - * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading - * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead - * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because - * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own - * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time). - * - * @param stream the file to read bytes for - * @param position the offset in the file to do a read for - * @param length the length to read - * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. - * @return the number of bytes read - */ - int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) { - // not synchronized, so have to be careful with locking - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("getBlock for file {} position {} thread {}", - stream.getPath(), position, Thread.currentThread().getName()); - } - - waitForProcess(stream, position); - - int bytesRead = 0; - synchronized (this) { - bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); - } - if (bytesRead > 0) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done read from Cache for {} position {} length {}", - stream.getPath(), position, bytesRead); - } - return bytesRead; - } - - // otherwise, just say we got nothing - calling thread can do its own read - return 0; - } - - /* - * - * Internal methods - * - */ - - private void waitForProcess(final SeaweedInputStream stream, final long position) { - ReadBuffer readBuf; - synchronized (this) { - clearFromReadAheadQueue(stream, position); - readBuf = getFromList(inProgressList, stream, position); - } - if (readBuf != null) { // if in in-progress queue, then block for it - try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", - stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); - } - readBuf.getLatch().await(); // blocking wait on the caller stream's thread - // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread - // is done processing it (in doneReading). There, the latch is set after removing the buffer from - // inProgressList. So this latch is safe to be outside the synchronized block. - // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock - // while waiting, so no one will be able to change any state. If this becomes more complex in the future, - // then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched. - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("latch done for file {} buffer idx {} length {}", - stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); - } - } - } - - /** - * If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list. - * The objective is to find just one buffer - there is no advantage to evicting more than one. - * - * @return whether the eviction succeeeded - i.e., were we able to free up one buffer - */ - private synchronized boolean tryEvict() { - ReadBuffer nodeToEvict = null; - if (completedReadList.size() <= 0) { - return false; // there are no evict-able buffers - } - - // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) - for (ReadBuffer buf : completedReadList) { - if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { - nodeToEvict = buf; - break; - } - } - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see) - for (ReadBuffer buf : completedReadList) { - if (buf.isAnyByteConsumed()) { - nodeToEvict = buf; - break; - } - } - - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try any old nodes that have not been consumed - long earliestBirthday = Long.MAX_VALUE; - for (ReadBuffer buf : completedReadList) { - if (buf.getTimeStamp() < earliestBirthday) { - nodeToEvict = buf; - earliestBirthday = buf.getTimeStamp(); - } - } - if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) { - return evict(nodeToEvict); - } - - // nothing can be evicted - return false; - } - - private boolean evict(final ReadBuffer buf) { - freeList.push(buf.getBufferindex()); - completedReadList.remove(buf); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", - buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); - } - return true; - } - - private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) { - // returns true if any part of the buffer is already queued - return (isInList(readAheadQueue, stream, requestedOffset) - || isInList(inProgressList, stream, requestedOffset) - || isInList(completedReadList, stream, requestedOffset)); - } - - private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) { - return (getFromList(list, stream, requestedOffset) != null); - } - - private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) { - for (ReadBuffer buffer : list) { - if (buffer.getStream() == stream) { - if (buffer.getStatus() == ReadBufferStatus.AVAILABLE - && requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getLength()) { - return buffer; - } else if (requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { - return buffer; - } - } - } - return null; - } - - private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) { - ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); - if (buffer != null) { - readAheadQueue.remove(buffer); - notifyAll(); // lock is held in calling method - freeList.push(buffer.getBufferindex()); - } - } - - private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length, - final byte[] buffer) { - ReadBuffer buf = getFromList(completedReadList, stream, position); - if (buf == null || position >= buf.getOffset() + buf.getLength()) { - return 0; - } - int cursor = (int) (position - buf.getOffset()); - int availableLengthInBuffer = buf.getLength() - cursor; - int lengthToCopy = Math.min(length, availableLengthInBuffer); - System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); - if (cursor == 0) { - buf.setFirstByteConsumed(true); - } - if (cursor + lengthToCopy == buf.getLength()) { - buf.setLastByteConsumed(true); - } - buf.setAnyByteConsumed(true); - return lengthToCopy; - } - - /* - * - * ReadBufferWorker-thread-facing methods - * - */ - - /** - * ReadBufferWorker thread calls this to get the next buffer that it should work on. - * - * @return {@link ReadBuffer} - * @throws InterruptedException if thread is interrupted - */ - ReadBuffer getNextBlockToRead() throws InterruptedException { - ReadBuffer buffer = null; - synchronized (this) { - //buffer = readAheadQueue.take(); // blocking method - while (readAheadQueue.size() == 0) { - wait(); - } - buffer = readAheadQueue.remove(); - notifyAll(); - if (buffer == null) { - return null; // should never happen - } - buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); - inProgressList.add(buffer); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker picked file {} for offset {}", - buffer.getStream().getPath(), buffer.getOffset()); - } - return buffer; - } - - /** - * ReadBufferWorker thread calls this method to post completion. - * - * @param buffer the buffer whose read was completed - * @param result the {@link ReadBufferStatus} after the read operation in the worker thread - * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read - */ - void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}", - buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead); - } - synchronized (this) { - inProgressList.remove(buffer); - if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { - buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setTimeStamp(currentTimeMillis()); - buffer.setLength(bytesActuallyRead); - completedReadList.add(buffer); - } else { - freeList.push(buffer.getBufferindex()); - // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC - } - } - //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results - buffer.getLatch().countDown(); // wake up waiting threads (if any) - } - - /** - * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). - * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization), - * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core. - * Note: it is not monotonic across Sockets, and even within a CPU, its only the - * more recent parts which share a clock across all cores. - * - * @return current time in milliseconds - */ - private long currentTimeMillis() { - return System.nanoTime() / 1000 / 1000; - } -} diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java deleted file mode 100644 index d63674977..000000000 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferStatus.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -/** - * The ReadBufferStatus for Rest AbfsClient - */ -public enum ReadBufferStatus { - NOT_AVAILABLE, // buffers sitting in readaheadqueue have this stats - READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList - AVAILABLE, // data is available in buffer. It should be in completedList - READ_FAILED // read completed, but failed. -} diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java deleted file mode 100644 index 6ffbc4644..000000000 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/ReadBufferWorker.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -import java.util.concurrent.CountDownLatch; - -class ReadBufferWorker implements Runnable { - - protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); - private int id; - - ReadBufferWorker(final int id) { - this.id = id; - } - - /** - * return the ID of ReadBufferWorker. - */ - public int getId() { - return this.id; - } - - /** - * Waits until a buffer becomes available in ReadAheadQueue. - * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager. - * Rinse and repeat. Forever. - */ - public void run() { - try { - UNLEASH_WORKERS.await(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - ReadBuffer buffer; - while (true) { - try { - buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - return; - } - if (buffer != null) { - try { - // do the actual read, from the file. - int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength()); - bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager - } catch (Exception ex) { - bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); - } - } - } - } -} diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java deleted file mode 100644 index 453924cf7..000000000 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ /dev/null @@ -1,611 +0,0 @@ -package seaweed.hdfs; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileAlreadyExistsException; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.ParentNotDirectoryException; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.fs.XAttrSetFlag; -import org.apache.hadoop.fs.permission.AclEntry; -import org.apache.hadoop.fs.permission.AclStatus; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Progressable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.util.EnumSet; -import java.util.List; -import java.util.Map; - -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; - -public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { - - public static final int FS_SEAWEED_DEFAULT_PORT = 8888; - public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; - public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; - public static final String FS_SEAWEED_GRPC_CA = "fs.seaweed.ca"; - public static final String FS_SEAWEED_GRPC_CLIENT_KEY = "fs.seaweed.client.key"; - public static final String FS_SEAWEED_GRPC_CLIENT_CERT = "fs.seaweed.client.cert"; - - private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); - private static int BUFFER_SIZE = 16 * 1024 * 1024; - - private URI uri; - private Path workingDirectory = new Path("/"); - private SeaweedFileSystemStore seaweedFileSystemStore; - - public URI getUri() { - return uri; - } - - public String getScheme() { - return "seaweedfs"; - } - - @Override - public void initialize(URI uri, Configuration conf) throws IOException { // get - super.initialize(uri, conf); - - // get host information from uri (overrides info in conf) - String host = uri.getHost(); - host = (host == null) ? conf.get(FS_SEAWEED_FILER_HOST, "localhost") : host; - if (host == null) { - throw new IOException("Invalid host specified"); - } - conf.set(FS_SEAWEED_FILER_HOST, host); - - // get port information from uri, (overrides info in conf) - int port = uri.getPort(); - port = (port == -1) ? FS_SEAWEED_DEFAULT_PORT : port; - conf.setInt(FS_SEAWEED_FILER_PORT, port); - - conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE); - - setConf(conf); - this.uri = uri; - - if (conf.get(FS_SEAWEED_GRPC_CA) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CA).length() != 0 - && conf.get(FS_SEAWEED_GRPC_CLIENT_CERT) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_CERT).length() != 0 - && conf.get(FS_SEAWEED_GRPC_CLIENT_KEY) != null && conf.getTrimmed(FS_SEAWEED_GRPC_CLIENT_KEY).length() != 0) { - seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, - conf.get(FS_SEAWEED_GRPC_CA), - conf.get(FS_SEAWEED_GRPC_CLIENT_CERT), - conf.get(FS_SEAWEED_GRPC_CLIENT_KEY)); - } else { - seaweedFileSystemStore = new SeaweedFileSystemStore(host, port); - } - - } - - @Override - public FSDataInputStream open(Path path, int bufferSize) throws IOException { - - LOG.debug("open path: {} bufferSize:{}", path, bufferSize); - - path = qualify(path); - - try { - InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(inputStream); - } catch (Exception ex) { - return null; - } - } - - @Override - public FSDataOutputStream create(Path path, FsPermission permission, final boolean overwrite, final int bufferSize, - final short replication, final long blockSize, final Progressable progress) throws IOException { - - LOG.debug("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize); - - path = qualify(path); - - try { - String replicaPlacement = String.format("%03d", replication - 1); - OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement); - return new FSDataOutputStream(outputStream, statistics); - } catch (Exception ex) { - return null; - } - } - - @Override - public FSDataOutputStream append(Path path, int bufferSize, Progressable progressable) throws IOException { - - LOG.debug("append path: {} bufferSize:{}", path, bufferSize); - - path = qualify(path); - try { - OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, ""); - return new FSDataOutputStream(outputStream, statistics); - } catch (Exception ex) { - return null; - } - } - - @Override - public boolean rename(Path src, Path dst) { - - LOG.debug("rename path: {} => {}", src, dst); - - if (src.isRoot()) { - return false; - } - - if (src.equals(dst)) { - return true; - } - FileStatus dstFileStatus = getFileStatus(dst); - - String sourceFileName = src.getName(); - Path adjustedDst = dst; - - if (dstFileStatus != null) { - if (!dstFileStatus.isDirectory()) { - return false; - } - adjustedDst = new Path(dst, sourceFileName); - } - - Path qualifiedSrcPath = qualify(src); - Path qualifiedDstPath = qualify(adjustedDst); - - seaweedFileSystemStore.rename(qualifiedSrcPath, qualifiedDstPath); - return true; - } - - @Override - public boolean delete(Path path, boolean recursive) { - - LOG.debug("delete path: {} recursive:{}", path, recursive); - - path = qualify(path); - - FileStatus fileStatus = getFileStatus(path); - - if (fileStatus == null) { - return true; - } - - return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive); - - } - - @Override - public FileStatus[] listStatus(Path path) throws IOException { - - LOG.debug("listStatus path: {}", path); - - path = qualify(path); - - return seaweedFileSystemStore.listEntries(path); - } - - @Override - public Path getWorkingDirectory() { - return workingDirectory; - } - - @Override - public void setWorkingDirectory(Path path) { - if (path.isAbsolute()) { - workingDirectory = path; - } else { - workingDirectory = new Path(workingDirectory, path); - } - } - - @Override - public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException { - - LOG.debug("mkdirs path: {}", path); - - path = qualify(path); - - FileStatus fileStatus = getFileStatus(path); - - if (fileStatus == null) { - - UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - return seaweedFileSystemStore.createDirectory(path, currentUser, - fsPermission == null ? FsPermission.getDirDefault() : fsPermission, - FsPermission.getUMask(getConf())); - - } - - if (fileStatus.isDirectory()) { - return true; - } else { - throw new FileAlreadyExistsException("Path is a file: " + path); - } - } - - @Override - public FileStatus getFileStatus(Path path) { - - LOG.debug("getFileStatus path: {}", path); - - path = qualify(path); - - return seaweedFileSystemStore.getFileStatus(path); - } - - /** - * Set owner of a path (i.e. a file or a directory). - * The parameters owner and group cannot both be null. - * - * @param path The path - * @param owner If it is null, the original username remains unchanged. - * @param group If it is null, the original groupname remains unchanged. - */ - @Override - public void setOwner(Path path, final String owner, final String group) - throws IOException { - LOG.debug("setOwner path: {}", path); - path = qualify(path); - - seaweedFileSystemStore.setOwner(path, owner, group); - } - - - /** - * Set permission of a path. - * - * @param path The path - * @param permission Access permission - */ - @Override - public void setPermission(Path path, final FsPermission permission) throws IOException { - LOG.debug("setPermission path: {}", path); - - if (permission == null) { - throw new IllegalArgumentException("The permission can't be null"); - } - - path = qualify(path); - - seaweedFileSystemStore.setPermission(path, permission); - } - - Path qualify(Path path) { - return path.makeQualified(uri, workingDirectory); - } - - /** - * Concat existing files together. - * - * @param trg the path to the target destination. - * @param psrcs the paths to the sources to use for the concatenation. - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default). - */ - @Override - public void concat(final Path trg, final Path[] psrcs) throws IOException { - throw new UnsupportedOperationException("Not implemented by the " + - getClass().getSimpleName() + " FileSystem implementation"); - } - - /** - * Truncate the file in the indicated path to the indicated size. - * <ul> - * <li>Fails if path is a directory.</li> - * <li>Fails if path does not exist.</li> - * <li>Fails if path is not closed.</li> - * <li>Fails if new size is greater than current size.</li> - * </ul> - * - * @param f The path to the file to be truncated - * @param newLength The size the file is to be truncated to - * @return <code>true</code> if the file has been truncated to the desired - * <code>newLength</code> and is immediately available to be reused for - * write operations such as <code>append</code>, or - * <code>false</code> if a background process of adjusting the length of - * the last block has been started, and clients should wait for it to - * complete before proceeding with further file updates. - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default). - */ - @Override - public boolean truncate(Path f, long newLength) throws IOException { - throw new UnsupportedOperationException("Not implemented by the " + - getClass().getSimpleName() + " FileSystem implementation"); - } - - @Override - public void createSymlink(final Path target, final Path link, - final boolean createParent) throws AccessControlException, - FileAlreadyExistsException, FileNotFoundException, - ParentNotDirectoryException, UnsupportedFileSystemException, - IOException { - // Supporting filesystems should override this method - throw new UnsupportedOperationException( - "Filesystem does not support symlinks!"); - } - - public boolean supportsSymlinks() { - return false; - } - - /** - * Create a snapshot. - * - * @param path The directory where snapshots will be taken. - * @param snapshotName The name of the snapshot - * @return the snapshot path. - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - */ - @Override - public Path createSnapshot(Path path, String snapshotName) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support createSnapshot"); - } - - /** - * Rename a snapshot. - * - * @param path The directory path where the snapshot was taken - * @param snapshotOldName Old name of the snapshot - * @param snapshotNewName New name of the snapshot - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void renameSnapshot(Path path, String snapshotOldName, - String snapshotNewName) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support renameSnapshot"); - } - - /** - * Delete a snapshot of a directory. - * - * @param path The directory that the to-be-deleted snapshot belongs to - * @param snapshotName The name of the snapshot - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void deleteSnapshot(Path path, String snapshotName) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support deleteSnapshot"); - } - - /** - * Modifies ACL entries of files and directories. This method can add new ACL - * entries or modify the permissions on existing ACL entries. All existing - * ACL entries that are not specified in this call are retained without - * changes. (Modifications are merged into the current ACL.) - * - * @param path Path to modify - * @param aclSpec List<AclEntry> describing modifications - * @throws IOException if an ACL could not be modified - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void modifyAclEntries(Path path, List<AclEntry> aclSpec) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support modifyAclEntries"); - } - - /** - * Removes ACL entries from files and directories. Other ACL entries are - * retained. - * - * @param path Path to modify - * @param aclSpec List describing entries to remove - * @throws IOException if an ACL could not be modified - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void removeAclEntries(Path path, List<AclEntry> aclSpec) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support removeAclEntries"); - } - - /** - * Removes all default ACL entries from files and directories. - * - * @param path Path to modify - * @throws IOException if an ACL could not be modified - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void removeDefaultAcl(Path path) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support removeDefaultAcl"); - } - - /** - * Removes all but the base ACL entries of files and directories. The entries - * for user, group, and others are retained for compatibility with permission - * bits. - * - * @param path Path to modify - * @throws IOException if an ACL could not be removed - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void removeAcl(Path path) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support removeAcl"); - } - - /** - * Fully replaces ACL of files and directories, discarding all existing - * entries. - * - * @param path Path to modify - * @param aclSpec List describing modifications, which must include entries - * for user, group, and others for compatibility with permission bits. - * @throws IOException if an ACL could not be modified - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void setAcl(Path path, List<AclEntry> aclSpec) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support setAcl"); - } - - /** - * Gets the ACL of a file or directory. - * - * @param path Path to get - * @return AclStatus describing the ACL of the file or directory - * @throws IOException if an ACL could not be read - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public AclStatus getAclStatus(Path path) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support getAclStatus"); - } - - /** - * Set an xattr of a file or directory. - * The name must be prefixed with the namespace followed by ".". For example, - * "user.attr". - * <p> - * Refer to the HDFS extended attributes user documentation for details. - * - * @param path Path to modify - * @param name xattr name. - * @param value xattr value. - * @param flag xattr set flag - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void setXAttr(Path path, String name, byte[] value, - EnumSet<XAttrSetFlag> flag) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support setXAttr"); - } - - /** - * Get an xattr name and value for a file or directory. - * The name must be prefixed with the namespace followed by ".". For example, - * "user.attr". - * <p> - * Refer to the HDFS extended attributes user documentation for details. - * - * @param path Path to get extended attribute - * @param name xattr name. - * @return byte[] xattr value. - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public byte[] getXAttr(Path path, String name) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support getXAttr"); - } - - /** - * Get all of the xattr name/value pairs for a file or directory. - * Only those xattrs which the logged-in user has permissions to view - * are returned. - * <p> - * Refer to the HDFS extended attributes user documentation for details. - * - * @param path Path to get extended attributes - * @return Map describing the XAttrs of the file or directory - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public Map<String, byte[]> getXAttrs(Path path) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support getXAttrs"); - } - - /** - * Get all of the xattrs name/value pairs for a file or directory. - * Only those xattrs which the logged-in user has permissions to view - * are returned. - * <p> - * Refer to the HDFS extended attributes user documentation for details. - * - * @param path Path to get extended attributes - * @param names XAttr names. - * @return Map describing the XAttrs of the file or directory - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public Map<String, byte[]> getXAttrs(Path path, List<String> names) - throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support getXAttrs"); - } - - /** - * Get all of the xattr names for a file or directory. - * Only those xattr names which the logged-in user has permissions to view - * are returned. - * <p> - * Refer to the HDFS extended attributes user documentation for details. - * - * @param path Path to get extended attributes - * @return List{@literal <String>} of the XAttr names of the file or directory - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public List<String> listXAttrs(Path path) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support listXAttrs"); - } - - /** - * Remove an xattr of a file or directory. - * The name must be prefixed with the namespace followed by ".". For example, - * "user.attr". - * <p> - * Refer to the HDFS extended attributes user documentation for details. - * - * @param path Path to remove extended attribute - * @param name xattr name - * @throws IOException IO failure - * @throws UnsupportedOperationException if the operation is unsupported - * (default outcome). - */ - @Override - public void removeXAttr(Path path, String name) throws IOException { - throw new UnsupportedOperationException(getClass().getSimpleName() - + " doesn't support removeXAttr"); - } - -} diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java deleted file mode 100644 index 643467898..000000000 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ /dev/null @@ -1,277 +0,0 @@ -package seaweed.hdfs; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import seaweedfs.client.FilerClient; -import seaweedfs.client.FilerGrpcClient; -import seaweedfs.client.FilerProto; -import seaweedfs.client.SeaweedRead; - -import javax.net.ssl.SSLException; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -public class SeaweedFileSystemStore { - - private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); - - private FilerGrpcClient filerGrpcClient; - private FilerClient filerClient; - - public SeaweedFileSystemStore(String host, int port) { - int grpcPort = 10000 + port; - filerGrpcClient = new FilerGrpcClient(host, grpcPort); - filerClient = new FilerClient(filerGrpcClient); - } - - public SeaweedFileSystemStore(String host, int port, - String caFile, String clientCertFile, String clientKeyFile) throws SSLException { - int grpcPort = 10000 + port; - filerGrpcClient = new FilerGrpcClient(host, grpcPort, caFile, clientCertFile, clientKeyFile); - filerClient = new FilerClient(filerGrpcClient); - } - - public static String getParentDirectory(Path path) { - return path.isRoot() ? "/" : path.getParent().toUri().getPath(); - } - - static int permissionToMode(FsPermission permission, boolean isDirectory) { - int p = permission.toShort(); - if (isDirectory) { - p = p | 1 << 31; - } - return p; - } - - public boolean createDirectory(final Path path, UserGroupInformation currentUser, - final FsPermission permission, final FsPermission umask) { - - LOG.debug("createDirectory path: {} permission: {} umask: {}", - path, - permission, - umask); - - return filerClient.mkdirs( - path.toUri().getPath(), - permissionToMode(permission, true), - currentUser.getUserName(), - currentUser.getGroupNames() - ); - } - - public FileStatus[] listEntries(final Path path) { - LOG.debug("listEntries path: {}", path); - - List<FileStatus> fileStatuses = new ArrayList<FileStatus>(); - - List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath()); - - for (FilerProto.Entry entry : entries) { - - FileStatus fileStatus = doGetFileStatus(new Path(path, entry.getName()), entry); - - fileStatuses.add(fileStatus); - } - return fileStatuses.toArray(new FileStatus[0]); - } - - public FileStatus getFileStatus(final Path path) { - - FilerProto.Entry entry = lookupEntry(path); - if (entry == null) { - return null; - } - LOG.debug("doGetFileStatus path:{} entry:{}", path, entry); - - FileStatus fileStatus = doGetFileStatus(path, entry); - return fileStatus; - } - - public boolean deleteEntries(final Path path, boolean isDirectory, boolean recursive) { - LOG.debug("deleteEntries path: {} isDirectory {} recursive: {}", - path, - String.valueOf(isDirectory), - String.valueOf(recursive)); - - if (path.isRoot()) { - return true; - } - - if (recursive && isDirectory) { - List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath()); - for (FilerProto.Entry entry : entries) { - deleteEntries(new Path(path, entry.getName()), entry.getIsDirectory(), true); - } - } - - return filerClient.deleteEntry(getParentDirectory(path), path.getName(), true, recursive); - } - - private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) { - FilerProto.FuseAttributes attributes = entry.getAttributes(); - long length = SeaweedRead.totalSize(entry.getChunksList()); - boolean isDir = entry.getIsDirectory(); - int block_replication = 1; - int blocksize = 512; - long modification_time = attributes.getMtime() * 1000; // milliseconds - long access_time = 0; - FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode()); - String owner = attributes.getUserName(); - String group = attributes.getGroupNameCount() > 0 ? attributes.getGroupName(0) : ""; - return new FileStatus(length, isDir, block_replication, blocksize, - modification_time, access_time, permission, owner, group, null, path); - } - - private FilerProto.Entry lookupEntry(Path path) { - - return filerClient.lookupEntry(getParentDirectory(path), path.getName()); - - } - - public void rename(Path source, Path destination) { - - LOG.debug("rename source: {} destination:{}", source, destination); - - if (source.isRoot()) { - return; - } - LOG.warn("rename lookupEntry source: {}", source); - FilerProto.Entry entry = lookupEntry(source); - if (entry == null) { - LOG.warn("rename non-existing source: {}", source); - return; - } - filerClient.mv(source.toUri().getPath(), destination.toUri().getPath()); - } - - public OutputStream createFile(final Path path, - final boolean overwrite, - FsPermission permission, - int bufferSize, - String replication) throws IOException { - - permission = permission == null ? FsPermission.getFileDefault() : permission; - - LOG.debug("createFile path: {} overwrite: {} permission: {}", - path, - overwrite, - permission.toString()); - - UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser(); - long now = System.currentTimeMillis() / 1000L; - - FilerProto.Entry.Builder entry = null; - long writePosition = 0; - if (!overwrite) { - FilerProto.Entry existingEntry = lookupEntry(path); - LOG.debug("createFile merged entry path:{} existingEntry:{}", path, existingEntry); - if (existingEntry != null) { - entry = FilerProto.Entry.newBuilder(); - entry.mergeFrom(existingEntry); - entry.getAttributesBuilder().setMtime(now); - } - LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); - writePosition = SeaweedRead.totalSize(existingEntry.getChunksList()); - replication = existingEntry.getAttributes().getReplication(); - } - if (entry == null) { - entry = FilerProto.Entry.newBuilder() - .setName(path.getName()) - .setIsDirectory(false) - .setAttributes(FilerProto.FuseAttributes.newBuilder() - .setFileMode(permissionToMode(permission, false)) - .setReplication(replication) - .setCrtime(now) - .setMtime(now) - .setUserName(userGroupInformation.getUserName()) - .clearGroupName() - .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) - ); - } - - return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication); - - } - - public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics, - int bufferSize) throws IOException { - - LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); - - int readAheadQueueDepth = 2; - FilerProto.Entry entry = lookupEntry(path); - - if (entry == null) { - throw new FileNotFoundException("read non-exist file " + path); - } - - return new SeaweedInputStream(filerGrpcClient, - statistics, - path.toUri().getPath(), - entry, - bufferSize, - readAheadQueueDepth); - } - - public void setOwner(Path path, String owner, String group) { - - LOG.debug("setOwner path:{} owner:{} group:{}", path, owner, group); - - FilerProto.Entry entry = lookupEntry(path); - if (entry == null) { - LOG.debug("setOwner path:{} entry:{}", path, entry); - return; - } - - FilerProto.Entry.Builder entryBuilder = entry.toBuilder(); - FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder(); - - if (owner != null) { - attributesBuilder.setUserName(owner); - } - if (group != null) { - attributesBuilder.clearGroupName(); - attributesBuilder.addGroupName(group); - } - - entryBuilder.setAttributes(attributesBuilder); - - LOG.debug("setOwner path:{} entry:{}", path, entryBuilder); - - filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); - - } - - public void setPermission(Path path, FsPermission permission) { - - LOG.debug("setPermission path:{} permission:{}", path, permission); - - FilerProto.Entry entry = lookupEntry(path); - if (entry == null) { - LOG.debug("setPermission path:{} entry:{}", path, entry); - return; - } - - FilerProto.Entry.Builder entryBuilder = entry.toBuilder(); - FilerProto.FuseAttributes.Builder attributesBuilder = entry.getAttributes().toBuilder(); - - attributesBuilder.setFileMode(permissionToMode(permission, entry.getIsDirectory())); - - entryBuilder.setAttributes(attributesBuilder); - - LOG.debug("setPermission path:{} entry:{}", path, entryBuilder); - - filerClient.updateEntry(getParentDirectory(path), entryBuilder.build()); - - } -} diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java deleted file mode 100644 index 90c14c772..000000000 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ /dev/null @@ -1,371 +0,0 @@ -package seaweed.hdfs; - -// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream - -import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.FSInputStream; -import org.apache.hadoop.fs.FileSystem.Statistics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import seaweedfs.client.FilerGrpcClient; -import seaweedfs.client.FilerProto; -import seaweedfs.client.SeaweedRead; - -import java.io.EOFException; -import java.io.IOException; -import java.util.List; - -public class SeaweedInputStream extends FSInputStream { - - private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); - - private final FilerGrpcClient filerGrpcClient; - private final Statistics statistics; - private final String path; - private final FilerProto.Entry entry; - private final List<SeaweedRead.VisibleInterval> visibleIntervalList; - private final long contentLength; - private final int bufferSize; // default buffer size - private final int readAheadQueueDepth; // initialized in constructor - private final boolean readAheadEnabled; // whether enable readAhead; - - private byte[] buffer = null; // will be initialized on first use - - private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server - private long fCursorAfterLastRead = -1; - private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer - private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 - // of valid bytes in buffer) - private boolean closed = false; - - public SeaweedInputStream( - final FilerGrpcClient filerGrpcClient, - final Statistics statistics, - final String path, - final FilerProto.Entry entry, - final int bufferSize, - final int readAheadQueueDepth) { - this.filerGrpcClient = filerGrpcClient; - this.statistics = statistics; - this.path = path; - this.entry = entry; - this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); - this.bufferSize = bufferSize; - this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); - this.readAheadEnabled = true; - - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); - - LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); - - } - - public String getPath() { - return path; - } - - @Override - public int read() throws IOException { - byte[] b = new byte[1]; - int numberOfBytesRead = read(b, 0, 1); - if (numberOfBytesRead < 0) { - return -1; - } else { - return (b[0] & 0xFF); - } - } - - @Override - public synchronized int read(final byte[] b, final int off, final int len) throws IOException { - int currentOff = off; - int currentLen = len; - int lastReadBytes; - int totalReadBytes = 0; - do { - lastReadBytes = readOneBlock(b, currentOff, currentLen); - if (lastReadBytes > 0) { - currentOff += lastReadBytes; - currentLen -= lastReadBytes; - totalReadBytes += lastReadBytes; - } - if (currentLen <= 0 || currentLen > b.length - currentOff) { - break; - } - } while (lastReadBytes > 0); - return totalReadBytes > 0 ? totalReadBytes : lastReadBytes; - } - - private int readOneBlock(final byte[] b, final int off, final int len) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - - Preconditions.checkNotNull(b); - - if (len == 0) { - return 0; - } - - if (this.available() == 0) { - return -1; - } - - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - - //If buffer is empty, then fill the buffer. - if (bCursor == limit) { - //If EOF, then return -1 - if (fCursor >= contentLength) { - return -1; - } - - long bytesRead = 0; - //reset buffer to initial state - i.e., throw away existing data - bCursor = 0; - limit = 0; - if (buffer == null) { - buffer = new byte[bufferSize]; - } - - // Enable readAhead when reading sequentially - if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { - bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); - } else { - bytesRead = readInternal(fCursor, buffer, 0, b.length, true); - } - - if (bytesRead == -1) { - return -1; - } - - limit += bytesRead; - fCursor += bytesRead; - fCursorAfterLastRead = fCursor; - } - - //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) - //(bytes returned may be less than requested) - int bytesRemaining = limit - bCursor; - int bytesToRead = Math.min(len, bytesRemaining); - System.arraycopy(buffer, bCursor, b, off, bytesToRead); - bCursor += bytesToRead; - if (statistics != null) { - statistics.incrementBytesRead(bytesToRead); - } - return bytesToRead; - } - - - private int readInternal(final long position, final byte[] b, final int offset, final int length, - final boolean bypassReadAhead) throws IOException { - if (readAheadEnabled && !bypassReadAhead) { - // try reading from read-ahead - if (offset != 0) { - throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets"); - } - int receivedBytes; - - // queue read-aheads - int numReadAheads = this.readAheadQueueDepth; - long nextSize; - long nextOffset = position; - while (numReadAheads > 0 && nextOffset < contentLength) { - nextSize = Math.min((long) bufferSize, contentLength - nextOffset); - ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); - nextOffset = nextOffset + nextSize; - numReadAheads--; - } - - // try reading from buffers first - receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); - if (receivedBytes > 0) { - return receivedBytes; - } - - // got nothing from read-ahead, do our own read now - receivedBytes = readRemote(position, b, offset, length); - return receivedBytes; - } else { - return readRemote(position, b, offset, length); - } - } - - int readRemote(long position, byte[] b, int offset, int length) throws IOException { - if (position < 0) { - throw new IllegalArgumentException("attempting to read from negative offset"); - } - if (position >= contentLength) { - return -1; // Hadoop prefers -1 to EOFException - } - if (b == null) { - throw new IllegalArgumentException("null byte array passed in to read() method"); - } - if (offset >= b.length) { - throw new IllegalArgumentException("offset greater than length of array"); - } - if (length < 0) { - throw new IllegalArgumentException("requested read length is less than zero"); - } - if (length > (b.length - offset)) { - throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); - } - - long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length); - if (bytesRead > Integer.MAX_VALUE) { - throw new IOException("Unexpected Content-Length"); - } - return (int) bytesRead; - } - - /** - * Seek to given position in stream. - * - * @param n position to seek to - * @throws IOException if there is an error - * @throws EOFException if attempting to seek past end of file - */ - @Override - public synchronized void seek(long n) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - if (n < 0) { - throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); - } - if (n > contentLength) { - throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); - } - - if (n >= fCursor - limit && n <= fCursor) { // within buffer - bCursor = (int) (n - (fCursor - limit)); - return; - } - - // next read will read from here - fCursor = n; - - //invalidate buffer - limit = 0; - bCursor = 0; - } - - @Override - public synchronized long skip(long n) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - long currentPos = getPos(); - if (currentPos == contentLength) { - if (n > 0) { - throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); - } - } - long newPos = currentPos + n; - if (newPos < 0) { - newPos = 0; - n = newPos - currentPos; - } - if (newPos > contentLength) { - newPos = contentLength; - n = newPos - currentPos; - } - seek(newPos); - return n; - } - - /** - * Return the size of the remaining available bytes - * if the size is less than or equal to {@link Integer#MAX_VALUE}, - * otherwise, return {@link Integer#MAX_VALUE}. - * <p> - * This is to match the behavior of DFSInputStream.available(), - * which some clients may rely on (HBase write-ahead log reading in - * particular). - */ - @Override - public synchronized int available() throws IOException { - if (closed) { - throw new IOException( - FSExceptionMessages.STREAM_IS_CLOSED); - } - final long remaining = this.contentLength - this.getPos(); - return remaining <= Integer.MAX_VALUE - ? (int) remaining : Integer.MAX_VALUE; - } - - /** - * Returns the length of the file that this stream refers to. Note that the length returned is the length - * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, - * they wont be reflected in the returned length. - * - * @return length of the file. - * @throws IOException if the stream is closed - */ - public long length() throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - return contentLength; - } - - /** - * Return the current offset from the start of the file - * - * @throws IOException throws {@link IOException} if there is an error - */ - @Override - public synchronized long getPos() throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - return fCursor - limit + bCursor; - } - - /** - * Seeks a different copy of the data. Returns true if - * found a new source, false otherwise. - * - * @throws IOException throws {@link IOException} if there is an error - */ - @Override - public boolean seekToNewSource(long l) throws IOException { - return false; - } - - @Override - public synchronized void close() throws IOException { - closed = true; - buffer = null; // de-reference the buffer so it can be GC'ed sooner - } - - /** - * Not supported by this stream. Throws {@link UnsupportedOperationException} - * - * @param readlimit ignored - */ - @Override - public synchronized void mark(int readlimit) { - throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); - } - - /** - * Not supported by this stream. Throws {@link UnsupportedOperationException} - */ - @Override - public synchronized void reset() throws IOException { - throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); - } - - /** - * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false. - * - * @return always {@code false} - */ - @Override - public boolean markSupported() { - return false; - } -} diff --git a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java deleted file mode 100644 index 4f307ff96..000000000 --- a/other/java/hdfs/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ /dev/null @@ -1,335 +0,0 @@ -package seaweed.hdfs; - -// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream - -import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.StreamCapabilities; -import org.apache.hadoop.fs.Syncable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import seaweedfs.client.FilerGrpcClient; -import seaweedfs.client.FilerProto; -import seaweedfs.client.SeaweedWrite; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.io.OutputStream; -import java.util.Locale; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; - -public class SeaweedOutputStream extends OutputStream implements Syncable, StreamCapabilities { - - private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); - - private final FilerGrpcClient filerGrpcClient; - private final Path path; - private final int bufferSize; - private final int maxConcurrentRequestCount; - private final ThreadPoolExecutor threadExecutor; - private final ExecutorCompletionService<Void> completionService; - private FilerProto.Entry.Builder entry; - private long position; - private boolean closed; - private boolean supportFlush = true; - private volatile IOException lastError; - private long lastFlushOffset; - private long lastTotalAppendOffset = 0; - private byte[] buffer; - private int bufferIndex; - private ConcurrentLinkedDeque<WriteOperation> writeOperations; - private String replication = "000"; - - public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, - final long position, final int bufferSize, final String replication) { - this.filerGrpcClient = filerGrpcClient; - this.replication = replication; - this.path = path; - this.position = position; - this.closed = false; - this.lastError = null; - this.lastFlushOffset = 0; - this.bufferSize = bufferSize; - this.buffer = new byte[bufferSize]; - this.bufferIndex = 0; - this.writeOperations = new ConcurrentLinkedDeque<>(); - - this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); - - this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 10L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>()); - this.completionService = new ExecutorCompletionService<>(this.threadExecutor); - - this.entry = entry; - - } - - private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { - - LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry); - - try { - SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); - } catch (Exception ex) { - throw new IOException(ex); - } - this.lastFlushOffset = offset; - } - - @Override - public void write(final int byteVal) throws IOException { - write(new byte[]{(byte) (byteVal & 0xFF)}); - } - - @Override - public synchronized void write(final byte[] data, final int off, final int length) - throws IOException { - maybeThrowLastError(); - - Preconditions.checkArgument(data != null, "null data"); - - if (off < 0 || length < 0 || length > data.length - off) { - throw new IndexOutOfBoundsException(); - } - - int currentOffset = off; - int writableBytes = bufferSize - bufferIndex; - int numberOfBytesToWrite = length; - - while (numberOfBytesToWrite > 0) { - if (writableBytes <= numberOfBytesToWrite) { - System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes); - bufferIndex += writableBytes; - writeCurrentBufferToService(); - currentOffset += writableBytes; - numberOfBytesToWrite = numberOfBytesToWrite - writableBytes; - } else { - System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite); - bufferIndex += numberOfBytesToWrite; - numberOfBytesToWrite = 0; - } - - writableBytes = bufferSize - bufferIndex; - } - } - - /** - * Flushes this output stream and forces any buffered output bytes to be - * written out. If any data remains in the payload it is committed to the - * service. Data is queued for writing and forced out to the service - * before the call returns. - */ - @Override - public void flush() throws IOException { - if (supportFlush) { - flushInternalAsync(); - } - } - - /** - * Similar to posix fsync, flush out the data in client's user buffer - * all the way to the disk device (but the disk may have it in its cache). - * - * @throws IOException if error occurs - */ - @Override - public void hsync() throws IOException { - if (supportFlush) { - flushInternal(); - } - } - - /** - * Flush out the data in client's user buffer. After the return of - * this call, new readers will see the data. - * - * @throws IOException if any error occurs - */ - @Override - public void hflush() throws IOException { - if (supportFlush) { - flushInternal(); - } - } - - /** - * Query the stream for a specific capability. - * - * @param capability string to query the stream support for. - * @return true for hsync and hflush. - */ - @Override - public boolean hasCapability(String capability) { - switch (capability.toLowerCase(Locale.ENGLISH)) { - case StreamCapabilities.HSYNC: - case StreamCapabilities.HFLUSH: - return supportFlush; - default: - return false; - } - } - - /** - * Force all data in the output stream to be written to Azure storage. - * Wait to return until this is complete. Close the access to the stream and - * shutdown the upload thread pool. - * If the blob was created, its lease will be released. - * Any error encountered caught in threads and stored will be rethrown here - * after cleanup. - */ - @Override - public synchronized void close() throws IOException { - if (closed) { - return; - } - - LOG.debug("close path: {}", path); - try { - flushInternal(); - threadExecutor.shutdown(); - } finally { - lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - buffer = null; - bufferIndex = 0; - closed = true; - writeOperations.clear(); - if (!threadExecutor.isShutdown()) { - threadExecutor.shutdownNow(); - } - } - } - - private synchronized void writeCurrentBufferToService() throws IOException { - if (bufferIndex == 0) { - return; - } - - final byte[] bytes = buffer; - final int bytesLength = bufferIndex; - - buffer = new byte[bufferSize]; - bufferIndex = 0; - final long offset = position; - position += bytesLength; - - if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) { - waitForTaskToComplete(); - } - - final Future<Void> job = completionService.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - // originally: client.append(path, offset, bytes, 0, bytesLength); - SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength); - return null; - } - }); - - writeOperations.add(new WriteOperation(job, offset, bytesLength)); - - // Try to shrink the queue - shrinkWriteOperationQueue(); - } - - private void waitForTaskToComplete() throws IOException { - boolean completed; - for (completed = false; completionService.poll() != null; completed = true) { - // keep polling until there is no data - } - - if (!completed) { - try { - completionService.take(); - } catch (InterruptedException e) { - lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e); - throw lastError; - } - } - } - - private void maybeThrowLastError() throws IOException { - if (lastError != null) { - throw lastError; - } - } - - /** - * Try to remove the completed write operations from the beginning of write - * operation FIFO queue. - */ - private synchronized void shrinkWriteOperationQueue() throws IOException { - try { - while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) { - writeOperations.peek().task.get(); - lastTotalAppendOffset += writeOperations.peek().length; - writeOperations.remove(); - } - } catch (Exception e) { - lastError = new IOException(e); - throw lastError; - } - } - - private synchronized void flushInternal() throws IOException { - maybeThrowLastError(); - writeCurrentBufferToService(); - flushWrittenBytesToService(); - } - - private synchronized void flushInternalAsync() throws IOException { - maybeThrowLastError(); - writeCurrentBufferToService(); - flushWrittenBytesToServiceAsync(); - } - - private synchronized void flushWrittenBytesToService() throws IOException { - for (WriteOperation writeOperation : writeOperations) { - try { - writeOperation.task.get(); - } catch (Exception ex) { - lastError = new IOException(ex); - throw lastError; - } - } - LOG.debug("flushWrittenBytesToService: {} position:{}", path, position); - flushWrittenBytesToServiceInternal(position); - } - - private synchronized void flushWrittenBytesToServiceAsync() throws IOException { - shrinkWriteOperationQueue(); - - if (this.lastTotalAppendOffset > this.lastFlushOffset) { - this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset); - } - } - - private static class WriteOperation { - private final Future<Void> task; - private final long startOffset; - private final long length; - - WriteOperation(final Future<Void> task, final long startOffset, final long length) { - Preconditions.checkNotNull(task, "task"); - Preconditions.checkArgument(startOffset >= 0, "startOffset"); - Preconditions.checkArgument(length >= 0, "length"); - - this.task = task; - this.startOffset = startOffset; - this.length = length; - } - } - -} |
