diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dirty_page.go | 50 | ||||
| -rw-r--r-- | weed/filesys/file.go | 2 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 13 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache.go | 23 | ||||
| -rw-r--r-- | weed/filesys/wfs_write.go | 66 |
5 files changed, 103 insertions, 51 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 45224b3e7..46d20e466 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -2,16 +2,12 @@ package filesys import ( "bytes" - "context" - "fmt" "io" "sync" "time" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/security" ) type ContinuousDirtyPages struct { @@ -141,53 +137,15 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) { - var fileId, host string - var auth security.EncodedJwt - dir, _ := pages.f.fullpath().DirAndName() - if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: pages.f.wfs.option.Replication, - Collection: pages.f.wfs.option.Collection, - TtlSec: pages.f.wfs.option.TtlSec, - DataCenter: pages.f.wfs.option.DataCenter, - ParentPath: dir, - } - - resp, err := client.AssignVolume(context.Background(), request) - if err != nil { - glog.V(0).Infof("assign volume failure %v: %v", request, err) - return err - } - if resp.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) - } - - fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) - host = pages.f.wfs.AdjustedUrl(host) - pages.collection, pages.replication = resp.Collection, resp.Replication - - return nil - }); err != nil { - return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err) - } - - fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth) + chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset) if err != nil { - glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) - return nil, fmt.Errorf("upload data: %v", err) - } - if uploadResult.Error != "" { - glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err) - return nil, fmt.Errorf("upload result: %v", uploadResult.Error) + return nil, err } - pages.f.wfs.chunkCache.SetChunk(fileId, data) + pages.collection, pages.replication = collection, replication - return uploadResult.ToPbFileChunk(fileId, offset), nil + return chunk, nil } diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 4a6bc9a8a..dcda93522 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -253,7 +253,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) { func (file *File) setEntry(entry *filer_pb.Entry) { file.entry = entry - file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks) + file.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(file.wfs), file.entry.Chunks) file.reader = nil } diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 9b9df916c..31fd08f97 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -88,8 +88,12 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) { return 0, nil } + var chunkResolveErr error if fh.f.entryViewCache == nil { - fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks) + fh.f.entryViewCache, chunkResolveErr = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks) + if chunkResolveErr != nil { + return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr) + } fh.f.reader = nil } @@ -206,7 +210,12 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size)) } - chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks) + chunks, garbages := filer2.CompactFileChunks(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks) + chunks, manifestErr := filer2.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.dir.FullPath()), chunks) + if manifestErr != nil { + // not good, but should be ok + glog.V(0).Infof("MaybeManifestize: %v", manifestErr) + } fh.f.entry.Chunks = chunks // fh.f.entryViewCache = nil diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go index fdb486ba4..edf329143 100644 --- a/weed/filesys/meta_cache/meta_cache.go +++ b/weed/filesys/meta_cache/meta_cache.go @@ -8,10 +8,14 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/bounded_tree" ) +// need to have logic similar to FilerStoreWrapper +// e.g. fill fileId field for chunks + type MetaCache struct { actualStore filer2.FilerStore sync.RWMutex @@ -46,6 +50,7 @@ func openMetaStore(dbFolder string) filer2.FilerStore { func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error { mc.Lock() defer mc.Unlock() + filer_pb.BeforeEntrySerialization(entry.Chunks) return mc.actualStore.InsertEntry(ctx, entry) } @@ -78,13 +83,19 @@ func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPat func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer2.Entry) error { mc.Lock() defer mc.Unlock() + filer_pb.BeforeEntrySerialization(entry.Chunks) return mc.actualStore.UpdateEntry(ctx, entry) } func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer2.Entry, err error) { mc.RLock() defer mc.RUnlock() - return mc.actualStore.FindEntry(ctx, fp) + entry, err = mc.actualStore.FindEntry(ctx, fp) + if err != nil { + return nil, err + } + filer_pb.AfterEntryDeserialization(entry.Chunks) + return } func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) { @@ -96,7 +107,15 @@ func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err err func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer2.Entry, error) { mc.RLock() defer mc.RUnlock() - return mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) + + entries, err := mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) + if err != nil { + return nil, err + } + for _, entry := range entries { + filer_pb.AfterEntryDeserialization(entry.Chunks) + } + return entries, err } func (mc *MetaCache) Shutdown() { diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go new file mode 100644 index 000000000..786d0b42a --- /dev/null +++ b/weed/filesys/wfs_write.go @@ -0,0 +1,66 @@ +package filesys + +import ( + "context" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/security" +) + +func (wfs *WFS) saveDataAsChunk(dir string) filer2.SaveDataAsChunkFunctionType { + + return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) { + var fileId, host string + var auth security.EncodedJwt + + if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: wfs.option.Replication, + Collection: wfs.option.Collection, + TtlSec: wfs.option.TtlSec, + DataCenter: wfs.option.DataCenter, + ParentPath: dir, + } + + resp, err := client.AssignVolume(context.Background(), request) + if err != nil { + glog.V(0).Infof("assign volume failure %v: %v", request, err) + return err + } + if resp.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) + } + + fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + host = wfs.AdjustedUrl(host) + collection, replication = resp.Collection, resp.Replication + + return nil + }); err != nil { + return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err) + } + + fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth) + if err != nil { + glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err) + return nil, "", "", fmt.Errorf("upload data: %v", err) + } + if uploadResult.Error != "" { + glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err) + return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error) + } + + wfs.chunkCache.SetChunk(fileId, data) + + chunk = uploadResult.ToPbFileChunk(fileId, offset) + return chunk, "", "", nil + } +} |
