Diffstat (limited to 'other/java/hdfs2/src')
 other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java |  25
 other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java                            | 137
 other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java                     | 394
 other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java                      |  70
 other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java (renamed from other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java) | 22
 other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java                     |  72
 other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java                |  75
 other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java              | 150
 other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java             |  16
 other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java                    | 371
 other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java                   | 283
 11 files changed, 297 insertions(+), 1318 deletions(-)
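This change removes the in-tree read-ahead machinery (ReadBuffer, ReadBufferManager, ReadBufferWorker, ReadBufferStatus) and the local SeaweedInputStream/SeaweedOutputStream copies, delegating instead to the seaweedfs.client library through thin SeaweedHadoopInputStream/SeaweedHadoopOutputStream wrappers, and it introduces per-filesystem configuration keys. Below is a minimal sketch of wiring those keys up, assuming a filer on localhost:8888 (the key names, defaults, and access-mode values are verbatim from the diff; the fs.seaweedfs.impl and fs.AbstractFileSystem.seaweedfs.impl registrations are assumptions not shown in this change):

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SeaweedFsConfigSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Key strings match the constants added in SeaweedFileSystem;
        // host, port, and values are illustrative only.
        conf.set("fs.seaweed.filer.host", "localhost");
        conf.setInt("fs.seaweed.filer.port", 8888);              // FS_SEAWEED_DEFAULT_PORT
        conf.setInt("fs.seaweed.buffer.size", 4 * 1024 * 1024);  // FS_SEAWEED_DEFAULT_BUFFER_SIZE
        conf.set("fs.seaweed.replication", "000");               // overrides "replication - 1" when set
        conf.set("fs.seaweed.volume.server.access", "direct");   // or "publicUrl" / "filerProxy"
        // Assumed scheme registration; this diff does not show it.
        conf.set("fs.seaweedfs.impl", "seaweed.hdfs.SeaweedFileSystem");
        conf.set("fs.AbstractFileSystem.seaweedfs.impl", "seaweed.hdfs.SeaweedAbstractFileSystem");

        try (FileSystem fs = FileSystem.get(URI.create("seaweedfs://localhost:8888/"), conf)) {
            for (FileStatus status : fs.listStatus(new Path("/"))) {
                System.out.println(status.getPath() + " " + status.getLen());
            }
        }
    }
}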
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java new file mode 100644 index 000000000..3d0b68a52 --- /dev/null +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java @@ -0,0 +1,25 @@ +package seaweed.hdfs; + +import org.apache.hadoop.fs.*; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable { + + public BufferedByteBufferReadableInputStream(FSInputStream in, int size) { + super(in, size); + if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) { + throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable"); + } + } + + @Override + public int read(ByteBuffer buf) throws IOException { + if (this.in instanceof ByteBufferReadable) { + return ((ByteBufferReadable)this.in).read(buf); + } else { + throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream"); + } + } +} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java deleted file mode 100644 index 926d0b83b..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBuffer.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package seaweed.hdfs; - -import java.util.concurrent.CountDownLatch; - -class ReadBuffer { - - private SeaweedInputStream stream; - private long offset; // offset within the file for the buffer - private int length; // actual length, set after the buffer is filles - private int requestedLength; // requested length of the read - private byte[] buffer; // the buffer itself - private int bufferindex = -1; // index in the buffers array in Buffer manager - private ReadBufferStatus status; // status of the buffer - private CountDownLatch latch = null; // signaled when the buffer is done reading, so any client - // waiting on this buffer gets unblocked - - // fields to help with eviction logic - private long timeStamp = 0; // tick at which buffer became available to read - private boolean isFirstByteConsumed = false; - private boolean isLastByteConsumed = false; - private boolean isAnyByteConsumed = false; - - public SeaweedInputStream getStream() { - return stream; - } - - public void setStream(SeaweedInputStream stream) { - this.stream = stream; - } - - public long getOffset() { - return offset; - } - - public void setOffset(long offset) { - this.offset = offset; - } - - public int getLength() { - return length; - } - - public void setLength(int length) { - this.length = length; - } - - public int getRequestedLength() { - return requestedLength; - } - - public void setRequestedLength(int requestedLength) { - this.requestedLength = requestedLength; - } - - public byte[] getBuffer() { - return buffer; - } - - public void setBuffer(byte[] buffer) { - this.buffer = buffer; - } - - public int getBufferindex() { - return bufferindex; - } - - public void setBufferindex(int bufferindex) { - this.bufferindex = bufferindex; - } - - public ReadBufferStatus getStatus() { - return status; - } - - public void setStatus(ReadBufferStatus status) { - this.status = status; - } - - public CountDownLatch getLatch() { - return latch; - } - - public void setLatch(CountDownLatch latch) { - this.latch = latch; - } - - public long getTimeStamp() { - return timeStamp; - } - - public void setTimeStamp(long timeStamp) { - this.timeStamp = timeStamp; - } - - public boolean isFirstByteConsumed() { - return isFirstByteConsumed; - } - - public void setFirstByteConsumed(boolean isFirstByteConsumed) { - this.isFirstByteConsumed = isFirstByteConsumed; - } - - public boolean isLastByteConsumed() { - return isLastByteConsumed; - } - - public void setLastByteConsumed(boolean isLastByteConsumed) { - this.isLastByteConsumed = isLastByteConsumed; - } - - public boolean isAnyByteConsumed() { - return isAnyByteConsumed; - } - - public void setAnyByteConsumed(boolean isAnyByteConsumed) { - this.isAnyByteConsumed = isAnyByteConsumed; - } - -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java deleted file mode 100644 index 5b1e21529..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferManager.java +++ /dev/null @@ -1,394 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package seaweed.hdfs; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collection; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Stack; -import java.util.concurrent.CountDownLatch; - -/** - * The Read Buffer Manager for Rest AbfsClient. - */ -final class ReadBufferManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class); - - private static final int NUM_BUFFERS = 16; - private static final int BLOCK_SIZE = 4 * 1024 * 1024; - private static final int NUM_THREADS = 8; - private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold - - private Thread[] threads = new Thread[NUM_THREADS]; - private byte[][] buffers; // array of byte[] buffers, to hold the data that is read - private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available - - private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet - private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads - private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading - private static final ReadBufferManager BUFFER_MANAGER; // singleton, initialized in static initialization block - - static { - BUFFER_MANAGER = new ReadBufferManager(); - BUFFER_MANAGER.init(); - } - - static ReadBufferManager getBufferManager() { - return BUFFER_MANAGER; - } - - private void init() { - buffers = new byte[NUM_BUFFERS][]; - for (int i = 0; i < NUM_BUFFERS; i++) { - buffers[i] = new byte[BLOCK_SIZE]; // same buffers are reused. The byte array never goes back to GC - freeList.add(i); - } - for (int i = 0; i < NUM_THREADS; i++) { - Thread t = new Thread(new ReadBufferWorker(i)); - t.setDaemon(true); - threads[i] = t; - t.setName("SeaweedFS-prefetch-" + i); - t.start(); - } - ReadBufferWorker.UNLEASH_WORKERS.countDown(); - } - - // hide instance constructor - private ReadBufferManager() { - } - - - /* - * - * SeaweedInputStream-facing methods - * - */ - - - /** - * {@link SeaweedInputStream} calls this method to queue read-aheads. 
- * - * @param stream The {@link SeaweedInputStream} for which to do the read-ahead - * @param requestedOffset The offset in the file which shoukd be read - * @param requestedLength The length to read - */ - void queueReadAhead(final SeaweedInputStream stream, final long requestedOffset, final int requestedLength) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", - stream.getPath(), requestedOffset, requestedLength); - } - ReadBuffer buffer; - synchronized (this) { - if (isAlreadyQueued(stream, requestedOffset)) { - return; // already queued, do not queue again - } - if (freeList.isEmpty() && !tryEvict()) { - return; // no buffers available, cannot queue anything - } - - buffer = new ReadBuffer(); - buffer.setStream(stream); - buffer.setOffset(requestedOffset); - buffer.setLength(0); - buffer.setRequestedLength(requestedLength); - buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); - buffer.setLatch(new CountDownLatch(1)); - - Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already - - buffer.setBuffer(buffers[bufferIndex]); - buffer.setBufferindex(bufferIndex); - readAheadQueue.add(buffer); - notifyAll(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", - stream.getPath(), requestedOffset, buffer.getBufferindex()); - } - } - - - /** - * {@link SeaweedInputStream} calls this method read any bytes already available in a buffer (thereby saving a - * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading - * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead - * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because - * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own - * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time). - * - * @param stream the file to read bytes for - * @param position the offset in the file to do a read for - * @param length the length to read - * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. 
- * @return the number of bytes read - */ - int getBlock(final SeaweedInputStream stream, final long position, final int length, final byte[] buffer) { - // not synchronized, so have to be careful with locking - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("getBlock for file {} position {} thread {}", - stream.getPath(), position, Thread.currentThread().getName()); - } - - waitForProcess(stream, position); - - int bytesRead = 0; - synchronized (this) { - bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); - } - if (bytesRead > 0) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done read from Cache for {} position {} length {}", - stream.getPath(), position, bytesRead); - } - return bytesRead; - } - - // otherwise, just say we got nothing - calling thread can do its own read - return 0; - } - - /* - * - * Internal methods - * - */ - - private void waitForProcess(final SeaweedInputStream stream, final long position) { - ReadBuffer readBuf; - synchronized (this) { - clearFromReadAheadQueue(stream, position); - readBuf = getFromList(inProgressList, stream, position); - } - if (readBuf != null) { // if in in-progress queue, then block for it - try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", - stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); - } - readBuf.getLatch().await(); // blocking wait on the caller stream's thread - // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread - // is done processing it (in doneReading). There, the latch is set after removing the buffer from - // inProgressList. So this latch is safe to be outside the synchronized block. - // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock - // while waiting, so no one will be able to change any state. If this becomes more complex in the future, - // then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched. - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("latch done for file {} buffer idx {} length {}", - stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); - } - } - } - - /** - * If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list. - * The objective is to find just one buffer - there is no advantage to evicting more than one. - * - * @return whether the eviction succeeeded - i.e., were we able to free up one buffer - */ - private synchronized boolean tryEvict() { - ReadBuffer nodeToEvict = null; - if (completedReadList.size() <= 0) { - return false; // there are no evict-able buffers - } - - // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) - for (ReadBuffer buf : completedReadList) { - if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { - nodeToEvict = buf; - break; - } - } - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try buffers where any bytes have been consumed (may be a bad idea? 
have to experiment and see) - for (ReadBuffer buf : completedReadList) { - if (buf.isAnyByteConsumed()) { - nodeToEvict = buf; - break; - } - } - - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try any old nodes that have not been consumed - long earliestBirthday = Long.MAX_VALUE; - for (ReadBuffer buf : completedReadList) { - if (buf.getTimeStamp() < earliestBirthday) { - nodeToEvict = buf; - earliestBirthday = buf.getTimeStamp(); - } - } - if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) { - return evict(nodeToEvict); - } - - // nothing can be evicted - return false; - } - - private boolean evict(final ReadBuffer buf) { - freeList.push(buf.getBufferindex()); - completedReadList.remove(buf); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", - buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); - } - return true; - } - - private boolean isAlreadyQueued(final SeaweedInputStream stream, final long requestedOffset) { - // returns true if any part of the buffer is already queued - return (isInList(readAheadQueue, stream, requestedOffset) - || isInList(inProgressList, stream, requestedOffset) - || isInList(completedReadList, stream, requestedOffset)); - } - - private boolean isInList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) { - return (getFromList(list, stream, requestedOffset) != null); - } - - private ReadBuffer getFromList(final Collection<ReadBuffer> list, final SeaweedInputStream stream, final long requestedOffset) { - for (ReadBuffer buffer : list) { - if (buffer.getStream() == stream) { - if (buffer.getStatus() == ReadBufferStatus.AVAILABLE - && requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getLength()) { - return buffer; - } else if (requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { - return buffer; - } - } - } - return null; - } - - private void clearFromReadAheadQueue(final SeaweedInputStream stream, final long requestedOffset) { - ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); - if (buffer != null) { - readAheadQueue.remove(buffer); - notifyAll(); // lock is held in calling method - freeList.push(buffer.getBufferindex()); - } - } - - private int getBlockFromCompletedQueue(final SeaweedInputStream stream, final long position, final int length, - final byte[] buffer) { - ReadBuffer buf = getFromList(completedReadList, stream, position); - if (buf == null || position >= buf.getOffset() + buf.getLength()) { - return 0; - } - int cursor = (int) (position - buf.getOffset()); - int availableLengthInBuffer = buf.getLength() - cursor; - int lengthToCopy = Math.min(length, availableLengthInBuffer); - System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); - if (cursor == 0) { - buf.setFirstByteConsumed(true); - } - if (cursor + lengthToCopy == buf.getLength()) { - buf.setLastByteConsumed(true); - } - buf.setAnyByteConsumed(true); - return lengthToCopy; - } - - /* - * - * ReadBufferWorker-thread-facing methods - * - */ - - /** - * ReadBufferWorker thread calls this to get the next buffer that it should work on. 
- * - * @return {@link ReadBuffer} - * @throws InterruptedException if thread is interrupted - */ - ReadBuffer getNextBlockToRead() throws InterruptedException { - ReadBuffer buffer = null; - synchronized (this) { - //buffer = readAheadQueue.take(); // blocking method - while (readAheadQueue.size() == 0) { - wait(); - } - buffer = readAheadQueue.remove(); - notifyAll(); - if (buffer == null) { - return null; // should never happen - } - buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); - inProgressList.add(buffer); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker picked file {} for offset {}", - buffer.getStream().getPath(), buffer.getOffset()); - } - return buffer; - } - - /** - * ReadBufferWorker thread calls this method to post completion. - * - * @param buffer the buffer whose read was completed - * @param result the {@link ReadBufferStatus} after the read operation in the worker thread - * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read - */ - void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}", - buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead); - } - synchronized (this) { - inProgressList.remove(buffer); - if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { - buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setTimeStamp(currentTimeMillis()); - buffer.setLength(bytesActuallyRead); - completedReadList.add(buffer); - } else { - freeList.push(buffer.getBufferindex()); - // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC - } - } - //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results - buffer.getLatch().countDown(); // wake up waiting threads (if any) - } - - /** - * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). - * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization), - * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core. - * Note: it is not monotonic across Sockets, and even within a CPU, its only the - * more recent parts which share a clock across all cores. - * - * @return current time in milliseconds - */ - private long currentTimeMillis() { - return System.nanoTime() / 1000 / 1000; - } -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java deleted file mode 100644 index 6ffbc4644..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferWorker.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package seaweed.hdfs; - -import java.util.concurrent.CountDownLatch; - -class ReadBufferWorker implements Runnable { - - protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); - private int id; - - ReadBufferWorker(final int id) { - this.id = id; - } - - /** - * return the ID of ReadBufferWorker. - */ - public int getId() { - return this.id; - } - - /** - * Waits until a buffer becomes available in ReadAheadQueue. - * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager. - * Rinse and repeat. Forever. - */ - public void run() { - try { - UNLEASH_WORKERS.await(); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); - ReadBuffer buffer; - while (true) { - try { - buffer = bufferManager.getNextBlockToRead(); // blocks, until a buffer is available for this thread - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - return; - } - if (buffer != null) { - try { - // do the actual read, from the file. - int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength()); - bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager - } catch (Exception ex) { - bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); - } - } - } - } -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java index d63674977..e021401aa 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/ReadBufferStatus.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedAbstractFileSystem.java @@ -18,12 +18,18 @@ package seaweed.hdfs; -/** - * The ReadBufferStatus for Rest AbfsClient - */ -public enum ReadBufferStatus { - NOT_AVAILABLE, // buffers sitting in readaheadqueue have this stats - READING_IN_PROGRESS, // reading is in progress on this buffer. Buffer should be in inProgressList - AVAILABLE, // data is available in buffer. It should be in completedList - READ_FAILED // read completed, but failed. 
+import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DelegateToFileSystem; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +public class SeaweedAbstractFileSystem extends DelegateToFileSystem { + + SeaweedAbstractFileSystem(final URI uri, final Configuration conf) + throws IOException, URISyntaxException { + super(uri, new SeaweedFileSystem(), conf, "seaweedfs", false); + } + } diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index d471d8440..25395db7a 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -5,31 +5,31 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import seaweedfs.client.FilerProto; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.util.EnumSet; import java.util.List; import java.util.Map; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; +public class SeaweedFileSystem extends FileSystem { -public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { - - public static final int FS_SEAWEED_DEFAULT_PORT = 8888; public static final String FS_SEAWEED_FILER_HOST = "fs.seaweed.filer.host"; public static final String FS_SEAWEED_FILER_PORT = "fs.seaweed.filer.port"; + public static final int FS_SEAWEED_DEFAULT_PORT = 8888; + public static final String FS_SEAWEED_BUFFER_SIZE = "fs.seaweed.buffer.size"; + public static final String FS_SEAWEED_REPLICATION = "fs.seaweed.replication"; + public static final String FS_SEAWEED_VOLUME_SERVER_ACCESS = "fs.seaweed.volume.server.access"; + public static final int FS_SEAWEED_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024; private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystem.class); - private static int BUFFER_SIZE = 16 * 1024 * 1024; private URI uri; private Path workingDirectory = new Path("/"); @@ -60,16 +60,20 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { port = (port == -1) ? 
FS_SEAWEED_DEFAULT_PORT : port; conf.setInt(FS_SEAWEED_FILER_PORT, port); - conf.setInt(IO_FILE_BUFFER_SIZE_KEY, BUFFER_SIZE); - setConf(conf); this.uri = uri; - seaweedFileSystemStore = new SeaweedFileSystemStore(host, port); + seaweedFileSystemStore = new SeaweedFileSystemStore(host, port, conf); } @Override + public void close() throws IOException { + super.close(); + this.seaweedFileSystemStore.close(); + } + + @Override public FSDataInputStream open(Path path, int bufferSize) throws IOException { LOG.debug("open path: {} bufferSize:{}", path, bufferSize); @@ -77,8 +81,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { path = qualify(path); try { - InputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics, bufferSize); - return new FSDataInputStream(inputStream); + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); + FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics); + return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; @@ -94,8 +99,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { path = qualify(path); try { - String replicaPlacement = String.format("%03d", replication - 1); - OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, bufferSize, replicaPlacement); + String replicaPlacement = this.getConf().get(FS_SEAWEED_REPLICATION, String.format("%03d", replication - 1)); + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); + OutputStream outputStream = seaweedFileSystemStore.createFile(path, overwrite, permission, seaweedBufferSize, replicaPlacement); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { LOG.warn("create path: {} bufferSize:{} blockSize:{}", path, bufferSize, blockSize, ex); @@ -105,8 +111,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { /** * {@inheritDoc} + * * @throws FileNotFoundException if the parent directory is not present -or - * is not a directory. + * is not a directory. 
*/ @Override public FSDataOutputStream createNonRecursive(Path path, @@ -123,9 +130,10 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { throw new FileAlreadyExistsException("Not a directory: " + parent); } } + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); return create(path, permission, flags.contains(CreateFlag.OVERWRITE), bufferSize, - replication, blockSize, progress); + replication, seaweedBufferSize, progress); } @Override @@ -135,7 +143,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { path = qualify(path); try { - OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, bufferSize, ""); + int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); + OutputStream outputStream = seaweedFileSystemStore.createFile(path, false, null, seaweedBufferSize, ""); return new FSDataOutputStream(outputStream, statistics); } catch (Exception ex) { LOG.warn("append path: {} bufferSize:{}", path, bufferSize, ex); @@ -144,7 +153,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } @Override - public boolean rename(Path src, Path dst) { + public boolean rename(Path src, Path dst) throws IOException { LOG.debug("rename path: {} => {}", src, dst); @@ -155,12 +164,13 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { if (src.equals(dst)) { return true; } - FileStatus dstFileStatus = getFileStatus(dst); + FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(dst); - String sourceFileName = src.getName(); Path adjustedDst = dst; - if (dstFileStatus != null) { + if (entry != null) { + FileStatus dstFileStatus = getFileStatus(dst); + String sourceFileName = src.getName(); if (!dstFileStatus.isDirectory()) { return false; } @@ -175,18 +185,20 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } @Override - public boolean delete(Path path, boolean recursive) { + public boolean delete(Path path, boolean recursive) throws IOException { LOG.debug("delete path: {} recursive:{}", path, recursive); path = qualify(path); - FileStatus fileStatus = getFileStatus(path); + FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path); - if (fileStatus == null) { + if (entry == null) { return true; } + FileStatus fileStatus = getFileStatus(path); + return seaweedFileSystemStore.deleteEntries(path, fileStatus.isDirectory(), recursive); } @@ -222,9 +234,9 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { path = qualify(path); - FileStatus fileStatus = getFileStatus(path); + FilerProto.Entry entry = seaweedFileSystemStore.lookupEntry(path); - if (fileStatus == null) { + if (entry == null) { UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); return seaweedFileSystemStore.createDirectory(path, currentUser, @@ -233,6 +245,8 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } + FileStatus fileStatus = getFileStatus(path); + if (fileStatus.isDirectory()) { return true; } else { @@ -241,7 +255,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { } @Override - public FileStatus getFileStatus(Path path) { + public FileStatus getFileStatus(Path path) throws IOException { LOG.debug("getFileStatus path: {}", path); @@ -335,9 +349,7 @@ public class SeaweedFileSystem extends org.apache.hadoop.fs.FileSystem { @Override public void createSymlink(final Path target, final Path link, 
- final boolean createParent) throws AccessControlException, - FileAlreadyExistsException, FileNotFoundException, - ParentNotDirectoryException, UnsupportedFileSystemException, + final boolean createParent) throws IOException { // Supporting filesystems should override this method throw new UnsupportedOperationException( diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 774c090e8..f4e8c9349 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -1,5 +1,7 @@ package seaweed.hdfs; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -7,30 +9,43 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import seaweedfs.client.FilerClient; -import seaweedfs.client.FilerGrpcClient; -import seaweedfs.client.FilerProto; -import seaweedfs.client.SeaweedRead; +import seaweedfs.client.*; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import static seaweed.hdfs.SeaweedFileSystem.*; + public class SeaweedFileSystemStore { private static final Logger LOG = LoggerFactory.getLogger(SeaweedFileSystemStore.class); - private FilerGrpcClient filerGrpcClient; private FilerClient filerClient; + private Configuration conf; - public SeaweedFileSystemStore(String host, int port) { + public SeaweedFileSystemStore(String host, int port, Configuration conf) { int grpcPort = 10000 + port; - filerGrpcClient = new FilerGrpcClient(host, grpcPort); - filerClient = new FilerClient(filerGrpcClient); + filerClient = new FilerClient(host, grpcPort); + this.conf = conf; + String volumeServerAccessMode = this.conf.get(FS_SEAWEED_VOLUME_SERVER_ACCESS, "direct"); + if (volumeServerAccessMode.equals("publicUrl")) { + filerClient.setAccessVolumeServerByPublicUrl(); + } else if (volumeServerAccessMode.equals("filerProxy")) { + filerClient.setAccessVolumeServerByFilerProxy(); + } + + } + + public void close() { + try { + this.filerClient.shutdown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } } public static String getParentDirectory(Path path) { @@ -61,9 +76,19 @@ public class SeaweedFileSystemStore { ); } - public FileStatus[] listEntries(final Path path) { + public FileStatus[] listEntries(final Path path) throws IOException { LOG.debug("listEntries path: {}", path); + FileStatus pathStatus = getFileStatus(path); + + if (pathStatus == null) { + return new FileStatus[0]; + } + + if (!pathStatus.isDirectory()) { + return new FileStatus[]{pathStatus}; + } + List<FileStatus> fileStatuses = new ArrayList<FileStatus>(); List<FilerProto.Entry> entries = filerClient.listEntries(path.toUri().getPath()); @@ -74,14 +99,16 @@ public class SeaweedFileSystemStore { fileStatuses.add(fileStatus); } + LOG.debug("listEntries path: {} size {}", fileStatuses, fileStatuses.size()); return fileStatuses.toArray(new FileStatus[0]); + } - public FileStatus getFileStatus(final Path path) { + public FileStatus getFileStatus(final Path path) throws IOException { FilerProto.Entry entry = 
lookupEntry(path); if (entry == null) { - return null; + throw new FileNotFoundException("File does not exist: " + path); } LOG.debug("doGetFileStatus path:{} entry:{}", path, entry); @@ -111,10 +138,10 @@ public class SeaweedFileSystemStore { private FileStatus doGetFileStatus(Path path, FilerProto.Entry entry) { FilerProto.FuseAttributes attributes = entry.getAttributes(); - long length = SeaweedRead.totalSize(entry.getChunksList()); + long length = SeaweedRead.fileSize(entry); boolean isDir = entry.getIsDirectory(); int block_replication = 1; - int blocksize = 512; + int blocksize = this.conf.getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); long modification_time = attributes.getMtime() * 1000; // milliseconds long access_time = 0; FsPermission permission = FsPermission.createImmutable((short) attributes.getFileMode()); @@ -124,7 +151,7 @@ public class SeaweedFileSystemStore { modification_time, access_time, permission, owner, group, null, path); } - private FilerProto.Entry lookupEntry(Path path) { + public FilerProto.Entry lookupEntry(Path path) { return filerClient.lookupEntry(getParentDirectory(path), path.getName()); @@ -170,9 +197,10 @@ public class SeaweedFileSystemStore { if (existingEntry != null) { entry = FilerProto.Entry.newBuilder(); entry.mergeFrom(existingEntry); + entry.clearContent(); entry.getAttributesBuilder().setMtime(now); LOG.debug("createFile merged entry path:{} entry:{} from:{}", path, entry, existingEntry); - writePosition = SeaweedRead.totalSize(existingEntry.getChunksList()); + writePosition = SeaweedRead.fileSize(existingEntry); replication = existingEntry.getAttributes().getReplication(); } } @@ -189,30 +217,27 @@ public class SeaweedFileSystemStore { .clearGroupName() .addAllGroupName(Arrays.asList(userGroupInformation.getGroupNames())) ); + SeaweedWrite.writeMeta(filerClient, getParentDirectory(path), entry); } - return new SeaweedOutputStream(filerGrpcClient, path, entry, writePosition, bufferSize, replication); + return new SeaweedHadoopOutputStream(filerClient, path.toString(), entry, writePosition, bufferSize, replication); } - public InputStream openFileForRead(final Path path, FileSystem.Statistics statistics, - int bufferSize) throws IOException { + public FSInputStream openFileForRead(final Path path, FileSystem.Statistics statistics) throws IOException { - LOG.debug("openFileForRead path:{} bufferSize:{}", path, bufferSize); + LOG.debug("openFileForRead path:{}", path); - int readAheadQueueDepth = 2; FilerProto.Entry entry = lookupEntry(path); if (entry == null) { throw new FileNotFoundException("read non-exist file " + path); } - return new SeaweedInputStream(filerGrpcClient, + return new SeaweedHadoopInputStream(filerClient, statistics, path.toUri().getPath(), - entry, - bufferSize, - readAheadQueueDepth); + entry); } public void setOwner(Path path, String owner, String group) { diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java new file mode 100644 index 000000000..f26eae597 --- /dev/null +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopInputStream.java @@ -0,0 +1,150 @@ +package seaweed.hdfs; + +// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream + +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileSystem.Statistics; +import seaweedfs.client.FilerClient; +import seaweedfs.client.FilerProto; +import 
seaweedfs.client.SeaweedInputStream; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class SeaweedHadoopInputStream extends FSInputStream implements ByteBufferReadable { + + private final SeaweedInputStream seaweedInputStream; + private final Statistics statistics; + + public SeaweedHadoopInputStream( + final FilerClient filerClient, + final Statistics statistics, + final String path, + final FilerProto.Entry entry) throws IOException { + this.seaweedInputStream = new SeaweedInputStream(filerClient, path, entry); + this.statistics = statistics; + } + + @Override + public int read() throws IOException { + return seaweedInputStream.read(); + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + return seaweedInputStream.read(b, off, len); + } + + // implement ByteBufferReadable + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + int bytesRead = seaweedInputStream.read(buf); + + if (bytesRead > 0) { + if (statistics != null) { + statistics.incrementBytesRead(bytesRead); + } + } + + return bytesRead; + } + + /** + * Seek to given position in stream. + * + * @param n position to seek to + * @throws IOException if there is an error + * @throws EOFException if attempting to seek past end of file + */ + @Override + public synchronized void seek(long n) throws IOException { + seaweedInputStream.seek(n); + } + + @Override + public synchronized long skip(long n) throws IOException { + return seaweedInputStream.skip(n); + } + + /** + * Return the size of the remaining available bytes + * if the size is less than or equal to {@link Integer#MAX_VALUE}, + * otherwise, return {@link Integer#MAX_VALUE}. + * <p> + * This is to match the behavior of DFSInputStream.available(), + * which some clients may rely on (HBase write-ahead log reading in + * particular). + */ + @Override + public synchronized int available() throws IOException { + return seaweedInputStream.available(); + } + + /** + * Returns the length of the file that this stream refers to. Note that the length returned is the length + * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, + * they wont be reflected in the returned length. + * + * @return length of the file. + * @throws IOException if the stream is closed + */ + public long length() throws IOException { + return seaweedInputStream.length(); + } + + /** + * Return the current offset from the start of the file + * + * @throws IOException throws {@link IOException} if there is an error + */ + @Override + public synchronized long getPos() throws IOException { + return seaweedInputStream.getPos(); + } + + /** + * Seeks a different copy of the data. Returns true if + * found a new source, false otherwise. + * + * @throws IOException throws {@link IOException} if there is an error + */ + @Override + public boolean seekToNewSource(long l) throws IOException { + return false; + } + + @Override + public synchronized void close() throws IOException { + seaweedInputStream.close(); + } + + /** + * Not supported by this stream. Throws {@link UnsupportedOperationException} + * + * @param readlimit ignored + */ + @Override + public synchronized void mark(int readlimit) { + throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); + } + + /** + * Not supported by this stream. 
Throws {@link UnsupportedOperationException} + */ + @Override + public synchronized void reset() throws IOException { + throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); + } + + /** + * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false. + * + * @return always {@code false} + */ + @Override + public boolean markSupported() { + return false; + } +} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java new file mode 100644 index 000000000..da5b56bbc --- /dev/null +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedHadoopOutputStream.java @@ -0,0 +1,16 @@ +package seaweed.hdfs; + +// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream + +import seaweedfs.client.FilerClient; +import seaweedfs.client.FilerProto; +import seaweedfs.client.SeaweedOutputStream; + +public class SeaweedHadoopOutputStream extends SeaweedOutputStream { + + public SeaweedHadoopOutputStream(FilerClient filerClient, final String path, FilerProto.Entry.Builder entry, + final long position, final int bufferSize, final String replication) { + super(filerClient, path, entry, position, bufferSize, replication); + } + +} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java deleted file mode 100644 index 90c14c772..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ /dev/null @@ -1,371 +0,0 @@ -package seaweed.hdfs; - -// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream - -import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.FSInputStream; -import org.apache.hadoop.fs.FileSystem.Statistics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import seaweedfs.client.FilerGrpcClient; -import seaweedfs.client.FilerProto; -import seaweedfs.client.SeaweedRead; - -import java.io.EOFException; -import java.io.IOException; -import java.util.List; - -public class SeaweedInputStream extends FSInputStream { - - private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); - - private final FilerGrpcClient filerGrpcClient; - private final Statistics statistics; - private final String path; - private final FilerProto.Entry entry; - private final List<SeaweedRead.VisibleInterval> visibleIntervalList; - private final long contentLength; - private final int bufferSize; // default buffer size - private final int readAheadQueueDepth; // initialized in constructor - private final boolean readAheadEnabled; // whether enable readAhead; - - private byte[] buffer = null; // will be initialized on first use - - private long fCursor = 0; // cursor of buffer within file - offset of next byte to read from remote server - private long fCursorAfterLastRead = -1; - private int bCursor = 0; // cursor of read within buffer - offset of next byte to be returned from buffer - private int limit = 0; // offset of next byte to be read into buffer from service (i.e., upper marker+1 - // of valid bytes in buffer) - private boolean closed = false; - - public SeaweedInputStream( - final FilerGrpcClient filerGrpcClient, - final Statistics statistics, - final String path, - final FilerProto.Entry entry, - final int bufferSize, - final int readAheadQueueDepth) { - this.filerGrpcClient = filerGrpcClient; - 
this.statistics = statistics; - this.path = path; - this.entry = entry; - this.contentLength = SeaweedRead.totalSize(entry.getChunksList()); - this.bufferSize = bufferSize; - this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors(); - this.readAheadEnabled = true; - - this.visibleIntervalList = SeaweedRead.nonOverlappingVisibleIntervals(entry.getChunksList()); - - LOG.debug("new path:{} entry:{} visibleIntervalList:{}", path, entry, visibleIntervalList); - - } - - public String getPath() { - return path; - } - - @Override - public int read() throws IOException { - byte[] b = new byte[1]; - int numberOfBytesRead = read(b, 0, 1); - if (numberOfBytesRead < 0) { - return -1; - } else { - return (b[0] & 0xFF); - } - } - - @Override - public synchronized int read(final byte[] b, final int off, final int len) throws IOException { - int currentOff = off; - int currentLen = len; - int lastReadBytes; - int totalReadBytes = 0; - do { - lastReadBytes = readOneBlock(b, currentOff, currentLen); - if (lastReadBytes > 0) { - currentOff += lastReadBytes; - currentLen -= lastReadBytes; - totalReadBytes += lastReadBytes; - } - if (currentLen <= 0 || currentLen > b.length - currentOff) { - break; - } - } while (lastReadBytes > 0); - return totalReadBytes > 0 ? totalReadBytes : lastReadBytes; - } - - private int readOneBlock(final byte[] b, final int off, final int len) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - - Preconditions.checkNotNull(b); - - if (len == 0) { - return 0; - } - - if (this.available() == 0) { - return -1; - } - - if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - - //If buffer is empty, then fill the buffer. 
- if (bCursor == limit) { - //If EOF, then return -1 - if (fCursor >= contentLength) { - return -1; - } - - long bytesRead = 0; - //reset buffer to initial state - i.e., throw away existing data - bCursor = 0; - limit = 0; - if (buffer == null) { - buffer = new byte[bufferSize]; - } - - // Enable readAhead when reading sequentially - if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { - bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); - } else { - bytesRead = readInternal(fCursor, buffer, 0, b.length, true); - } - - if (bytesRead == -1) { - return -1; - } - - limit += bytesRead; - fCursor += bytesRead; - fCursorAfterLastRead = fCursor; - } - - //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) - //(bytes returned may be less than requested) - int bytesRemaining = limit - bCursor; - int bytesToRead = Math.min(len, bytesRemaining); - System.arraycopy(buffer, bCursor, b, off, bytesToRead); - bCursor += bytesToRead; - if (statistics != null) { - statistics.incrementBytesRead(bytesToRead); - } - return bytesToRead; - } - - - private int readInternal(final long position, final byte[] b, final int offset, final int length, - final boolean bypassReadAhead) throws IOException { - if (readAheadEnabled && !bypassReadAhead) { - // try reading from read-ahead - if (offset != 0) { - throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets"); - } - int receivedBytes; - - // queue read-aheads - int numReadAheads = this.readAheadQueueDepth; - long nextSize; - long nextOffset = position; - while (numReadAheads > 0 && nextOffset < contentLength) { - nextSize = Math.min((long) bufferSize, contentLength - nextOffset); - ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); - nextOffset = nextOffset + nextSize; - numReadAheads--; - } - - // try reading from buffers first - receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); - if (receivedBytes > 0) { - return receivedBytes; - } - - // got nothing from read-ahead, do our own read now - receivedBytes = readRemote(position, b, offset, length); - return receivedBytes; - } else { - return readRemote(position, b, offset, length); - } - } - - int readRemote(long position, byte[] b, int offset, int length) throws IOException { - if (position < 0) { - throw new IllegalArgumentException("attempting to read from negative offset"); - } - if (position >= contentLength) { - return -1; // Hadoop prefers -1 to EOFException - } - if (b == null) { - throw new IllegalArgumentException("null byte array passed in to read() method"); - } - if (offset >= b.length) { - throw new IllegalArgumentException("offset greater than length of array"); - } - if (length < 0) { - throw new IllegalArgumentException("requested read length is less than zero"); - } - if (length > (b.length - offset)) { - throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); - } - - long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervalList, position, b, offset, length); - if (bytesRead > Integer.MAX_VALUE) { - throw new IOException("Unexpected Content-Length"); - } - return (int) bytesRead; - } - - /** - * Seek to given position in stream. 
- * - * @param n position to seek to - * @throws IOException if there is an error - * @throws EOFException if attempting to seek past end of file - */ - @Override - public synchronized void seek(long n) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - if (n < 0) { - throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK); - } - if (n > contentLength) { - throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); - } - - if (n >= fCursor - limit && n <= fCursor) { // within buffer - bCursor = (int) (n - (fCursor - limit)); - return; - } - - // next read will read from here - fCursor = n; - - //invalidate buffer - limit = 0; - bCursor = 0; - } - - @Override - public synchronized long skip(long n) throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - long currentPos = getPos(); - if (currentPos == contentLength) { - if (n > 0) { - throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); - } - } - long newPos = currentPos + n; - if (newPos < 0) { - newPos = 0; - n = newPos - currentPos; - } - if (newPos > contentLength) { - newPos = contentLength; - n = newPos - currentPos; - } - seek(newPos); - return n; - } - - /** - * Return the size of the remaining available bytes - * if the size is less than or equal to {@link Integer#MAX_VALUE}, - * otherwise, return {@link Integer#MAX_VALUE}. - * <p> - * This is to match the behavior of DFSInputStream.available(), - * which some clients may rely on (HBase write-ahead log reading in - * particular). - */ - @Override - public synchronized int available() throws IOException { - if (closed) { - throw new IOException( - FSExceptionMessages.STREAM_IS_CLOSED); - } - final long remaining = this.contentLength - this.getPos(); - return remaining <= Integer.MAX_VALUE - ? (int) remaining : Integer.MAX_VALUE; - } - - /** - * Returns the length of the file that this stream refers to. Note that the length returned is the length - * as of the time the Stream was opened. Specifically, if there have been subsequent appends to the file, - * they wont be reflected in the returned length. - * - * @return length of the file. - * @throws IOException if the stream is closed - */ - public long length() throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - return contentLength; - } - - /** - * Return the current offset from the start of the file - * - * @throws IOException throws {@link IOException} if there is an error - */ - @Override - public synchronized long getPos() throws IOException { - if (closed) { - throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); - } - return fCursor - limit + bCursor; - } - - /** - * Seeks a different copy of the data. Returns true if - * found a new source, false otherwise. - * - * @throws IOException throws {@link IOException} if there is an error - */ - @Override - public boolean seekToNewSource(long l) throws IOException { - return false; - } - - @Override - public synchronized void close() throws IOException { - closed = true; - buffer = null; // de-reference the buffer so it can be GC'ed sooner - } - - /** - * Not supported by this stream. Throws {@link UnsupportedOperationException} - * - * @param readlimit ignored - */ - @Override - public synchronized void mark(int readlimit) { - throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); - } - - /** - * Not supported by this stream. 
Throws {@link UnsupportedOperationException} - */ - @Override - public synchronized void reset() throws IOException { - throw new UnsupportedOperationException("mark()/reset() not supported on this stream"); - } - - /** - * gets whether mark and reset are supported by {@code ADLFileInputStream}. Always returns false. - * - * @return always {@code false} - */ - @Override - public boolean markSupported() { - return false; - } -} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java deleted file mode 100644 index 7b488a5da..000000000 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedOutputStream.java +++ /dev/null @@ -1,283 +0,0 @@ -package seaweed.hdfs; - -// adapted from org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream - -import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import seaweedfs.client.FilerGrpcClient; -import seaweedfs.client.FilerProto; -import seaweedfs.client.SeaweedWrite; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.io.OutputStream; -import java.util.concurrent.*; - -import static seaweed.hdfs.SeaweedFileSystemStore.getParentDirectory; - -public class SeaweedOutputStream extends OutputStream { - - private static final Logger LOG = LoggerFactory.getLogger(SeaweedOutputStream.class); - - private final FilerGrpcClient filerGrpcClient; - private final Path path; - private final int bufferSize; - private final int maxConcurrentRequestCount; - private final ThreadPoolExecutor threadExecutor; - private final ExecutorCompletionService<Void> completionService; - private FilerProto.Entry.Builder entry; - private long position; - private boolean closed; - private boolean supportFlush = true; - private volatile IOException lastError; - private long lastFlushOffset; - private long lastTotalAppendOffset = 0; - private byte[] buffer; - private int bufferIndex; - private ConcurrentLinkedDeque<WriteOperation> writeOperations; - private String replication = "000"; - - public SeaweedOutputStream(FilerGrpcClient filerGrpcClient, final Path path, FilerProto.Entry.Builder entry, - final long position, final int bufferSize, final String replication) { - this.filerGrpcClient = filerGrpcClient; - this.replication = replication; - this.path = path; - this.position = position; - this.closed = false; - this.lastError = null; - this.lastFlushOffset = 0; - this.bufferSize = bufferSize; - this.buffer = new byte[bufferSize]; - this.bufferIndex = 0; - this.writeOperations = new ConcurrentLinkedDeque<>(); - - this.maxConcurrentRequestCount = 4 * Runtime.getRuntime().availableProcessors(); - - this.threadExecutor - = new ThreadPoolExecutor(maxConcurrentRequestCount, - maxConcurrentRequestCount, - 10L, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>()); - this.completionService = new ExecutorCompletionService<>(this.threadExecutor); - - this.entry = entry; - - } - - private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException { - - LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry); - - try { - SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry); - } catch (Exception ex) { - throw new IOException(ex); - } - this.lastFlushOffset = offset; - } - - @Override - public void write(final int byteVal) throws IOException { - write(new byte[]{(byte) 
-
-    private synchronized void flushWrittenBytesToServiceInternal(final long offset) throws IOException {
-
-        LOG.debug("SeaweedWrite.writeMeta path: {} entry:{}", path, entry);
-
-        try {
-            SeaweedWrite.writeMeta(filerGrpcClient, getParentDirectory(path), entry);
-        } catch (Exception ex) {
-            throw new IOException(ex);
-        }
-        this.lastFlushOffset = offset;
-    }
-
-    @Override
-    public void write(final int byteVal) throws IOException {
-        write(new byte[]{(byte) (byteVal & 0xFF)});
-    }
-
-    @Override
-    public synchronized void write(final byte[] data, final int off, final int length)
-            throws IOException {
-        maybeThrowLastError();
-
-        Preconditions.checkArgument(data != null, "null data");
-
-        if (off < 0 || length < 0 || length > data.length - off) {
-            throw new IndexOutOfBoundsException();
-        }
-
-        int currentOffset = off;
-        int writableBytes = bufferSize - bufferIndex;
-        int numberOfBytesToWrite = length;
-
-        while (numberOfBytesToWrite > 0) {
-            if (writableBytes <= numberOfBytesToWrite) {
-                System.arraycopy(data, currentOffset, buffer, bufferIndex, writableBytes);
-                bufferIndex += writableBytes;
-                writeCurrentBufferToService();
-                currentOffset += writableBytes;
-                numberOfBytesToWrite = numberOfBytesToWrite - writableBytes;
-            } else {
-                System.arraycopy(data, currentOffset, buffer, bufferIndex, numberOfBytesToWrite);
-                bufferIndex += numberOfBytesToWrite;
-                numberOfBytesToWrite = 0;
-            }
-
-            writableBytes = bufferSize - bufferIndex;
-        }
-    }
-
-    /**
-     * Flushes this output stream and forces any buffered output bytes to be
-     * written out. If any data remains in the payload it is committed to the
-     * service. Data is queued for writing and forced out to the service
-     * before the call returns.
-     */
-    @Override
-    public void flush() throws IOException {
-        if (supportFlush) {
-            flushInternalAsync();
-        }
-    }
-
-    /**
-     * Force all data in the output stream to be written to SeaweedFS.
-     * Wait to return until this is complete. Close the access to the stream and
-     * shut down the upload thread pool.
-     * Any error caught and stored by the upload threads will be rethrown here
-     * after cleanup.
-     */
-    @Override
-    public synchronized void close() throws IOException {
-        if (closed) {
-            return;
-        }
-
-        LOG.debug("close path: {}", path);
-        try {
-            flushInternal();
-            threadExecutor.shutdown();
-        } finally {
-            lastError = new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
-            buffer = null;
-            bufferIndex = 0;
-            closed = true;
-            writeOperations.clear();
-            if (!threadExecutor.isShutdown()) {
-                threadExecutor.shutdownNow();
-            }
-        }
-    }
-
-    private synchronized void writeCurrentBufferToService() throws IOException {
-        if (bufferIndex == 0) {
-            return;
-        }
-
-        final byte[] bytes = buffer;
-        final int bytesLength = bufferIndex;
-
-        buffer = new byte[bufferSize];
-        bufferIndex = 0;
-        final long offset = position;
-        position += bytesLength;
-
-        if (threadExecutor.getQueue().size() >= maxConcurrentRequestCount * 2) {
-            waitForTaskToComplete();
-        }
-
-        final Future<Void> job = completionService.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                // originally: client.append(path, offset, bytes, 0, bytesLength);
-                SeaweedWrite.writeData(entry, replication, filerGrpcClient, offset, bytes, 0, bytesLength);
-                return null;
-            }
-        });
-
-        writeOperations.add(new WriteOperation(job, offset, bytesLength));
-
-        // Try to shrink the queue
-        shrinkWriteOperationQueue();
-    }
-
-    private void waitForTaskToComplete() throws IOException {
-        boolean completed;
-        for (completed = false; completionService.poll() != null; completed = true) {
-            // keep polling until there is no data
-        }
-
-        if (!completed) {
-            try {
-                completionService.take();
-            } catch (InterruptedException e) {
-                lastError = (IOException) new InterruptedIOException(e.toString()).initCause(e);
-                throw lastError;
-            }
-        }
-    }
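The waitForTaskToComplete() idiom above drains everything already finished without
blocking, and only blocks for one result when nothing has completed yet. A
self-contained sketch of the same idiom, assuming a trivial sleeping task purely
for illustration:

    import java.util.concurrent.*;

    public class DrainSketch {
        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newFixedThreadPool(2);
            ExecutorCompletionService<Void> cs = new ExecutorCompletionService<>(pool);
            for (int i = 0; i < 4; i++) {
                cs.submit(() -> { Thread.sleep(100); return null; });
            }

            // Reap everything already finished without blocking...
            boolean completed = false;
            while (cs.poll() != null) {
                completed = true;
            }
            // ...and block for exactly one result only if nothing had completed yet.
            if (!completed) {
                cs.take();
            }

            pool.shutdown();
        }
    }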
-
-    private void maybeThrowLastError() throws IOException {
-        if (lastError != null) {
-            throw lastError;
-        }
-    }
-
-    /**
-     * Try to remove the completed write operations from the beginning of the write
-     * operation FIFO queue.
-     */
-    private synchronized void shrinkWriteOperationQueue() throws IOException {
-        try {
-            while (writeOperations.peek() != null && writeOperations.peek().task.isDone()) {
-                writeOperations.peek().task.get();
-                lastTotalAppendOffset += writeOperations.peek().length;
-                writeOperations.remove();
-            }
-        } catch (Exception e) {
-            lastError = new IOException(e);
-            throw lastError;
-        }
-    }
-
-    private synchronized void flushInternal() throws IOException {
-        maybeThrowLastError();
-        writeCurrentBufferToService();
-        flushWrittenBytesToService();
-    }
-
-    private synchronized void flushInternalAsync() throws IOException {
-        maybeThrowLastError();
-        writeCurrentBufferToService();
-        flushWrittenBytesToServiceAsync();
-    }
-
-    private synchronized void flushWrittenBytesToService() throws IOException {
-        for (WriteOperation writeOperation : writeOperations) {
-            try {
-                writeOperation.task.get();
-            } catch (Exception ex) {
-                lastError = new IOException(ex);
-                throw lastError;
-            }
-        }
-        LOG.debug("flushWrittenBytesToService: {} position:{}", path, position);
-        flushWrittenBytesToServiceInternal(position);
-    }
-
-    private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
-        shrinkWriteOperationQueue();
-
-        if (this.lastTotalAppendOffset > this.lastFlushOffset) {
-            this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset);
-        }
-    }
-
-    private static class WriteOperation {
-        private final Future<Void> task;
-        private final long startOffset;
-        private final long length;
-
-        WriteOperation(final Future<Void> task, final long startOffset, final long length) {
-            Preconditions.checkNotNull(task, "task");
-            Preconditions.checkArgument(startOffset >= 0, "startOffset");
-            Preconditions.checkArgument(length >= 0, "length");
-
-            this.task = task;
-            this.startOffset = startOffset;
-            this.length = length;
-        }
-    }
-}
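The subtle part of this deleted class is the async flush bookkeeping: uploads can
finish out of order, but shrinkWriteOperationQueue() only pops the contiguous run of
finished operations at the head of the FIFO, so lastTotalAppendOffset always marks
the end of a fully completed prefix, and flushWrittenBytesToServiceAsync() commits
metadata only up to that prefix. A compact sketch of that invariant; names mirror
the diff, and Upload is a stand-in for the real WriteOperation:

    import java.util.ArrayDeque;
    import java.util.Deque;

    class FlushSketch {
        static class Upload {
            final long length;
            boolean done;
            Upload(long length, boolean done) { this.length = length; this.done = done; }
        }

        private final Deque<Upload> fifo = new ArrayDeque<>();
        private long completedPrefixEnd; // plays the role of lastTotalAppendOffset
        private long lastFlushOffset;    // last offset committed to metadata

        // Pop only the contiguous run of finished uploads at the head; an
        // unfinished head blocks everything behind it, even if already done.
        void shrink() {
            while (fifo.peek() != null && fifo.peek().done) {
                completedPrefixEnd += fifo.remove().length;
            }
        }

        // Metadata may only ever claim the completed prefix.
        void flushAsync() {
            shrink();
            if (completedPrefixEnd > lastFlushOffset) {
                lastFlushOffset = completedPrefixEnd; // real code writes filer metadata here
            }
        }
    }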