-rw-r--r--   other/java/client/pom_debug.xml                                                   5
-rw-r--r--   other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java                20
-rw-r--r--   other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java   25
-rw-r--r--   other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java                2
-rw-r--r--   other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java              37
-rw-r--r--   other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java   25
-rw-r--r--   other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java                2
-rw-r--r--   other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java           2
-rw-r--r--   other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java              37
-rw-r--r--   weed/server/webdav_server.go                                                     135
-rw-r--r--   weed/storage/volume_vacuum.go                                                      6
-rw-r--r--   weed/topology/topology_vacuum.go                                                   2
-rw-r--r--   weed/util/buffered_writer/buffered_writer.go                                      52
13 files changed, 266 insertions(+), 84 deletions(-)
diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml
index acdf621a5..078778b6c 100644
--- a/other/java/client/pom_debug.xml
+++ b/other/java/client/pom_debug.xml
@@ -68,6 +68,11 @@
<version>4.13.1</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>javax.annotation</groupId>
+ <artifactId>javax.annotation-api</artifactId>
+ <version>1.3.2</version>
+ </dependency>
</dependencies>
<build>
diff --git a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
index 2b530d2dd..c45987bed 100644
--- a/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
+++ b/other/java/client/src/main/java/seaweedfs/client/SeaweedRead.java
@@ -12,6 +12,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.*;
public class SeaweedRead {
@@ -23,10 +24,9 @@ public class SeaweedRead {
// returns bytesRead
public static long read(FilerGrpcClient filerGrpcClient, List<VisibleInterval> visibleIntervals,
- final long position, final byte[] buffer, final int bufferOffset,
- final int bufferLength, final long fileSize) throws IOException {
+ final long position, final ByteBuffer buf, final long fileSize) throws IOException {
- List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, bufferLength);
+ List<ChunkView> chunkViews = viewFromVisibles(visibleIntervals, position, buf.remaining());
Map<String, FilerProto.Locations> knownLocations = new HashMap<>();
@@ -59,6 +59,7 @@ public class SeaweedRead {
if (startOffset < chunkView.logicOffset) {
long gap = chunkView.logicOffset - startOffset;
LOG.debug("zero [{},{})", startOffset, startOffset + gap);
+ buf.position(buf.position()+ (int)gap);
readCount += gap;
startOffset += gap;
}
@@ -70,7 +71,7 @@ public class SeaweedRead {
return 0;
}
- int len = readChunkView(startOffset, buffer, bufferOffset + readCount, chunkView, locations);
+ int len = readChunkView(startOffset, buf, chunkView, locations);
LOG.debug("read [{},{}) {} size {}", startOffset, startOffset + len, chunkView.fileId, chunkView.size);
@@ -79,11 +80,12 @@ public class SeaweedRead {
}
- long limit = Math.min(bufferOffset + bufferLength, fileSize);
+ long limit = Math.min(buf.limit(), fileSize);
if (startOffset < limit) {
long gap = limit - startOffset;
LOG.debug("zero2 [{},{})", startOffset, startOffset + gap);
+ buf.position(buf.position()+ (int)gap);
readCount += gap;
startOffset += gap;
}
@@ -91,7 +93,7 @@ public class SeaweedRead {
return readCount;
}
- private static int readChunkView(long startOffset, byte[] buffer, long bufOffset, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
+ private static int readChunkView(long startOffset, ByteBuffer buf, ChunkView chunkView, FilerProto.Locations locations) throws IOException {
byte[] chunkData = chunkCache.getChunk(chunkView.fileId);
@@ -101,9 +103,9 @@ public class SeaweedRead {
}
int len = (int) chunkView.size;
- LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) buf[{},{})/{} startOffset:{}",
- chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, bufOffset, bufOffset + len, buffer.length, startOffset);
- System.arraycopy(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), buffer, (int) bufOffset, len);
+ LOG.debug("readChunkView fid:{} chunkData.length:{} chunkView.offset:{} chunkView[{};{}) startOffset:{}",
+ chunkView.fileId, chunkData.length, chunkView.offset, chunkView.logicOffset, chunkView.logicOffset + chunkView.size, startOffset);
+ buf.put(chunkData, (int) (startOffset - chunkView.logicOffset + chunkView.offset), len);
return len;
}
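The read path above now fills a java.nio.ByteBuffer instead of a raw byte[] plus offset/length pair, so the buffer's own position and limit replace the old bufferOffset/bufferLength bookkeeping. A minimal caller sketch against the new signature (filerGrpcClient, visibleIntervals, position, and fileSize are stand-ins taken from the surrounding code, not a verified call site):

    // Hypothetical caller: fill a 4 KiB buffer starting at `position`.
    ByteBuffer buf = ByteBuffer.allocate(4096);
    long bytesRead = SeaweedRead.read(filerGrpcClient, visibleIntervals, position, buf, fileSize);
    buf.flip(); // buf.position() advanced as data was put; flip to consume it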
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
new file mode 100644
index 000000000..3d0b68a52
--- /dev/null
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
@@ -0,0 +1,25 @@
+package seaweed.hdfs;
+
+import org.apache.hadoop.fs.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable {
+
+ public BufferedByteBufferReadableInputStream(FSInputStream in, int size) {
+ super(in, size);
+ if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
+ throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable");
+ }
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ if (this.in instanceof ByteBufferReadable) {
+ return ((ByteBufferReadable)this.in).read(buf);
+ } else {
+ throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
+ }
+ }
+}
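With this wrapper in place, FSDataInputStream.read(ByteBuffer) can dispatch to the underlying SeaweedInputStream instead of throwing UnsupportedOperationException, which is what plain BufferedFSInputStream would cause. A hedged usage sketch (path and buffer size are illustrative):

    // Hypothetical: open a SeaweedFS-backed file and read via the ByteBuffer API.
    FSDataInputStream in = fs.open(new Path("/some/file"));
    ByteBuffer buf = ByteBuffer.allocate(8192);
    int n = in.read(buf); // delegates to SeaweedInputStream.read(ByteBuffer)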
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index fb1f4c53b..84f11e846 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -82,7 +82,7 @@ public class SeaweedFileSystem extends FileSystem {
try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
- return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
+ return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;
diff --git a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 8e406206d..690366849 100644
--- a/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs2/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -2,6 +2,7 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -13,9 +14,10 @@ import seaweedfs.client.SeaweedRead;
import java.io.EOFException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
-public class SeaweedInputStream extends FSInputStream {
+public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
@@ -63,14 +65,8 @@ public class SeaweedInputStream extends FSInputStream {
}
@Override
- public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
+ public int read(final byte[] b, final int off, final int len) throws IOException {
- if (position < 0) {
- throw new IllegalArgumentException("attempting to read from negative offset");
- }
- if (position >= contentLength) {
- return -1; // Hadoop prefers -1 to EOFException
- }
if (b == null) {
throw new IllegalArgumentException("null byte array passed in to read() method");
}
@@ -84,11 +80,29 @@ public class SeaweedInputStream extends FSInputStream {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
}
+ ByteBuffer buf = ByteBuffer.wrap(b, off, len);
+ return read(buf);
+
+ }
+
+ // implement ByteBufferReadable
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+
+ if (position < 0) {
+ throw new IllegalArgumentException("attempting to read from negative offset");
+ }
+ if (position >= contentLength) {
+ return -1; // Hadoop prefers -1 to EOFException
+ }
+
long bytesRead = 0;
- if (position+len < entry.getContent().size()) {
- entry.getContent().copyTo(b, (int) position, (int) off, len);
+ int len = buf.remaining();
+ int start = (int) this.position;
+ if (start+len <= entry.getContent().size()) {
+ entry.getContent().substring(start, start+len).copyTo(buf);
} else {
- bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry));
+ bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
}
if (bytesRead > Integer.MAX_VALUE) {
@@ -103,7 +117,6 @@ public class SeaweedInputStream extends FSInputStream {
}
return (int) bytesRead;
-
}
/**
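The byte[] overload above now delegates to read(ByteBuffer) by wrapping the array, and the synchronized keyword moves with the real work to the ByteBuffer overload. The wrap is zero-copy: writes through the buffer land directly in the caller's array. A small illustration of that semantics (values are arbitrary):

    // ByteBuffer.wrap(b, off, len) yields a view with position = off and
    // limit = off + len, backed by b itself.
    byte[] b = new byte[16];
    ByteBuffer view = ByteBuffer.wrap(b, 4, 8);
    view.put((byte) 1); // writes b[4]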
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
new file mode 100644
index 000000000..3d0b68a52
--- /dev/null
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/BufferedByteBufferReadableInputStream.java
@@ -0,0 +1,25 @@
+package seaweed.hdfs;
+
+import org.apache.hadoop.fs.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class BufferedByteBufferReadableInputStream extends BufferedFSInputStream implements ByteBufferReadable {
+
+ public BufferedByteBufferReadableInputStream(FSInputStream in, int size) {
+ super(in, size);
+ if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
+ throw new IllegalArgumentException("In is not an instance of Seekable or PositionedReadable");
+ }
+ }
+
+ @Override
+ public int read(ByteBuffer buf) throws IOException {
+ if (this.in instanceof ByteBufferReadable) {
+ return ((ByteBufferReadable)this.in).read(buf);
+ } else {
+ throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
+ }
+ }
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
index fb1f4c53b..84f11e846 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystem.java
@@ -82,7 +82,7 @@ public class SeaweedFileSystem extends FileSystem {
try {
int seaweedBufferSize = this.getConf().getInt(FS_SEAWEED_BUFFER_SIZE, FS_SEAWEED_DEFAULT_BUFFER_SIZE);
FSInputStream inputStream = seaweedFileSystemStore.openFileForRead(path, statistics);
- return new FSDataInputStream(new BufferedFSInputStream(inputStream, 4 * seaweedBufferSize));
+ return new FSDataInputStream(new BufferedByteBufferReadableInputStream(inputStream, 4 * seaweedBufferSize));
} catch (Exception ex) {
LOG.warn("open path: {} bufferSize:{}", path, bufferSize, ex);
return null;
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
index 2ef1a7468..14b32528e 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedFileSystemStore.java
@@ -288,4 +288,4 @@ public class SeaweedFileSystemStore {
}
-}
\ No newline at end of file
+}
diff --git a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
index 8e406206d..690366849 100644
--- a/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
+++ b/other/java/hdfs3/src/main/java/seaweed/hdfs/SeaweedInputStream.java
@@ -2,6 +2,7 @@ package seaweed.hdfs;
// based on org.apache.hadoop.fs.azurebfs.services.AbfsInputStream
+import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem.Statistics;
@@ -13,9 +14,10 @@ import seaweedfs.client.SeaweedRead;
import java.io.EOFException;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.List;
-public class SeaweedInputStream extends FSInputStream {
+public class SeaweedInputStream extends FSInputStream implements ByteBufferReadable {
private static final Logger LOG = LoggerFactory.getLogger(SeaweedInputStream.class);
@@ -63,14 +65,8 @@ public class SeaweedInputStream extends FSInputStream {
}
@Override
- public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
+ public int read(final byte[] b, final int off, final int len) throws IOException {
- if (position < 0) {
- throw new IllegalArgumentException("attempting to read from negative offset");
- }
- if (position >= contentLength) {
- return -1; // Hadoop prefers -1 to EOFException
- }
if (b == null) {
throw new IllegalArgumentException("null byte array passed in to read() method");
}
@@ -84,11 +80,29 @@ public class SeaweedInputStream extends FSInputStream {
throw new IllegalArgumentException("requested read length is more than will fit after requested offset in buffer");
}
+ ByteBuffer buf = ByteBuffer.wrap(b, off, len);
+ return read(buf);
+
+ }
+
+ // implement ByteBufferReadable
+ @Override
+ public synchronized int read(ByteBuffer buf) throws IOException {
+
+ if (position < 0) {
+ throw new IllegalArgumentException("attempting to read from negative offset");
+ }
+ if (position >= contentLength) {
+ return -1; // Hadoop prefers -1 to EOFException
+ }
+
long bytesRead = 0;
- if (position+len < entry.getContent().size()) {
- entry.getContent().copyTo(b, (int) position, (int) off, len);
+ int len = buf.remaining();
+ int start = (int) this.position;
+ if (start+len <= entry.getContent().size()) {
+ entry.getContent().substring(start, start+len).copyTo(buf);
} else {
- bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, b, off, len, SeaweedRead.fileSize(entry));
+ bytesRead = SeaweedRead.read(this.filerGrpcClient, this.visibleIntervalList, this.position, buf, SeaweedRead.fileSize(entry));
}
if (bytesRead > Integer.MAX_VALUE) {
@@ -103,7 +117,6 @@ public class SeaweedInputStream extends FSInputStream {
}
return (int) bytesRead;
-
}
/**
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index f4b6a6f28..2b238e534 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "bytes"
"context"
"fmt"
"io"
@@ -10,6 +11,7 @@ import (
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/util/buffered_writer"
"golang.org/x/net/webdav"
"google.golang.org/grpc"
@@ -96,6 +98,9 @@ type WebDavFile struct {
entry *filer_pb.Entry
entryViewCache []filer.VisibleInterval
reader io.ReaderAt
+ bufWriter *buffered_writer.BufferedWriteCloser
+ collection string
+ replication string
}
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
@@ -232,6 +237,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
fs: fs,
name: fullFilePath,
isDirectory: false,
+ bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
}, nil
}
@@ -247,6 +253,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
fs: fs,
name: fullFilePath,
isDirectory: false,
+ bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
}, nil
}
@@ -358,36 +365,20 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
return fs.stat(ctx, name)
}
-func (f *WebDavFile) Write(buf []byte) (int, error) {
-
- glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
-
- dir, _ := util.FullPath(f.name).DirAndName()
-
- var err error
- ctx := context.Background()
- if f.entry == nil {
- f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
- }
-
- if f.entry == nil {
- return 0, err
- }
- if err != nil {
- return 0, err
- }
+func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
var fileId, host string
var auth security.EncodedJwt
- var collection, replication string
- if err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx := context.Background()
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: "",
Collection: f.fs.option.Collection,
- Path: f.name,
+ Path: name,
}
resp, err := client.AssignVolume(ctx, request)
@@ -400,63 +391,113 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
}
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
- collection, replication = resp.Collection, resp.Replication
+ f.collection, f.replication = resp.Collection, resp.Replication
return nil
- }); err != nil {
- return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+ }); flushErr != nil {
+ return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- uploadResult, err := operation.UploadData(fileUrl, f.name, f.fs.option.Cipher, buf, false, "", nil, auth)
- if err != nil {
- glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
- return 0, fmt.Errorf("upload data: %v", err)
+ uploadResult, flushErr, _ := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, reader, false, "", nil, auth)
+ if flushErr != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
+ return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
}
if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err)
- return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
+ glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
+ return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+ return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
+}
+
+func (f *WebDavFile) Write(buf []byte) (int, error) {
+
+ glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
+
+ dir, _ := util.FullPath(f.name).DirAndName()
+
+ var getErr error
+ ctx := context.Background()
+ if f.entry == nil {
+ f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
+ }
+
+ if f.entry == nil {
+ return 0, getErr
+ }
+ if getErr != nil {
+ return 0, getErr
}
- f.entry.Content = nil
- f.entry.Chunks = append(f.entry.Chunks, uploadResult.ToPbFileChunk(fileId, f.off))
+ if f.bufWriter.FlushFunc == nil {
+ f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
- err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- f.entry.Attributes.Mtime = time.Now().Unix()
- f.entry.Attributes.Collection = collection
- f.entry.Attributes.Replication = replication
+ var chunk *filer_pb.FileChunk
+ chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
- request := &filer_pb.UpdateEntryRequest{
- Directory: dir,
- Entry: f.entry,
- Signatures: []int32{f.fs.signature},
+ if flushErr != nil {
+ return fmt.Errorf("%s upload result: %v", f.name, flushErr)
+ }
+
+ f.entry.Content = nil
+ f.entry.Chunks = append(f.entry.Chunks, chunk)
+
+ return flushErr
}
+ f.bufWriter.CloseFunc = func() error {
+
+ manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
+ if manifestErr != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
+ } else {
+ f.entry.Chunks = manifestedChunks
+ }
+
+ flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ f.entry.Attributes.Mtime = time.Now().Unix()
+ f.entry.Attributes.Collection = f.collection
+ f.entry.Attributes.Replication = f.replication
+
+ request := &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: f.entry,
+ Signatures: []int32{f.fs.signature},
+ }
- if _, err := client.UpdateEntry(ctx, request); err != nil {
- return fmt.Errorf("update %s: %v", f.name, err)
+ if _, err := client.UpdateEntry(ctx, request); err != nil {
+ return fmt.Errorf("update %s: %v", f.name, err)
+ }
+
+ return nil
+ })
+ return flushErr
}
+ }
- return nil
- })
+ written, err := f.bufWriter.Write(buf)
if err == nil {
glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
- f.off += int64(len(buf))
+ f.off += int64(written)
}
- return len(buf), err
+ return written, err
}
func (f *WebDavFile) Close() error {
glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
+ err := f.bufWriter.Close()
+
if f.entry != nil {
f.entry = nil
f.entryViewCache = nil
}
- return nil
+ return err
}
func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
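Net effect of this rewrite: Write no longer uploads each call's buffer as its own chunk. It appends into a 4 MB BufferedWriteCloser whose FlushFunc uploads one chunk and appends it to entry.Chunks, and Close drains the tail and then lets CloseFunc manifestize the chunks and update the filer entry. A rough sequence for a WebDAV PUT (hypothetical caller, error handling elided):

    // Sketch only: fsys is the webdav.FileSystem returned by NewWebDavFileSystem.
    f, _ := fsys.OpenFile(ctx, "/dir/file", os.O_WRONLY|os.O_CREATE, 0644)
    f.Write(part1) // buffered; FlushFunc fires once 4 MB accumulates
    f.Write(part2)
    f.Close() // flushes the remainder, then updates the entry via CloseFunc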
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index f7b16b7db..5884eca87 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -180,9 +180,15 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
var indexSize int64
oldIdxFile, err := os.Open(oldIdxFileName)
+ if err != nil {
+ return fmt.Errorf("makeupDiff open %s failed: %v", oldIdxFileName, err)
+ }
defer oldIdxFile.Close()
oldDatFile, err := os.Open(oldDatFileName)
+ if err != nil {
+ return fmt.Errorf("makeupDiff open %s failed: %v", oldDatFileName, err)
+ }
oldDatBackend := backend.NewDiskFile(oldDatFile)
defer oldDatBackend.Close()
diff --git a/weed/topology/topology_vacuum.go b/weed/topology/topology_vacuum.go
index 04c26bc4d..89eefefc7 100644
--- a/weed/topology/topology_vacuum.go
+++ b/weed/topology/topology_vacuum.go
@@ -108,7 +108,7 @@ func batchVacuumVolumeCommit(grpcDialOption grpc.DialOption, vl *VolumeLayout, v
resp, err := volumeServerClient.VacuumVolumeCommit(context.Background(), &volume_server_pb.VacuumVolumeCommitRequest{
VolumeId: uint32(vid),
})
- if resp.IsReadOnly {
+ if resp != nil && resp.IsReadOnly {
isReadOnly = true
}
return err
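The added nil check matters because a failed gRPC call conventionally returns a nil response alongside a non-nil error, so dereferencing resp.IsReadOnly unconditionally can panic. The general guard pattern, sketched with a hypothetical RPC:

    // resp is typically nil whenever err != nil, so check resp before use.
    resp, err := client.SomeRPC(ctx, req) // hypothetical call
    if resp != nil && resp.SomeField {
        // safe to read the response here
    }
    return err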
diff --git a/weed/util/buffered_writer/buffered_writer.go b/weed/util/buffered_writer/buffered_writer.go
new file mode 100644
index 000000000..73d9f4995
--- /dev/null
+++ b/weed/util/buffered_writer/buffered_writer.go
@@ -0,0 +1,52 @@
+package buffered_writer
+
+import (
+ "bytes"
+ "io"
+)
+
+var _ = io.WriteCloser(&BufferedWriteCloser{})
+
+type BufferedWriteCloser struct {
+ buffer bytes.Buffer
+ bufferLimit int
+ position int64
+ nextFlushOffset int64
+ FlushFunc func([]byte, int64) error
+ CloseFunc func() error
+}
+
+func NewBufferedWriteCloser(bufferLimit int) *BufferedWriteCloser {
+ return &BufferedWriteCloser{
+ bufferLimit: bufferLimit,
+ }
+}
+
+func (b *BufferedWriteCloser) Write(p []byte) (n int, err error) {
+
+ if b.buffer.Len()+len(p) >= b.bufferLimit {
+ if err := b.FlushFunc(b.buffer.Bytes(), b.nextFlushOffset); err != nil {
+ return 0, err
+ }
+ b.nextFlushOffset += int64(b.buffer.Len())
+ b.buffer.Reset()
+ }
+
+ return b.buffer.Write(p)
+
+}
+
+func (b *BufferedWriteCloser) Close() error {
+ if b.buffer.Len() > 0 {
+ if err := b.FlushFunc(b.buffer.Bytes(), b.nextFlushOffset); err != nil {
+ return err
+ }
+ }
+ if b.CloseFunc != nil {
+ if err := b.CloseFunc(); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
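A minimal usage sketch of the new type (the flush and close hooks below are placeholders; the real ones live in WebDavFile.Write above). Note that Write and Close invoke FlushFunc without a nil check, so it must be assigned before a Write crosses bufferLimit or before Close runs with pending data:

    bw := buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024)
    bw.FlushFunc = func(data []byte, offset int64) error {
        // placeholder: persist `data` as one chunk starting at `offset`
        return nil
    }
    bw.CloseFunc = func() error {
        // placeholder: commit metadata after the final flush
        return nil
    }
    if _, err := bw.Write(payload); err != nil {
        // a failed flush surfaces here
    }
    if err := bw.Close(); err != nil {
        // final flush or CloseFunc failure surfaces here
    }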