aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dirty_page.go50
-rw-r--r--weed/filesys/file.go2
-rw-r--r--weed/filesys/filehandle.go13
-rw-r--r--weed/filesys/wfs_write.go66
4 files changed, 82 insertions, 49 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 45224b3e7..46d20e466 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -2,16 +2,12 @@ package filesys
import (
"bytes"
- "context"
- "fmt"
"io"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
)
type ContinuousDirtyPages struct {
@@ -141,53 +137,15 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi
func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) {
- var fileId, host string
- var auth security.EncodedJwt
-
dir, _ := pages.f.fullpath().DirAndName()
- if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: pages.f.wfs.option.Replication,
- Collection: pages.f.wfs.option.Collection,
- TtlSec: pages.f.wfs.option.TtlSec,
- DataCenter: pages.f.wfs.option.DataCenter,
- ParentPath: dir,
- }
-
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
- host = pages.f.wfs.AdjustedUrl(host)
- pages.collection, pages.replication = resp.Collection, resp.Replication
-
- return nil
- }); err != nil {
- return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
- }
-
- fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth)
+ chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset)
if err != nil {
- glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
- return nil, fmt.Errorf("upload data: %v", err)
- }
- if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
- return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
+ return nil, err
}
- pages.f.wfs.chunkCache.SetChunk(fileId, data)
+ pages.collection, pages.replication = collection, replication
- return uploadResult.ToPbFileChunk(fileId, offset), nil
+ return chunk, nil
}
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 4a6bc9a8a..dcda93522 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -253,7 +253,7 @@ func (file *File) addChunks(chunks []*filer_pb.FileChunk) {
func (file *File) setEntry(entry *filer_pb.Entry) {
file.entry = entry
- file.entryViewCache = filer2.NonOverlappingVisibleIntervals(file.entry.Chunks)
+ file.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(file.wfs), file.entry.Chunks)
file.reader = nil
}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 9b9df916c..31fd08f97 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -88,8 +88,12 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
return 0, nil
}
+ var chunkResolveErr error
if fh.f.entryViewCache == nil {
- fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
+ fh.f.entryViewCache, chunkResolveErr = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+ if chunkResolveErr != nil {
+ return 0, fmt.Errorf("fail to resolve chunk manifest: %v", chunkResolveErr)
+ }
fh.f.reader = nil
}
@@ -206,7 +210,12 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
glog.V(3).Infof("%s chunks %d: %v [%d,%d)", fh.f.fullpath(), i, chunk.FileId, chunk.Offset, chunk.Offset+int64(chunk.Size))
}
- chunks, garbages := filer2.CompactFileChunks(fh.f.entry.Chunks)
+ chunks, garbages := filer2.CompactFileChunks(filer2.LookupFn(fh.f.wfs), fh.f.entry.Chunks)
+ chunks, manifestErr := filer2.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.dir.FullPath()), chunks)
+ if manifestErr != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
+ }
fh.f.entry.Chunks = chunks
// fh.f.entryViewCache = nil
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
new file mode 100644
index 000000000..786d0b42a
--- /dev/null
+++ b/weed/filesys/wfs_write.go
@@ -0,0 +1,66 @@
+package filesys
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+)
+
+func (wfs *WFS) saveDataAsChunk(dir string) filer2.SaveDataAsChunkFunctionType {
+
+ return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
+ var fileId, host string
+ var auth security.EncodedJwt
+
+ if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: wfs.option.Replication,
+ Collection: wfs.option.Collection,
+ TtlSec: wfs.option.TtlSec,
+ DataCenter: wfs.option.DataCenter,
+ ParentPath: dir,
+ }
+
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ host = wfs.AdjustedUrl(host)
+ collection, replication = resp.Collection, resp.Replication
+
+ return nil
+ }); err != nil {
+ return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+ }
+
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ uploadResult, err, data := operation.Upload(fileUrl, filename, wfs.option.Cipher, reader, false, "", nil, auth)
+ if err != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", filename, fileUrl, err)
+ return nil, "", "", fmt.Errorf("upload data: %v", err)
+ }
+ if uploadResult.Error != "" {
+ glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
+ return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+
+ wfs.chunkCache.SetChunk(fileId, data)
+
+ chunk = uploadResult.ToPbFileChunk(fileId, offset)
+ return chunk, "", "", nil
+ }
+}