aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-08-20 19:09:15 -0700
committerchrislu <chris.lu@gmail.com>2022-08-20 19:09:15 -0700
commit4081d50607bbc71b35d042d97497d88cc1fc3427 (patch)
treeb3d083b16ee53ac198e2b4f6880d7a8dd31ea5be
parent409f39390d8b38c5f17082956f77bc39b0e6d7b4 (diff)
downloadseaweedfs-4081d50607bbc71b35d042d97497d88cc1fc3427.tar.xz
seaweedfs-4081d50607bbc71b35d042d97497d88cc1fc3427.zip
filer sink: retryable data chunk uploading
-rw-r--r--weed/replication/sink/filersink/fetch_write.go81
1 files changed, 29 insertions, 52 deletions
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index 7b3e22b90..d1a5d7ebd 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -1,7 +1,6 @@
package filersink
import (
- "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util"
"sync"
@@ -12,7 +11,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/security"
)
func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string) (replicatedChunks []*filer_pb.FileChunk, err error) {
@@ -67,62 +65,41 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
}
defer util.CloseResponse(resp)
- var host string
- var auth security.EncodedJwt
-
- if err := fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- return util.Retry("assignVolume", func() error {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: fs.replication,
- Collection: fs.collection,
- TtlSec: fs.ttlSec,
- DataCenter: fs.dataCenter,
- DiskType: fs.diskType,
- Path: path,
+ fileId, uploadResult, err, _ := operation.UploadWithRetry(
+ fs,
+ &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: fs.replication,
+ Collection: fs.collection,
+ TtlSec: fs.ttlSec,
+ DataCenter: fs.dataCenter,
+ DiskType: fs.diskType,
+ Path: path,
+ },
+ &operation.UploadOption{
+ Filename: filename,
+ Cipher: false,
+ IsInputCompressed: "gzip" == header.Get("Content-Encoding"),
+ MimeType: header.Get("Content-Type"),
+ PairMap: nil,
+ },
+ func(host, fileId string) string {
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ if fs.writeChunkByFiler {
+ fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
}
+ glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
+ return fileUrl
+ },
+ resp.Body,
+ )
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
-
- return nil
- })
- }); err != nil {
- return "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
- }
-
- fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- if fs.writeChunkByFiler {
- fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
- }
-
- glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
-
- // fetch data as is, regardless whether it is encrypted or not
- uploadOption := &operation.UploadOption{
- UploadUrl: fileUrl,
- Filename: filename,
- Cipher: false,
- IsInputCompressed: "gzip" == header.Get("Content-Encoding"),
- MimeType: header.Get("Content-Type"),
- PairMap: nil,
- Jwt: auth,
- }
- uploadResult, err, _ := operation.Upload(resp.Body, uploadOption)
if err != nil {
- glog.V(0).Infof("upload source data %v to %s: %v", sourceChunk.GetFileIdString(), fileUrl, err)
+ glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
return "", fmt.Errorf("upload data: %v", err)
}
if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", filename, fileUrl, err)
+ glog.V(0).Infof("upload failure %v: %v", filename, err)
return "", fmt.Errorf("upload result: %v", uploadResult.Error)
}