diff options
Diffstat (limited to 'weed/server/webdav_server.go')
| -rw-r--r-- | weed/server/webdav_server.go | 285 |
1 files changed, 163 insertions, 122 deletions
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index abd0b66eb..c3f68fdee 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -5,21 +5,23 @@ import ( "context" "fmt" "io" + "math" "os" "path" "strings" "time" + "github.com/chrislusf/seaweedfs/weed/util/buffered_writer" "golang.org/x/net/webdav" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" - "github.com/spf13/viper" - - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" ) @@ -31,14 +33,19 @@ type WebDavOption struct { BucketsPath string GrpcDialOption grpc.DialOption Collection string + Replication string + DiskType string Uid uint32 Gid uint32 + Cipher bool + CacheDir string + CacheSizeMB int64 } type WebDavServer struct { option *WebDavOption secret security.SigningKey - filer *filer2.Filer + filer *filer.Filer grpcDialOption grpc.DialOption Handler *webdav.Handler } @@ -49,7 +56,7 @@ func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) { ws = &WebDavServer{ option: option, - grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"), + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), Handler: &webdav.Handler{ FileSystem: fs, LockSystem: webdav.NewMemLS(), @@ -64,8 +71,10 @@ func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) { type WebDavFileSystem struct { option *WebDavOption secret security.SigningKey - filer *filer2.Filer + filer *filer.Filer grpcDialOption grpc.DialOption + chunkCache *chunk_cache.TieredChunkCache + signature int32 } type FileInfo struct { @@ -89,23 +98,40 @@ type WebDavFile struct { isDirectory bool off int64 entry *filer_pb.Entry - entryViewCache []filer2.VisibleInterval + entryViewCache []filer.VisibleInterval + reader io.ReaderAt + bufWriter *buffered_writer.BufferedWriteCloser + collection string + replication string } func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { + + cacheUniqueId := util.Md5String([]byte("webdav" + option.FilerGrpcAddress + util.Version()))[0:8] + cacheDir := path.Join(option.CacheDir, cacheUniqueId) + + os.MkdirAll(cacheDir, os.FileMode(0755)) + chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024) return &WebDavFileSystem{ - option: option, + option: option, + chunkCache: chunkCache, + signature: util.RandomInt32(), }, nil } -func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error { +var _ = filer_pb.FilerClient(&WebDavFileSystem{}) - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { +func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) } +func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string { + return location.Url +} func clearName(name string) (string, error) { slashed := strings.HasSuffix(name, "/") @@ -137,8 +163,8 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm return os.ErrExist } - return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - dir, name := filer2.FullPath(fullDirPath).DirAndName() + return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + dir, name := util.FullPath(fullDirPath).DirAndName() request := &filer_pb.CreateEntryRequest{ Directory: dir, Entry: &filer_pb.Entry{ @@ -152,10 +178,11 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm Gid: fs.option.Gid, }, }, + Signatures: []int32{fs.signature}, } glog.V(1).Infof("mkdir: %v", request) - if _, err := client.CreateEntry(ctx, request); err != nil { + if err := filer_pb.CreateEntry(client, request); err != nil { return fmt.Errorf("mkdir %s/%s: %v", dir, name, err) } @@ -185,9 +212,9 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f fs.removeAll(ctx, fullFilePath) } - dir, name := filer2.FullPath(fullFilePath).DirAndName() - err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - if _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{ + dir, name := util.FullPath(fullFilePath).DirAndName() + err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ Directory: dir, Entry: &filer_pb.Entry{ Name: name, @@ -199,10 +226,11 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f Uid: fs.option.Uid, Gid: fs.option.Gid, Collection: fs.option.Collection, - Replication: "000", + Replication: fs.option.Replication, TtlSec: 0, }, }, + Signatures: []int32{fs.signature}, }); err != nil { return fmt.Errorf("create %s: %v", fullFilePath, err) } @@ -215,6 +243,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f fs: fs, name: fullFilePath, isDirectory: false, + bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024), }, nil } @@ -230,6 +259,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f fs: fs, name: fullFilePath, isDirectory: false, + bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024), }, nil } @@ -240,34 +270,10 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) return err } - fi, err := fs.stat(ctx, fullFilePath) - if err != nil { - return err - } + dir, name := util.FullPath(fullFilePath).DirAndName() - if fi.IsDir() { - //_, err = fs.db.Exec(`delete from filesystem where fullFilePath like $1 escape '\'`, strings.Replace(fullFilePath, `%`, `\%`, -1)+`%`) - } else { - //_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath) - } - dir, name := filer2.FullPath(fullFilePath).DirAndName() - err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.DeleteEntryRequest{ - Directory: dir, - Name: name, - IsDeleteData: true, - } + return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature}) - glog.V(3).Infof("removing entry: %v", request) - _, err := client.DeleteEntry(ctx, request) - if err != nil { - return fmt.Errorf("remove %s: %v", fullFilePath, err) - } - - return nil - }) - return err } func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error { @@ -307,10 +313,10 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) return os.ErrExist } - oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName() - newDir, newBaseName := filer2.FullPath(newName).DirAndName() + oldDir, oldBaseName := util.FullPath(oldName).DirAndName() + newDir, newBaseName := util.FullPath(newName).DirAndName() - return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: oldDir, @@ -335,23 +341,23 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F return nil, err } + fullpath := util.FullPath(fullFilePath) + var fi FileInfo - entry, err := filer2.GetEntry(ctx, fs, fullFilePath) + entry, err := filer_pb.GetEntry(fs, fullpath) if entry == nil { return nil, os.ErrNotExist } if err != nil { return nil, err } - fi.size = int64(filer2.TotalSize(entry.GetChunks())) - fi.name = fullFilePath + fi.size = int64(filer.FileSize(entry)) + fi.name = string(fullpath) fi.mode = os.FileMode(entry.Attributes.FileMode) fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0) fi.isDirectory = entry.IsDirectory - _, fi.name = path.Split(path.Clean(fi.name)) - if fi.name == "" { - fi.name = "/" + if fi.name == "/" { fi.modifiledTime = time.Now() fi.isDirectory = true } @@ -365,32 +371,21 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, return fs.stat(ctx, name) } -func (f *WebDavFile) Write(buf []byte) (int, error) { - - glog.V(2).Infof("WebDavFileSystem.Write %v", f.name) - - var err error - ctx := context.Background() - if f.entry == nil { - f.entry, err = filer2.GetEntry(ctx, f.fs, f.name) - } - - if f.entry == nil { - return 0, err - } - if err != nil { - return 0, err - } +func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) { var fileId, host string var auth security.EncodedJwt - if err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { + if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + ctx := context.Background() request := &filer_pb.AssignVolumeRequest{ Count: 1, - Replication: "000", + Replication: f.fs.option.Replication, Collection: f.fs.option.Collection, + DiskType: f.fs.option.DiskType, + Path: name, } resp, err := client.AssignVolume(ctx, request) @@ -398,79 +393,126 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { glog.V(0).Infof("assign volume failure %v: %v", request, err) return err } + if resp.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) + } fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + f.collection, f.replication = resp.Collection, resp.Replication return nil - }); err != nil { - return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err) + }); flushErr != nil { + return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr) } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - bufReader := bytes.NewReader(buf) - uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "", nil, auth) - if err != nil { - glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err) - return 0, fmt.Errorf("upload data: %v", err) + uploadResult, flushErr, _ := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, reader, false, "", nil, auth) + if flushErr != nil { + glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr) + return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr) } if uploadResult.Error != "" { - glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err) - return 0, fmt.Errorf("upload result: %v", uploadResult.Error) + glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr) + return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error) + } + return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil +} + +func (f *WebDavFile) Write(buf []byte) (int, error) { + + glog.V(2).Infof("WebDavFileSystem.Write %v", f.name) + + dir, _ := util.FullPath(f.name).DirAndName() + + var getErr error + ctx := context.Background() + if f.entry == nil { + f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name)) } - chunk := &filer_pb.FileChunk{ - FileId: fileId, - Offset: f.off, - Size: uint64(len(buf)), - Mtime: time.Now().UnixNano(), - ETag: uploadResult.ETag, + if f.entry == nil { + return 0, getErr + } + if getErr != nil { + return 0, getErr } - f.entry.Chunks = append(f.entry.Chunks, chunk) - dir, _ := filer2.FullPath(f.name).DirAndName() + if f.bufWriter.FlushFunc == nil { + f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) { - err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { - f.entry.Attributes.Mtime = time.Now().Unix() + var chunk *filer_pb.FileChunk + chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset) - request := &filer_pb.UpdateEntryRequest{ - Directory: dir, - Entry: f.entry, + if flushErr != nil { + return fmt.Errorf("%s upload result: %v", f.name, flushErr) + } + + f.entry.Content = nil + f.entry.Chunks = append(f.entry.Chunks, chunk) + + return flushErr } + f.bufWriter.CloseFunc = func() error { + + manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks) + if manifestErr != nil { + // not good, but should be ok + glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr) + } else { + f.entry.Chunks = manifestedChunks + } - if _, err := client.UpdateEntry(ctx, request); err != nil { - return fmt.Errorf("update %s: %v", f.name, err) + flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + f.entry.Attributes.Mtime = time.Now().Unix() + f.entry.Attributes.Collection = f.collection + f.entry.Attributes.Replication = f.replication + + request := &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: f.entry, + Signatures: []int32{f.fs.signature}, + } + + if _, err := client.UpdateEntry(ctx, request); err != nil { + return fmt.Errorf("update %s: %v", f.name, err) + } + + return nil + }) + return flushErr } + } - return nil - }) + written, err := f.bufWriter.Write(buf) if err == nil { glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf))) - f.off += int64(len(buf)) + f.off += int64(written) } - return len(buf), err + return written, err } func (f *WebDavFile) Close() error { glog.V(2).Infof("WebDavFileSystem.Close %v", f.name) + err := f.bufWriter.Close() + if f.entry != nil { f.entry = nil f.entryViewCache = nil } - return nil + return err } func (f *WebDavFile) Read(p []byte) (readSize int, err error) { glog.V(2).Infof("WebDavFileSystem.Read %v", f.name) - ctx := context.Background() if f.entry == nil { - f.entry, err = filer2.GetEntry(ctx, f.fs, f.name) + f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name)) } if f.entry == nil { return 0, err @@ -478,43 +520,41 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { if err != nil { return 0, err } - if len(f.entry.Chunks) == 0 { + fileSize := int64(filer.FileSize(f.entry)) + if fileSize == 0 { return 0, io.EOF } if f.entryViewCache == nil { - f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks) + f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks) + f.reader = nil } - chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, f.off, len(p)) - - totalRead, err := filer2.ReadIntoBuffer(ctx, f.fs, f.name, p, chunkViews, f.off) - if err != nil { - return 0, err + if f.reader == nil { + chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64) + f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize) } - readSize = int(totalRead) - glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+totalRead) + readSize, err = f.reader.ReadAt(p, f.off) - f.off += totalRead - if readSize == 0 { - return 0, io.EOF + glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize)) + f.off += int64(readSize) + + if err != nil && err != io.EOF { + glog.Errorf("file read %s: %v", f.name, err) } return + } func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) { glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count) - ctx := context.Background() - dir := f.name - if dir != "/" && strings.HasSuffix(dir, "/") { - dir = dir[:len(dir)-1] - } + dir, _ := util.FullPath(f.name).DirAndName() - err = filer2.ReadDirAllEntries(ctx, f.fs, dir, "", func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error { fi := FileInfo{ - size: int64(filer2.TotalSize(entry.GetChunks())), + size: int64(filer.FileSize(entry)), name: entry.Name, mode: os.FileMode(entry.Attributes.FileMode), modifiledTime: time.Unix(entry.Attributes.Mtime, 0), @@ -526,6 +566,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) { } glog.V(4).Infof("entry: %v", fi.name) ret = append(ret, &fi) + return nil }) old := f.off @@ -556,9 +597,9 @@ func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) { var err error switch whence { - case 0: + case io.SeekStart: f.off = 0 - case 2: + case io.SeekEnd: if fi, err := f.fs.stat(ctx, f.name); err != nil { return 0, err } else { |
