aboutsummaryrefslogtreecommitdiff
path: root/weed/server/webdav_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/webdav_server.go')
-rw-r--r--weed/server/webdav_server.go285
1 files changed, 163 insertions, 122 deletions
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index abd0b66eb..c3f68fdee 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -5,21 +5,23 @@ import (
"context"
"fmt"
"io"
+ "math"
"os"
"path"
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/util/buffered_writer"
"golang.org/x/net/webdav"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
- "github.com/spf13/viper"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
)
@@ -31,14 +33,19 @@ type WebDavOption struct {
BucketsPath string
GrpcDialOption grpc.DialOption
Collection string
+ Replication string
+ DiskType string
Uid uint32
Gid uint32
+ Cipher bool
+ CacheDir string
+ CacheSizeMB int64
}
type WebDavServer struct {
option *WebDavOption
secret security.SigningKey
- filer *filer2.Filer
+ filer *filer.Filer
grpcDialOption grpc.DialOption
Handler *webdav.Handler
}
@@ -49,7 +56,7 @@ func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
ws = &WebDavServer{
option: option,
- grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
Handler: &webdav.Handler{
FileSystem: fs,
LockSystem: webdav.NewMemLS(),
@@ -64,8 +71,10 @@ func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
type WebDavFileSystem struct {
option *WebDavOption
secret security.SigningKey
- filer *filer2.Filer
+ filer *filer.Filer
grpcDialOption grpc.DialOption
+ chunkCache *chunk_cache.TieredChunkCache
+ signature int32
}
type FileInfo struct {
@@ -89,23 +98,40 @@ type WebDavFile struct {
isDirectory bool
off int64
entry *filer_pb.Entry
- entryViewCache []filer2.VisibleInterval
+ entryViewCache []filer.VisibleInterval
+ reader io.ReaderAt
+ bufWriter *buffered_writer.BufferedWriteCloser
+ collection string
+ replication string
}
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
+
+ cacheUniqueId := util.Md5String([]byte("webdav" + option.FilerGrpcAddress + util.Version()))[0:8]
+ cacheDir := path.Join(option.CacheDir, cacheUniqueId)
+
+ os.MkdirAll(cacheDir, os.FileMode(0755))
+ chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
return &WebDavFileSystem{
- option: option,
+ option: option,
+ chunkCache: chunkCache,
+ signature: util.RandomInt32(),
}, nil
}
-func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
+var _ = filer_pb.FilerClient(&WebDavFileSystem{})
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
}
+func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
+}
func clearName(name string) (string, error) {
slashed := strings.HasSuffix(name, "/")
@@ -137,8 +163,8 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
return os.ErrExist
}
- return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- dir, name := filer2.FullPath(fullDirPath).DirAndName()
+ return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ dir, name := util.FullPath(fullDirPath).DirAndName()
request := &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: &filer_pb.Entry{
@@ -152,10 +178,11 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
Gid: fs.option.Gid,
},
},
+ Signatures: []int32{fs.signature},
}
glog.V(1).Infof("mkdir: %v", request)
- if _, err := client.CreateEntry(ctx, request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
}
@@ -185,9 +212,9 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
fs.removeAll(ctx, fullFilePath)
}
- dir, name := filer2.FullPath(fullFilePath).DirAndName()
- err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- if _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
+ dir, name := util.FullPath(fullFilePath).DirAndName()
+ err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: &filer_pb.Entry{
Name: name,
@@ -199,10 +226,11 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
Uid: fs.option.Uid,
Gid: fs.option.Gid,
Collection: fs.option.Collection,
- Replication: "000",
+ Replication: fs.option.Replication,
TtlSec: 0,
},
},
+ Signatures: []int32{fs.signature},
}); err != nil {
return fmt.Errorf("create %s: %v", fullFilePath, err)
}
@@ -215,6 +243,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
fs: fs,
name: fullFilePath,
isDirectory: false,
+ bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
}, nil
}
@@ -230,6 +259,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
fs: fs,
name: fullFilePath,
isDirectory: false,
+ bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024),
}, nil
}
@@ -240,34 +270,10 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string)
return err
}
- fi, err := fs.stat(ctx, fullFilePath)
- if err != nil {
- return err
- }
+ dir, name := util.FullPath(fullFilePath).DirAndName()
- if fi.IsDir() {
- //_, err = fs.db.Exec(`delete from filesystem where fullFilePath like $1 escape '\'`, strings.Replace(fullFilePath, `%`, `\%`, -1)+`%`)
- } else {
- //_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath)
- }
- dir, name := filer2.FullPath(fullFilePath).DirAndName()
- err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.DeleteEntryRequest{
- Directory: dir,
- Name: name,
- IsDeleteData: true,
- }
+ return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
- glog.V(3).Infof("removing entry: %v", request)
- _, err := client.DeleteEntry(ctx, request)
- if err != nil {
- return fmt.Errorf("remove %s: %v", fullFilePath, err)
- }
-
- return nil
- })
- return err
}
func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
@@ -307,10 +313,10 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string)
return os.ErrExist
}
- oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName()
- newDir, newBaseName := filer2.FullPath(newName).DirAndName()
+ oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
+ newDir, newBaseName := util.FullPath(newName).DirAndName()
- return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AtomicRenameEntryRequest{
OldDirectory: oldDir,
@@ -335,23 +341,23 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F
return nil, err
}
+ fullpath := util.FullPath(fullFilePath)
+
var fi FileInfo
- entry, err := filer2.GetEntry(ctx, fs, fullFilePath)
+ entry, err := filer_pb.GetEntry(fs, fullpath)
if entry == nil {
return nil, os.ErrNotExist
}
if err != nil {
return nil, err
}
- fi.size = int64(filer2.TotalSize(entry.GetChunks()))
- fi.name = fullFilePath
+ fi.size = int64(filer.FileSize(entry))
+ fi.name = string(fullpath)
fi.mode = os.FileMode(entry.Attributes.FileMode)
fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
fi.isDirectory = entry.IsDirectory
- _, fi.name = path.Split(path.Clean(fi.name))
- if fi.name == "" {
- fi.name = "/"
+ if fi.name == "/" {
fi.modifiledTime = time.Now()
fi.isDirectory = true
}
@@ -365,32 +371,21 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
return fs.stat(ctx, name)
}
-func (f *WebDavFile) Write(buf []byte) (int, error) {
-
- glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
-
- var err error
- ctx := context.Background()
- if f.entry == nil {
- f.entry, err = filer2.GetEntry(ctx, f.fs, f.name)
- }
-
- if f.entry == nil {
- return 0, err
- }
- if err != nil {
- return 0, err
- }
+func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
var fileId, host string
var auth security.EncodedJwt
- if err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx := context.Background()
request := &filer_pb.AssignVolumeRequest{
Count: 1,
- Replication: "000",
+ Replication: f.fs.option.Replication,
Collection: f.fs.option.Collection,
+ DiskType: f.fs.option.DiskType,
+ Path: name,
}
resp, err := client.AssignVolume(ctx, request)
@@ -398,79 +393,126 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
glog.V(0).Infof("assign volume failure %v: %v", request, err)
return err
}
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ f.collection, f.replication = resp.Collection, resp.Replication
return nil
- }); err != nil {
- return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+ }); flushErr != nil {
+ return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- bufReader := bytes.NewReader(buf)
- uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "", nil, auth)
- if err != nil {
- glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
- return 0, fmt.Errorf("upload data: %v", err)
+ uploadResult, flushErr, _ := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, reader, false, "", nil, auth)
+ if flushErr != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
+ return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
}
if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err)
- return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
+ glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
+ return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+ return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
+}
+
+func (f *WebDavFile) Write(buf []byte) (int, error) {
+
+ glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
+
+ dir, _ := util.FullPath(f.name).DirAndName()
+
+ var getErr error
+ ctx := context.Background()
+ if f.entry == nil {
+ f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
}
- chunk := &filer_pb.FileChunk{
- FileId: fileId,
- Offset: f.off,
- Size: uint64(len(buf)),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
+ if f.entry == nil {
+ return 0, getErr
+ }
+ if getErr != nil {
+ return 0, getErr
}
- f.entry.Chunks = append(f.entry.Chunks, chunk)
- dir, _ := filer2.FullPath(f.name).DirAndName()
+ if f.bufWriter.FlushFunc == nil {
+ f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
- err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- f.entry.Attributes.Mtime = time.Now().Unix()
+ var chunk *filer_pb.FileChunk
+ chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
- request := &filer_pb.UpdateEntryRequest{
- Directory: dir,
- Entry: f.entry,
+ if flushErr != nil {
+ return fmt.Errorf("%s upload result: %v", f.name, flushErr)
+ }
+
+ f.entry.Content = nil
+ f.entry.Chunks = append(f.entry.Chunks, chunk)
+
+ return flushErr
}
+ f.bufWriter.CloseFunc = func() error {
+
+ manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
+ if manifestErr != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
+ } else {
+ f.entry.Chunks = manifestedChunks
+ }
- if _, err := client.UpdateEntry(ctx, request); err != nil {
- return fmt.Errorf("update %s: %v", f.name, err)
+ flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ f.entry.Attributes.Mtime = time.Now().Unix()
+ f.entry.Attributes.Collection = f.collection
+ f.entry.Attributes.Replication = f.replication
+
+ request := &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: f.entry,
+ Signatures: []int32{f.fs.signature},
+ }
+
+ if _, err := client.UpdateEntry(ctx, request); err != nil {
+ return fmt.Errorf("update %s: %v", f.name, err)
+ }
+
+ return nil
+ })
+ return flushErr
}
+ }
- return nil
- })
+ written, err := f.bufWriter.Write(buf)
if err == nil {
glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
- f.off += int64(len(buf))
+ f.off += int64(written)
}
- return len(buf), err
+ return written, err
}
func (f *WebDavFile) Close() error {
glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
+ err := f.bufWriter.Close()
+
if f.entry != nil {
f.entry = nil
f.entryViewCache = nil
}
- return nil
+ return err
}
func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
- ctx := context.Background()
if f.entry == nil {
- f.entry, err = filer2.GetEntry(ctx, f.fs, f.name)
+ f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
}
if f.entry == nil {
return 0, err
@@ -478,43 +520,41 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
if err != nil {
return 0, err
}
- if len(f.entry.Chunks) == 0 {
+ fileSize := int64(filer.FileSize(f.entry))
+ if fileSize == 0 {
return 0, io.EOF
}
if f.entryViewCache == nil {
- f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks)
+ f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks)
+ f.reader = nil
}
- chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, f.off, len(p))
-
- totalRead, err := filer2.ReadIntoBuffer(ctx, f.fs, f.name, p, chunkViews, f.off)
- if err != nil {
- return 0, err
+ if f.reader == nil {
+ chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
+ f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
}
- readSize = int(totalRead)
- glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+totalRead)
+ readSize, err = f.reader.ReadAt(p, f.off)
- f.off += totalRead
- if readSize == 0 {
- return 0, io.EOF
+ glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
+ f.off += int64(readSize)
+
+ if err != nil && err != io.EOF {
+ glog.Errorf("file read %s: %v", f.name, err)
}
return
+
}
func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
- ctx := context.Background()
- dir := f.name
- if dir != "/" && strings.HasSuffix(dir, "/") {
- dir = dir[:len(dir)-1]
- }
+ dir, _ := util.FullPath(f.name).DirAndName()
- err = filer2.ReadDirAllEntries(ctx, f.fs, dir, "", func(entry *filer_pb.Entry, isLast bool) {
+ err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
fi := FileInfo{
- size: int64(filer2.TotalSize(entry.GetChunks())),
+ size: int64(filer.FileSize(entry)),
name: entry.Name,
mode: os.FileMode(entry.Attributes.FileMode),
modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
@@ -526,6 +566,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
}
glog.V(4).Infof("entry: %v", fi.name)
ret = append(ret, &fi)
+ return nil
})
old := f.off
@@ -556,9 +597,9 @@ func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
var err error
switch whence {
- case 0:
+ case io.SeekStart:
f.off = 0
- case 2:
+ case io.SeekEnd:
if fi, err := f.fs.stat(ctx, f.name); err != nil {
return 0, err
} else {