aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-12-01 15:32:27 -0800
committerChris Lu <chris.lu@gmail.com>2020-12-01 15:32:27 -0800
commit04062c56c74591c32380bbdfb589a9c641e05fa5 (patch)
treeb309756f284bd347c90fbe0a90403f3c70868f56
parent005a6123e98170b2bdf99eb5b8a67ca3cea94190 (diff)
downloadseaweedfs-04062c56c74591c32380bbdfb589a9c641e05fa5.tar.xz
seaweedfs-04062c56c74591c32380bbdfb589a9c641e05fa5.zip
webdav: improve webdav upload speed
-rw-r--r--weed/server/webdav_server.go135
-rw-r--r--weed/util/buffered_writer/buffered_writer.go52
2 files changed, 140 insertions, 47 deletions
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index f4b6a6f28..8547fe629 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "bytes"
"context"
"fmt"
"io"
@@ -10,6 +11,7 @@ import (
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/util/buffered_writer"
"golang.org/x/net/webdav"
"google.golang.org/grpc"
@@ -96,6 +98,9 @@ type WebDavFile struct {
entry *filer_pb.Entry
entryViewCache []filer.VisibleInterval
reader io.ReaderAt
+ bufWriter *buffered_writer.BufferedWriteCloser
+ collection string
+ replication string
}
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
@@ -232,6 +237,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
fs: fs,
name: fullFilePath,
isDirectory: false,
+ bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
}, nil
}
@@ -247,6 +253,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
fs: fs,
name: fullFilePath,
isDirectory: false,
+ bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
}, nil
}
@@ -358,36 +365,20 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
return fs.stat(ctx, name)
}
-func (f *WebDavFile) Write(buf []byte) (int, error) {
-
- glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
-
- dir, _ := util.FullPath(f.name).DirAndName()
-
- var err error
- ctx := context.Background()
- if f.entry == nil {
- f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
- }
-
- if f.entry == nil {
- return 0, err
- }
- if err != nil {
- return 0, err
- }
+func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
var fileId, host string
var auth security.EncodedJwt
- var collection, replication string
- if err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx := context.Background()
request := &filer_pb.AssignVolumeRequest{
Count: 1,
Replication: "",
Collection: f.fs.option.Collection,
- Path: f.name,
+ Path: name,
}
resp, err := client.AssignVolume(ctx, request)
@@ -400,63 +391,113 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
}
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
- collection, replication = resp.Collection, resp.Replication
+ f.collection, f.replication = resp.Collection, resp.Replication
return nil
- }); err != nil {
- return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+ }); flushErr != nil {
+ return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- uploadResult, err := operation.UploadData(fileUrl, f.name, f.fs.option.Cipher, buf, false, "", nil, auth)
- if err != nil {
- glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
- return 0, fmt.Errorf("upload data: %v", err)
+ uploadResult, flushErr, _ := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, reader, false, "", nil, auth)
+ if flushErr != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
+ return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
}
if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err)
- return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
+ glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
+ return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+ return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
+}
+
+func (f *WebDavFile) Write(buf []byte) (int, error) {
+
+ glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
+
+ dir, _ := util.FullPath(f.name).DirAndName()
+
+ var getErr error
+ ctx := context.Background()
+ if f.entry == nil {
+ f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
+ }
+
+ if f.entry == nil {
+ return 0, getErr
+ }
+ if getErr != nil {
+ return 0, getErr
}
- f.entry.Content = nil
- f.entry.Chunks = append(f.entry.Chunks, uploadResult.ToPbFileChunk(fileId, f.off))
+ if f.bufWriter.FlushFunc == nil {
+ f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
- err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- f.entry.Attributes.Mtime = time.Now().Unix()
- f.entry.Attributes.Collection = collection
- f.entry.Attributes.Replication = replication
+ var chunk *filer_pb.FileChunk
+ chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
- request := &filer_pb.UpdateEntryRequest{
- Directory: dir,
- Entry: f.entry,
- Signatures: []int32{f.fs.signature},
+ if flushErr != nil {
+ return fmt.Errorf("%s upload result: %v", f.name, flushErr)
+ }
+
+ f.entry.Content = nil
+ f.entry.Chunks = append(f.entry.Chunks, chunk)
+
+ return flushErr
}
+ f.bufWriter.CloseFunc = func() error {
+
+ manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
+ if manifestErr != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("file %s close MaybeManifestize: %v", manifestErr)
+ } else {
+ f.entry.Chunks = manifestedChunks
+ }
+
+ flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ f.entry.Attributes.Mtime = time.Now().Unix()
+ f.entry.Attributes.Collection = f.collection
+ f.entry.Attributes.Replication = f.replication
+
+ request := &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: f.entry,
+ Signatures: []int32{f.fs.signature},
+ }
- if _, err := client.UpdateEntry(ctx, request); err != nil {
- return fmt.Errorf("update %s: %v", f.name, err)
+ if _, err := client.UpdateEntry(ctx, request); err != nil {
+ return fmt.Errorf("update %s: %v", f.name, err)
+ }
+
+ return nil
+ })
+ return flushErr
}
+ }
- return nil
- })
+ written, err := f.bufWriter.Write(buf)
if err == nil {
glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
- f.off += int64(len(buf))
+ f.off += int64(written)
}
- return len(buf), err
+ return written, err
}
func (f *WebDavFile) Close() error {
glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
+ err := f.bufWriter.Close()
+
if f.entry != nil {
f.entry = nil
f.entryViewCache = nil
}
- return nil
+ return err
}
func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
diff --git a/weed/util/buffered_writer/buffered_writer.go b/weed/util/buffered_writer/buffered_writer.go
new file mode 100644
index 000000000..73d9f4995
--- /dev/null
+++ b/weed/util/buffered_writer/buffered_writer.go
@@ -0,0 +1,52 @@
+package buffered_writer
+
+import (
+ "bytes"
+ "io"
+)
+
+var _ = io.WriteCloser(&BufferedWriteCloser{})
+
+type BufferedWriteCloser struct {
+ buffer bytes.Buffer
+ bufferLimit int
+ position int64
+ nextFlushOffset int64
+ FlushFunc func([]byte, int64) error
+ CloseFunc func() error
+}
+
+func NewBufferedWriteCloser(bufferLimit int) *BufferedWriteCloser {
+ return &BufferedWriteCloser{
+ bufferLimit: bufferLimit,
+ }
+}
+
+func (b *BufferedWriteCloser) Write(p []byte) (n int, err error) {
+
+ if b.buffer.Len()+len(p) >= b.bufferLimit {
+ if err := b.FlushFunc(b.buffer.Bytes(), b.nextFlushOffset); err != nil {
+ return 0, err
+ }
+ b.nextFlushOffset += int64(b.buffer.Len())
+ b.buffer.Reset()
+ }
+
+ return b.buffer.Write(p)
+
+}
+
+func (b *BufferedWriteCloser) Close() error {
+ if b.buffer.Len() > 0 {
+ if err := b.FlushFunc(b.buffer.Bytes(), b.nextFlushOffset); err != nil {
+ return err
+ }
+ }
+ if b.CloseFunc != nil {
+ if err := b.CloseFunc(); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}