diff options
| author | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
| commit | 40334bc28d3fa694ce59b4e65077efb845264d20 (patch) | |
| tree | a085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/server/webdav_server.go | |
| parent | d892cad15d748327c2b7c649f6398ff35d8dce0b (diff) | |
| parent | fbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff) | |
| download | seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/server/webdav_server.go')
| -rw-r--r-- | weed/server/webdav_server.go | 141 |
1 files changed, 68 insertions, 73 deletions
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index d75869f30..37c4afd5c 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -1,21 +1,24 @@ package weed_server import ( - "bytes" "context" "fmt" "io" + "math" "os" "path" "strings" "time" + "github.com/chrislusf/seaweedfs/weed/util/grace" "golang.org/x/net/webdav" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" @@ -31,6 +34,9 @@ type WebDavOption struct { Collection string Uid uint32 Gid uint32 + Cipher bool + CacheDir string + CacheSizeMB int64 } type WebDavServer struct { @@ -64,6 +70,7 @@ type WebDavFileSystem struct { secret security.SigningKey filer *filer2.Filer grpcDialOption grpc.DialOption + chunkCache *chunk_cache.ChunkCache } type FileInfo struct { @@ -88,22 +95,34 @@ type WebDavFile struct { off int64 entry *filer_pb.Entry entryViewCache []filer2.VisibleInterval + reader io.ReaderAt } func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { + + chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB) + grace.OnInterrupt(func() { + chunkCache.Shutdown() + }) return &WebDavFileSystem{ - option: option, + option: option, + chunkCache: chunkCache, }, nil } -func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error { +var _ = filer_pb.FilerClient(&WebDavFileSystem{}) - return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { +func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(ctx2, client) + return fn(client) }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) } +func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string { + return hostAndPort +} func clearName(name string) (string, error) { slashed := strings.HasSuffix(name, "/") @@ -135,8 +154,8 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm return os.ErrExist } - return fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - dir, name := filer2.FullPath(fullDirPath).DirAndName() + return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + dir, name := util.FullPath(fullDirPath).DirAndName() request := &filer_pb.CreateEntryRequest{ Directory: dir, Entry: &filer_pb.Entry{ @@ -153,7 +172,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm } glog.V(1).Infof("mkdir: %v", request) - if err := filer_pb.CreateEntry(ctx, client, request); err != nil { + if err := filer_pb.CreateEntry(client, request); err != nil { return fmt.Errorf("mkdir %s/%s: %v", dir, name, err) } @@ -183,9 +202,9 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f fs.removeAll(ctx, fullFilePath) } - dir, name := filer2.FullPath(fullFilePath).DirAndName() - err = fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - if err := filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{ + dir, name := util.FullPath(fullFilePath).DirAndName() + err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ Directory: dir, Entry: &filer_pb.Entry{ Name: name, @@ -238,34 +257,10 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) return err } - fi, err := fs.stat(ctx, fullFilePath) - if err != nil { - return err - } - - if fi.IsDir() { - //_, err = fs.db.Exec(`delete from filesystem where fullFilePath like $1 escape '\'`, strings.Replace(fullFilePath, `%`, `\%`, -1)+`%`) - } else { - //_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath) - } - dir, name := filer2.FullPath(fullFilePath).DirAndName() - err = fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.DeleteEntryRequest{ - Directory: dir, - Name: name, - IsDeleteData: true, - } + dir, name := util.FullPath(fullFilePath).DirAndName() - glog.V(3).Infof("removing entry: %v", request) - _, err := client.DeleteEntry(ctx, request) - if err != nil { - return fmt.Errorf("remove %s: %v", fullFilePath, err) - } + return filer_pb.Remove(fs, dir, name, true, false, false) - return nil - }) - return err } func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error { @@ -305,10 +300,10 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) return os.ErrExist } - oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName() - newDir, newBaseName := filer2.FullPath(newName).DirAndName() + oldDir, oldBaseName := util.FullPath(oldName).DirAndName() + newDir, newBaseName := util.FullPath(newName).DirAndName() - return fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: oldDir, @@ -333,10 +328,10 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F return nil, err } - fullpath := filer2.FullPath(fullFilePath) + fullpath := util.FullPath(fullFilePath) var fi FileInfo - entry, err := filer2.GetEntry(ctx, fs, fullpath) + entry, err := filer_pb.GetEntry(fs, fullpath) if entry == nil { return nil, os.ErrNotExist } @@ -367,10 +362,12 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { glog.V(2).Infof("WebDavFileSystem.Write %v", f.name) + dir, _ := util.FullPath(f.name).DirAndName() + var err error ctx := context.Background() if f.entry == nil { - f.entry, err = filer2.GetEntry(ctx, f.fs, filer2.FullPath(f.name)) + f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name)) } if f.entry == nil { @@ -382,13 +379,15 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { var fileId, host string var auth security.EncodedJwt + var collection, replication string - if err = f.fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + if err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, - Replication: "000", + Replication: "", Collection: f.fs.option.Collection, + ParentPath: dir, } resp, err := client.AssignVolume(ctx, request) @@ -396,8 +395,12 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { glog.V(0).Infof("assign volume failure %v: %v", request, err) return err } + if resp.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) + } fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + collection, replication = resp.Collection, resp.Replication return nil }); err != nil { @@ -405,8 +408,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - bufReader := bytes.NewReader(buf) - uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "", nil, auth) + uploadResult, err := operation.UploadData(fileUrl, f.name, f.fs.option.Cipher, buf, false, "", nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err) return 0, fmt.Errorf("upload data: %v", err) @@ -416,19 +418,12 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { return 0, fmt.Errorf("upload result: %v", uploadResult.Error) } - chunk := &filer_pb.FileChunk{ - FileId: fileId, - Offset: f.off, - Size: uint64(len(buf)), - Mtime: time.Now().UnixNano(), - ETag: uploadResult.ETag, - } - - f.entry.Chunks = append(f.entry.Chunks, chunk) - dir, _ := filer2.FullPath(f.name).DirAndName() + f.entry.Chunks = append(f.entry.Chunks, uploadResult.ToPbFileChunk(fileId, f.off)) - err = f.fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { f.entry.Attributes.Mtime = time.Now().Unix() + f.entry.Attributes.Collection = collection + f.entry.Attributes.Replication = replication request := &filer_pb.UpdateEntryRequest{ Directory: dir, @@ -465,10 +460,9 @@ func (f *WebDavFile) Close() error { func (f *WebDavFile) Read(p []byte) (readSize int, err error) { glog.V(2).Infof("WebDavFileSystem.Read %v", f.name) - ctx := context.Background() if f.entry == nil { - f.entry, err = filer2.GetEntry(ctx, f.fs, filer2.FullPath(f.name)) + f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name)) } if f.entry == nil { return 0, err @@ -481,33 +475,33 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { } if f.entryViewCache == nil { f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks) + f.reader = nil } - chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, f.off, len(p)) - - totalRead, err := filer2.ReadIntoBuffer(ctx, f.fs, filer2.FullPath(f.name), p, chunkViews, f.off) - if err != nil { - return 0, err + if f.reader == nil { + chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt32) + f.reader = filer2.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache) } - readSize = int(totalRead) - glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+totalRead) + readSize, err = f.reader.ReadAt(p, f.off) - f.off += totalRead - if readSize == 0 { - return 0, io.EOF + glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize)) + f.off += int64(readSize) + + if err != nil { + glog.Errorf("file read %s: %v", f.name, err) } return + } func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) { glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count) - ctx := context.Background() - dir, _ := filer2.FullPath(f.name).DirAndName() + dir, _ := util.FullPath(f.name).DirAndName() - err = filer2.ReadDirAllEntries(ctx, f.fs, filer2.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error { fi := FileInfo{ size: int64(filer2.TotalSize(entry.GetChunks())), name: entry.Name, @@ -521,6 +515,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) { } glog.V(4).Infof("entry: %v", fi.name) ret = append(ret, &fi) + return nil }) old := f.off |
