aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/upload_content.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation/upload_content.go')
-rw-r--r--weed/operation/upload_content.go49
1 files changed, 49 insertions, 0 deletions
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index cafba5ce9..76f55523a 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -2,6 +2,7 @@ package operation
import (
"bytes"
+ "context"
"encoding/json"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -28,6 +29,7 @@ type UploadOption struct {
MimeType string
PairMap map[string]string
Jwt security.EncodedJwt
+ RetryForever bool
}
type UploadResult struct {
@@ -76,6 +78,53 @@ func init() {
}}
}
+// UploadWithRetry will retry both assigning volume request and uploading content
+// The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.
+func UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (uploadResult *UploadResult, err error, data []byte) {
+ doUploadFunc := func() error {
+
+ var fileId, host string
+ var auth security.EncodedJwt
+
+ // grpc assign volume
+ if grpcAssignErr := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ resp, assignErr := client.AssignVolume(context.Background(), assignRequest)
+ if assignErr != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", assignRequest, assignErr)
+ return assignErr
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", assignRequest, resp.Error)
+ }
+
+ fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
+ loc := resp.Location
+ host = filerClient.AdjustedUrl(loc)
+
+ return nil
+ }); grpcAssignErr != nil {
+ return fmt.Errorf("filerGrpcAddress assign volume: %v", grpcAssignErr)
+ }
+
+ uploadOption.UploadUrl = genFileUrlFn(host, fileId)
+ uploadOption.Jwt = auth
+
+ var uploadErr error
+ uploadResult, uploadErr, data = doUpload(reader, uploadOption)
+ return uploadErr
+ }
+ if uploadOption.RetryForever {
+ util.RetryForever("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) {
+ glog.V(0).Infof("upload content: %v", err)
+ return true
+ })
+ } else {
+ err = util.Retry("uploadWithRetry", doUploadFunc)
+ }
+
+ return
+}
+
var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\n", "")
// Upload sends a POST request to a volume server to upload the content with adjustable compression level