aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys/dirty_page.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys/dirty_page.go')
-rw-r--r--weed/filesys/dirty_page.go50
1 files changed, 4 insertions, 46 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 45224b3e7..46d20e466 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -2,16 +2,12 @@ package filesys
import (
"bytes"
- "context"
- "fmt"
"io"
"sync"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
)
type ContinuousDirtyPages struct {
@@ -141,53 +137,15 @@ func (pages *ContinuousDirtyPages) saveExistingLargestPageToStorage() (chunk *fi
func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, size int64) (*filer_pb.FileChunk, error) {
- var fileId, host string
- var auth security.EncodedJwt
-
dir, _ := pages.f.fullpath().DirAndName()
- if err := pages.f.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: pages.f.wfs.option.Replication,
- Collection: pages.f.wfs.option.Collection,
- TtlSec: pages.f.wfs.option.TtlSec,
- DataCenter: pages.f.wfs.option.DataCenter,
- ParentPath: dir,
- }
-
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
- host = pages.f.wfs.AdjustedUrl(host)
- pages.collection, pages.replication = resp.Collection, resp.Replication
-
- return nil
- }); err != nil {
- return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
- }
-
- fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- uploadResult, err, data := operation.Upload(fileUrl, pages.f.Name, pages.f.wfs.option.Cipher, reader, false, "", nil, auth)
+ chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(dir)(reader, pages.f.Name, offset)
if err != nil {
- glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
- return nil, fmt.Errorf("upload data: %v", err)
- }
- if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", pages.f.Name, fileUrl, err)
- return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
+ return nil, err
}
- pages.f.wfs.chunkCache.SetChunk(fileId, data)
+ pages.collection, pages.replication = collection, replication
- return uploadResult.ToPbFileChunk(fileId, offset), nil
+ return chunk, nil
}