Diffstat (limited to 'weed/filesys')
-rw-r--r--  weed/filesys/dirty_page.go             | 50
-rw-r--r--  weed/filesys/file.go                   |  2
-rw-r--r--  weed/filesys/filehandle.go             | 13
-rw-r--r--  weed/filesys/meta_cache/meta_cache.go  | 23
-rw-r--r--  weed/filesys/wfs_write.go              | 66
5 files changed, 103 insertions(+), 51 deletions(-)
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 45224b3e7..46d20e466 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -2,16 +2,12 @@ package filesys
import (
"bytes"
- "context"
- "fmt"
"io"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
)
type ContinuousDirtyPages struct {
@@ -141,53 +137,15 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi
func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) {
- var fileId, host string
- var auth security.EncodedJwt
-
dir, _ := pages.f.fullpath().DirAndName()
- if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: pages.f.wfs.option.Replication,
- Collection: pages.f.wfs.option.Collection,
- TtlSec: pages.f.wfs.option.TtlSec,
- DataCenter: pages.f.wfs.option.DataCenter,
- ParentPath: dir,
- }
-
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
- host = pages.f.wfs.AdjustedUrl(host)
- pages.collection, pages.replication = resp.Collection, resp.Replication
-
- return nil
- }); err != nil {
- return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
- }
-
- fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth)
+ chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset)
if err != nil {
- glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
- return nil, fmt.Errorf("upload data: %v", err)
- }
- if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
- return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
+ return nil, err
}
- pages.f.wfs.chunkCache.SetChunk(fileId, data)
+ pages.collection, pages.replication = collection, replication
- return uploadResult.ToPbFileChunk(fileId, offset), nil
+ return chunk, nil
}
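The block removed above is not gone; it moves into wfs_write.go (shown at the end of this diff) behind a closure factory, so the same assign-and-upload path can also back filer2.MaybeManifestize in the flush path. A minimal sketch of the shape saveToStorage now programs against, assuming filer2.SaveDataAsChunkFunctionType aliases the closure signature returned by wfs.saveDataAsChunk; the alias name comes from this diff, the sketch itself is illustrative:

package filesys

import (
	"io"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

// sketch only: mirrors the closure returned by wfs.saveDataAsChunk below
type saveDataAsChunkFn func(reader io.Reader, filename string, offset int64) (
	chunk *filer_pb.FileChunk, collection string, replication string, err error)

Dirty pages build the closure once per parent directory and hand it each buffered range, which is exactly what the one-line replacement above does.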
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 4a6bc9a8a..dcda93522 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -253,7 +253,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
func (file *File) setEntry(entry *filer_pb.Entry) {
file.entry = entry
- file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks)
+ file.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(file.wfs), file.entry.Chunks)
file.reader = nil
}
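NonOverlappingVisibleIntervals now takes a fileId-lookup function because the chunk list may contain manifest chunks that must be fetched and expanded before visibility can be computed, so the call can now fail. setEntry drops the error and leaves the view cache to be rebuilt lazily, while readFromChunks below surfaces it. A hedged sketch of that caller-side contract; the filer2 names and error text follow this diff, the wrapper and the VisibleInterval element type are assumptions:

package filesys

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/filer2"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

func buildViewCache(wfs *WFS, entry *filer_pb.Entry) ([]filer2.VisibleInterval, error) {
	// LookupFn(wfs) resolves a chunk fileId to a readable location so
	// manifest chunks can be expanded while computing visible intervals
	views, err := filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(wfs), entry.Chunks)
	if err != nil {
		// a manifest chunk could not be fetched; leave the cache nil so a
		// later call retries (setEntry), or surface it (readFromChunks)
		return nil, fmt.Errorf("fail to resolve chunk manifest: %v", err)
	}
	return views, nil
}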
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 9b9df916c..31fd08f97 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -88,8 +88,12 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
return 0, nil
}
+ var chunkResolveErr error
if fh.f.entryViewCache == nil {
- fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
+ fh.f.entryViewCache, chunkResolveErr = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+ if chunkResolveErr != nil {
+ return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
+ }
fh.f.reader = nil
}
@@ -206,7 +210,12 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
}
- chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks)
+ chunks, garbages := filer2.CompactFileChunks(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+ chunks, manifestErr := filer2.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.dir.FullPath()), chunks)
+ if manifestErr != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
+ }
fh.f.entry.Chunks = chunks
// fh.f.entryViewCache = nil
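The flush path now has a fixed ordering: compact away fully-overwritten chunks, then optionally fold a long chunk list into manifest chunks, reusing the same saveDataAsChunk closure as the data path. A manifestize failure is only logged because, assuming MaybeManifestize hands back a usable flat list on error, the entry stays correct, merely with more chunk entries than ideal. A sketch of that ordering; the filer2 function names follow this diff, while the wrapper and the LookupFileIdFunctionType parameter type are assumptions:

package filesys

import (
	"github.com/chrislusf/seaweedfs/weed/filer2"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

func prepareChunksForFlush(lookup filer2.LookupFileIdFunctionType,
	save filer2.SaveDataAsChunkFunctionType, in []*filer_pb.FileChunk) []*filer_pb.FileChunk {
	// 1. drop chunk ranges that later writes fully overwrote
	chunks, _ := filer2.CompactFileChunks(lookup, in)
	// 2. best effort: collapse a long chunk list into manifest chunks,
	//    written through the same upload closure as ordinary data
	chunks, err := filer2.MaybeManifestize(save, chunks)
	if err != nil {
		glog.V(0).Infof("MaybeManifestize: %v", err)
	}
	return chunks
}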
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index fdb486ba4..edf329143 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -8,10 +8,14 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
)
+// need to have logic similar to FilerStoreWrapper
+// e.g. fill fileId field for chunks
+
type MetaCache struct {
actualStore filer2.FilerStore
sync.RWMutex
@@ -46,6 +50,7 @@ func openMetaStore(dbFolder string) filer2.FilerStore {
func (mc *MetaCache) InsertEntry(ctx context.Context, entry *filer2.Entry) error {
mc.Lock()
defer mc.Unlock()
+ filer_pb.BeforeEntrySerialization(entry.Chunks)
return mc.actualStore.InsertEntry(ctx, entry)
}
@@ -78,13 +83,19 @@ func (mc *MetaCache) AtomicUpdateEntry(ctx context.Context, oldPath util.FullPat
func (mc *MetaCache) UpdateEntry(ctx context.Context, entry *filer2.Entry) error {
mc.Lock()
defer mc.Unlock()
+ filer_pb.BeforeEntrySerialization(entry.Chunks)
return mc.actualStore.UpdateEntry(ctx, entry)
}
func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *filer2.Entry, err error) {
mc.RLock()
defer mc.RUnlock()
- return mc.actualStore.FindEntry(ctx, fp)
+ entry, err = mc.actualStore.FindEntry(ctx, fp)
+ if err != nil {
+ return nil, err
+ }
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ return
}
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
@@ -96,7 +107,15 @@ func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err err
func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int) ([]*filer2.Entry, error) {
mc.RLock()
defer mc.RUnlock()
- return mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+
+ entries, err := mc.actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit)
+ if err != nil {
+ return nil, err
+ }
+ for _, entry := range entries {
+ filer_pb.AfterEntryDeserialization(entry.Chunks)
+ }
+ return entries, err
}
func (mc *MetaCache) Shutdown() {
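BeforeEntrySerialization and AfterEntryDeserialization mirror what FilerStoreWrapper does on the filer side, per the comment at the top of this file: on the way into the local store the string fileId is folded into its structured form, and on the way out it is re-derived. A hedged sketch of the deserialization direction; the FileChunk.Fid proto fields are real, but the helper body and the string format are assumptions based on the usual volumeId,key+cookie layout:

package meta_cache

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

// sketch of the "fill fileId field for chunks" direction referenced above
func afterEntryDeserialization(chunks []*filer_pb.FileChunk) {
	for _, chunk := range chunks {
		if chunk.FileId == "" && chunk.Fid != nil {
			// rebuild "volumeId,fileKeyHex+cookieHex" from the structured fid
			chunk.FileId = fmt.Sprintf("%d,%x%08x",
				chunk.Fid.VolumeId, chunk.Fid.FileKey, chunk.Fid.Cookie)
		}
	}
}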
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
new file mode 100644
index 000000000..786d0b42a
--- /dev/null
+++ b/weed/filesys/wfs_write.go
@@ -0,0 +1,66 @@
+package filesys
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+)
+
+func (wfs *WFS) saveDataAsChunk(dir string) filer2.SaveDataAsChunkFunctionType {
+
+ return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
+ var fileId, host string
+ var auth security.EncodedJwt
+
+ if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: wfs.option.Replication,
+ Collection: wfs.option.Collection,
+ TtlSec: wfs.option.TtlSec,
+ DataCenter: wfs.option.DataCenter,
+ ParentPath: dir,
+ }
+
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ host = wfs.AdjustedUrl(host)
+ collection, replication = resp.Collection, resp.Replication
+
+ return nil
+ }); err != nil {
+ return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+ }
+
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
+ if err != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
+ return nil, "", "", fmt.Errorf("upload data: %v", err)
+ }
+ if uploadResult.Error != "" {
+ glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
+ return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+
+ wfs.chunkCache.SetChunk(fileId, data)
+
+ chunk = uploadResult.ToPbFileChunk(fileId, offset)
+ return chunk, "", "", nil
+ }
+}
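A usage sketch tying the pieces together: callers build the closure once per parent directory (AssignVolume honors ParentPath) and invoke it per buffered range; collection and replication echo the AssignVolume response so the caller can record them, as ContinuousDirtyPages does above. The wrapper below is illustrative only:

package filesys

import (
	"bytes"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

func flushOnePage(wfs *WFS, dir, filename string, offset int64, data []byte) (*filer_pb.FileChunk, error) {
	save := wfs.saveDataAsChunk(dir)
	chunk, collection, replication, err := save(bytes.NewReader(data), filename, offset)
	if err != nil {
		return nil, err
	}
	_, _ = collection, replication // recorded by callers such as ContinuousDirtyPages
	return chunk, nil
}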