aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-08-20 18:50:57 -0700
committerchrislu <chris.lu@gmail.com>2022-08-20 18:50:57 -0700
commitd49d0a9fc29db0ae3fd667d3f988561ca8e3b635 (patch)
treeb4ee553998e95b2b9600411e7e6ecbacc8313dce
parent689b4ecdcc9dc6edeaf557a6988f520e1fe608ac (diff)
downloadseaweedfs-d49d0a9fc29db0ae3fd667d3f988561ca8e3b635.tar.xz
seaweedfs-d49d0a9fc29db0ae3fd667d3f988561ca8e3b635.zip
filer.copy: retryable upload
-rw-r--r--weed/command/filer_copy.go94
-rw-r--r--weed/operation/upload_content.go49
2 files changed, 95 insertions, 48 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 7bfd484f0..e20087ce4 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -585,59 +585,57 @@ func detectMimeType(f *os.File) string {
func (worker *FileCopyWorker) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, err error) {
- var fileId, host string
- var auth security.EncodedJwt
+ var finalFileId string
+ uploadResult, flushErr, _ := operation.UploadWithRetry(
+ worker,
+ &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ DiskType: *worker.options.diskType,
+ Path: name,
+ },
+ &operation.UploadOption{
+ Filename: name,
+ Cipher: worker.options.cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ },
+ func(host, fileId string) string {
+ finalFileId = fileId
+ return fmt.Sprintf("http://%s/%s", host, fileId)
+ },
+ reader,
+ )
- if flushErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- ctx := context.Background()
-
- assignErr := util.Retry("assignVolume", func() error {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- TtlSec: worker.options.ttlSec,
- DiskType: *worker.options.diskType,
- Path: name,
- }
-
- resp, err := client.AssignVolume(ctx, request)
- if err != nil {
- return fmt.Errorf("assign volume failure %v: %v", request, err)
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
-
- return nil
- })
- if assignErr != nil {
- return assignErr
- }
-
- return nil
- }); flushErr != nil {
- return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
- }
-
- uploadOption := &operation.UploadOption{
- UploadUrl: fmt.Sprintf("http://%s/%s", host, fileId),
- Filename: name,
- Cipher: worker.options.cipher,
- IsInputCompressed: false,
- MimeType: "",
- PairMap: nil,
- Jwt: auth,
- }
- uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
if flushErr != nil {
return nil, fmt.Errorf("upload data: %v", flushErr)
}
if uploadResult.Error != "" {
return nil, fmt.Errorf("upload result: %v", uploadResult.Error)
}
- return uploadResult.ToPbFileChunk(fileId, offset), nil
+ return uploadResult.ToPbFileChunk(finalFileId, offset), nil
+}
+
+var _ = filer_pb.FilerClient(&FileCopyWorker{})
+
+func (worker *FileCopyWorker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
+
+ filerGrpcAddress := worker.filerAddress.ToGrpcAddress()
+ err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, filerGrpcAddress, worker.options.grpcDialOption)
+
+ return
+}
+
+func (worker *FileCopyWorker) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
+}
+
+func (worker *FileCopyWorker) GetDataCenter() string {
+ return ""
}
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index cafba5ce9..76f55523a 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -2,6 +2,7 @@ package operation
import (
"bytes"
+ "context"
"encoding/json"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -28,6 +29,7 @@ type UploadOption struct {
MimeType string
PairMap map[string]string
Jwt security.EncodedJwt
+ RetryForever bool
}
type UploadResult struct {
@@ -76,6 +78,53 @@ func init() {
}}
}
+// UploadWithRetry will retry both assigning volume request and uploading content
+// The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.
+func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (uploadResult *UploadResult, err error, data []byte) {
+ doUploadFunc := func() error {
+
+ var fileId, host string
+ var auth security.EncodedJwt
+
+ // grpc assign volume
+ if grpcAssignErr := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ resp, assignErr := client.AssignVolume(context.Background(), assignRequest)
+ if assignErr != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", assignRequest, assignErr)
+ return assignErr
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", assignRequest, resp.Error)
+ }
+
+ fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
+ loc := resp.Location
+ host = filerClient.AdjustedUrl(loc)
+
+ return nil
+ }); grpcAssignErr != nil {
+ return fmt.Errorf("filerGrpcAddress assign volume: %v", grpcAssignErr)
+ }
+
+ uploadOption.UploadUrl = genFileUrlFn(host, fileId)
+ uploadOption.Jwt = auth
+
+ var uploadErr error
+ uploadResult, uploadErr, data = doUpload(reader, uploadOption)
+ return uploadErr
+ }
+ if uploadOption.RetryForever {
+ util.RetryForever("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) {
+ glog.V(0).Infof("upload content: %v", err)
+ return true
+ })
+ } else {
+ err = util.Retry("uploadWithRetry", doUploadFunc)
+ }
+
+ return
+}
+
var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "")
// Upload sends a POST request to a volume server to upload the content with adjustable compression level