diff options
13 files changed, 266 insertions, 84 deletions
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index acdf621a5..078778b6c 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -68,6 +68,11 @@ <version>4.13.1</version> <scope>test</scope> </dependency> + <dependency> + <groupId>javax.annotation</groupId> + <artifactId>javax.annotation-api</artifactId> + <version>1.3.2</version> + </dependency> </dependencies> <build> diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java index 2b530d2dd..c45987bed 100644 --- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java +++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java @@ -12,6 +12,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; public class SeaweedRead { @@ -23,10 +24,9 @@ public class SeaweedRead { // returns bytesRead public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals, - final long position, final byte[] buffer, final int bufferOffset, - final int bufferLength, final long fileSize) throws IOException { + final long position, final ByteBuffer buf, final long fileSize) throws IOException { - List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength); + List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining()); Map<String, FilerProto.Locations> knownLocations = new HashMap<>(); @@ -59,6 +59,7 @@ public class SeaweedRead { if (startOffset < chunkView.logicOffset) { long gap = chunkView.logicOffset - startOffset; LOG.debug("zero [{},{})", startOffset, startOffset + gap); + buf.position(buf.position()+ (int)gap); readCount += gap; startOffset += gap; } @@ -70,7 +71,7 @@ public class SeaweedRead { return 0; } - int len = readChunkView(startOffset, buffer, bufferOffset + readCount, chunkView, locations); + int len = readChunkView(startOffset, buf, chunkView, locations); LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size); @@ -79,11 +80,12 @@ public class SeaweedRead { } - long limit = Math.min(bufferOffset + bufferLength, fileSize); + long limit = Math.min(buf.limit(), fileSize); if (startOffset < limit) { long gap = limit - startOffset; LOG.debug("zero2 [{},{})", startOffset, startOffset + gap); + buf.position(buf.position()+ (int)gap); readCount += gap; startOffset += gap; } @@ -91,7 +93,7 @@ public class SeaweedRead { return readCount; } - private static int readChunkView(long startOffset, byte[] buffer, long bufOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException { + private static int readChunkView(long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException { byte[] chunkData = chunkCache.getChunk(chunkView.fileId); @@ -101,9 +103,9 @@ public class SeaweedRead { } int len = (int) chunkView.size; - LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}", - chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, bufOffset, bufOffset + len, buffer.length, startOffset); - System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int) bufOffset, len); + LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}", + chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset); + buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len); return len; } diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java new file mode 100644 index 000000000..3d0b68a52 --- /dev/null +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java @@ -0,0 +1,25 @@ +package seaweed.hdfs; + +import org.apache.hadoop.fs.*; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable { + + public BufferedByteBufferReadableInputStream(FSInputStream in, int size) { + super(in, size); + if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) { + throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable"); + } + } + + @Override + public int read(ByteBuffer buf) throws IOException { + if (this.in instanceof ByteBufferReadable) { + return ((ByteBufferReadable)this.in).read(buf); + } else { + throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream"); + } + } +} diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index fb1f4c53b..84f11e846 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -82,7 +82,7 @@ public class SeaweedFileSystem extends FileSystem { try { int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics); - return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize)); + return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 8e406206d..690366849 100644 --- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -2,6 +2,7 @@ package seaweed.hdfs; // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream +import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -13,9 +14,10 @@ import seaweedfs.client.SeaweedRead; import java.io.EOFException; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; -public class SeaweedInputStream extends FSInputStream { +public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable { private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); @@ -63,14 +65,8 @@ public class SeaweedInputStream extends FSInputStream { } @Override - public synchronized int read(final byte[] b, final int off, final int len) throws IOException { + public int read(final byte[] b, final int off, final int len) throws IOException { - if (position < 0) { - throw new IllegalArgumentException("attempting to read from negative offset"); - } - if (position >= contentLength) { - return -1; // Hadoop prefers -1 to EOFException - } if (b == null) { throw new IllegalArgumentException("null byte array passed in to read() method"); } @@ -84,11 +80,29 @@ public class SeaweedInputStream extends FSInputStream { throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); } + ByteBuffer buf = ByteBuffer.wrap(b, off, len); + return read(buf); + + } + + // implement ByteBufferReadable + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + + if (position < 0) { + throw new IllegalArgumentException("attempting to read from negative offset"); + } + if (position >= contentLength) { + return -1; // Hadoop prefers -1 to EOFException + } + long bytesRead = 0; - if (position+len < entry.getContent().size()) { - entry.getContent().copyTo(b, (int) position, (int) off, len); + int len = buf.remaining(); + int start = (int) this.position; + if (start+len <= entry.getContent().size()) { + entry.getContent().substring(start, start+len).copyTo(buf); } else { - bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry)); + bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); } if (bytesRead > Integer.MAX_VALUE) { @@ -103,7 +117,6 @@ public class SeaweedInputStream extends FSInputStream { } return (int) bytesRead; - } /** diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java new file mode 100644 index 000000000..3d0b68a52 --- /dev/null +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java @@ -0,0 +1,25 @@ +package seaweed.hdfs; + +import org.apache.hadoop.fs.*; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable { + + public BufferedByteBufferReadableInputStream(FSInputStream in, int size) { + super(in, size); + if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) { + throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable"); + } + } + + @Override + public int read(ByteBuffer buf) throws IOException { + if (this.in instanceof ByteBufferReadable) { + return ((ByteBufferReadable)this.in).read(buf); + } else { + throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream"); + } + } +} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java index fb1f4c53b..84f11e846 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java @@ -82,7 +82,7 @@ public class SeaweedFileSystem extends FileSystem { try { int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE); FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics); - return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize)); + return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize)); } catch (Exception ex) { LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex); return null; diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java index 2ef1a7468..14b32528e 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java @@ -288,4 +288,4 @@ public class SeaweedFileSystemStore { } -}
\ No newline at end of file +} diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java index 8e406206d..690366849 100644 --- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java +++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java @@ -2,6 +2,7 @@ package seaweed.hdfs; // based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream +import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -13,9 +14,10 @@ import seaweedfs.client.SeaweedRead; import java.io.EOFException; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; -public class SeaweedInputStream extends FSInputStream { +public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable { private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class); @@ -63,14 +65,8 @@ public class SeaweedInputStream extends FSInputStream { } @Override - public synchronized int read(final byte[] b, final int off, final int len) throws IOException { + public int read(final byte[] b, final int off, final int len) throws IOException { - if (position < 0) { - throw new IllegalArgumentException("attempting to read from negative offset"); - } - if (position >= contentLength) { - return -1; // Hadoop prefers -1 to EOFException - } if (b == null) { throw new IllegalArgumentException("null byte array passed in to read() method"); } @@ -84,11 +80,29 @@ public class SeaweedInputStream extends FSInputStream { throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer"); } + ByteBuffer buf = ByteBuffer.wrap(b, off, len); + return read(buf); + + } + + // implement ByteBufferReadable + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + + if (position < 0) { + throw new IllegalArgumentException("attempting to read from negative offset"); + } + if (position >= contentLength) { + return -1; // Hadoop prefers -1 to EOFException + } + long bytesRead = 0; - if (position+len < entry.getContent().size()) { - entry.getContent().copyTo(b, (int) position, (int) off, len); + int len = buf.remaining(); + int start = (int) this.position; + if (start+len <= entry.getContent().size()) { + entry.getContent().substring(start, start+len).copyTo(buf); } else { - bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry)); + bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry)); } if (bytesRead > Integer.MAX_VALUE) { @@ -103,7 +117,6 @@ public class SeaweedInputStream extends FSInputStream { } return (int) bytesRead; - } /** diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index f4b6a6f28..2b238e534 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -1,6 +1,7 @@ package weed_server import ( + "bytes" "context" "fmt" "io" @@ -10,6 +11,7 @@ import ( "strings" "time" + "github.com/chrislusf/seaweedfs/weed/util/buffered_writer" "golang.org/x/net/webdav" "google.golang.org/grpc" @@ -96,6 +98,9 @@ type WebDavFile struct { entry *filer_pb.Entry entryViewCache []filer.VisibleInterval reader io.ReaderAt + bufWriter *buffered_writer.BufferedWriteCloser + collection string + replication string } func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { @@ -232,6 +237,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f fs: fs, name: fullFilePath, isDirectory: false, + bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024), }, nil } @@ -247,6 +253,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f fs: fs, name: fullFilePath, isDirectory: false, + bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024), }, nil } @@ -358,36 +365,20 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, return fs.stat(ctx, name) } -func (f *WebDavFile) Write(buf []byte) (int, error) { - - glog.V(2).Infof("WebDavFileSystem.Write %v", f.name) - - dir, _ := util.FullPath(f.name).DirAndName() - - var err error - ctx := context.Background() - if f.entry == nil { - f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name)) - } - - if f.entry == nil { - return 0, err - } - if err != nil { - return 0, err - } +func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) { var fileId, host string var auth security.EncodedJwt - var collection, replication string - if err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + ctx := context.Background() request := &filer_pb.AssignVolumeRequest{ Count: 1, Replication: "", Collection: f.fs.option.Collection, - Path: f.name, + Path: name, } resp, err := client.AssignVolume(ctx, request) @@ -400,63 +391,113 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { } fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) - collection, replication = resp.Collection, resp.Replication + f.collection, f.replication = resp.Collection, resp.Replication return nil - }); err != nil { - return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err) + }); flushErr != nil { + return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr) } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - uploadResult, err := operation.UploadData(fileUrl, f.name, f.fs.option.Cipher, buf, false, "", nil, auth) - if err != nil { - glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err) - return 0, fmt.Errorf("upload data: %v", err) + uploadResult, flushErr, _ := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, reader, false, "", nil, auth) + if flushErr != nil { + glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr) + return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr) } if uploadResult.Error != "" { - glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err) - return 0, fmt.Errorf("upload result: %v", uploadResult.Error) + glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr) + return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error) + } + return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil +} + +func (f *WebDavFile) Write(buf []byte) (int, error) { + + glog.V(2).Infof("WebDavFileSystem.Write %v", f.name) + + dir, _ := util.FullPath(f.name).DirAndName() + + var getErr error + ctx := context.Background() + if f.entry == nil { + f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name)) + } + + if f.entry == nil { + return 0, getErr + } + if getErr != nil { + return 0, getErr } - f.entry.Content = nil - f.entry.Chunks = append(f.entry.Chunks, uploadResult.ToPbFileChunk(fileId, f.off)) + if f.bufWriter.FlushFunc == nil { + f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) { - err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - f.entry.Attributes.Mtime = time.Now().Unix() - f.entry.Attributes.Collection = collection - f.entry.Attributes.Replication = replication + var chunk *filer_pb.FileChunk + chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset) - request := &filer_pb.UpdateEntryRequest{ - Directory: dir, - Entry: f.entry, - Signatures: []int32{f.fs.signature}, + if flushErr != nil { + return fmt.Errorf("%s upload result: %v", f.name, flushErr) + } + + f.entry.Content = nil + f.entry.Chunks = append(f.entry.Chunks, chunk) + + return flushErr } + f.bufWriter.CloseFunc = func() error { + + manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks) + if manifestErr != nil { + // not good, but should be ok + glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr) + } else { + f.entry.Chunks = manifestedChunks + } + + flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + f.entry.Attributes.Mtime = time.Now().Unix() + f.entry.Attributes.Collection = f.collection + f.entry.Attributes.Replication = f.replication + + request := &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: f.entry, + Signatures: []int32{f.fs.signature}, + } - if _, err := client.UpdateEntry(ctx, request); err != nil { - return fmt.Errorf("update %s: %v", f.name, err) + if _, err := client.UpdateEntry(ctx, request); err != nil { + return fmt.Errorf("update %s: %v", f.name, err) + } + + return nil + }) + return flushErr } + } - return nil - }) + written, err := f.bufWriter.Write(buf) if err == nil { glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf))) - f.off += int64(len(buf)) + f.off += int64(written) } - return len(buf), err + return written, err } func (f *WebDavFile) Close() error { glog.V(2).Infof("WebDavFileSystem.Close %v", f.name) + err := f.bufWriter.Close() + if f.entry != nil { f.entry = nil f.entryViewCache = nil } - return nil + return err } func (f *WebDavFile) Read(p []byte) (readSize int, err error) { diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index f7b16b7db..5884eca87 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -180,9 +180,15 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI var indexSize int64 oldIdxFile, err := os.Open(oldIdxFileName) + if err != nil { + return fmt.Errorf("makeupDiff open %s failed: %v", oldIdxFileName, err) + } defer oldIdxFile.Close() oldDatFile, err := os.Open(oldDatFileName) + if err != nil { + return fmt.Errorf("makeupDiff open %s failed: %v", oldDatFileName, err) + } oldDatBackend := backend.NewDiskFile(oldDatFile) defer oldDatBackend.Close() diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go index 04c26bc4d..89eefefc7 100644 --- a/weed/topology/topology_vacuum.go +++ b/weed/topology/topology_vacuum.go @@ -108,7 +108,7 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{ VolumeId: uint32(vid), }) - if resp.IsReadOnly { + if resp != nil && resp.IsReadOnly { isReadOnly = true } return err diff --git a/weed/util/buffered_writer/buffered_writer.go b/weed/util/buffered_writer/buffered_writer.go new file mode 100644 index 000000000..73d9f4995 --- /dev/null +++ b/weed/util/buffered_writer/buffered_writer.go @@ -0,0 +1,52 @@ +package buffered_writer + +import ( + "bytes" + "io" +) + +var _ = io.WriteCloser(&BufferedWriteCloser{}) + +type BufferedWriteCloser struct { + buffer bytes.Buffer + bufferLimit int + position int64 + nextFlushOffset int64 + FlushFunc func([]byte, int64) error + CloseFunc func() error +} + +func NewBufferedWriteCloser(bufferLimit int) *BufferedWriteCloser { + return &BufferedWriteCloser{ + bufferLimit: bufferLimit, + } +} + +func (b *BufferedWriteCloser) Write(p []byte) (n int, err error) { + + if b.buffer.Len()+len(p) >= b.bufferLimit { + if err := b.FlushFunc(b.buffer.Bytes(), b.nextFlushOffset); err != nil { + return 0, err + } + b.nextFlushOffset += int64(b.buffer.Len()) + b.buffer.Reset() + } + + return b.buffer.Write(p) + +} + +func (b *BufferedWriteCloser) Close() error { + if b.buffer.Len() > 0 { + if err := b.FlushFunc(b.buffer.Bytes(), b.nextFlushOffset); err != nil { + return err + } + } + if b.CloseFunc != nil { + if err := b.CloseFunc(); err != nil { + return err + } + } + + return nil +} |
