aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-10-01 23:23:39 -0700
committerChris Lu <chris.lu@gmail.com>2021-10-01 23:23:39 -0700
commitaf207bbaf05ce441e9a87f0833c09a36c0629e85 (patch)
tree95b2c10e7a7aa9b1218d7d7ba5d71137da1819ec
parentec3351a4ec21553b3d14587f101ed214ccd8fa7c (diff)
downloadseaweedfs-af207bbaf05ce441e9a87f0833c09a36c0629e85.tar.xz
seaweedfs-af207bbaf05ce441e9a87f0833c09a36c0629e85.zip
retry both assign volume and uploading data
fix https://github.com/chrislusf/seaweedfs/issues/2351
-rw-r--r--weed/command/filer_copy.go62
1 files changed, 34 insertions, 28 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 0feae63b3..630f066d6 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -344,9 +344,9 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
return err
}
- // assign a volume
- err = util.Retry("assignVolume", func() error {
- return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ err = util.Retry("upload", func() error {
+ // assign a volume
+ assignErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AssignVolumeRequest{
Count: 1,
@@ -369,35 +369,41 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
}
return nil
})
- })
- if err != nil {
- return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err)
- }
+ if assignErr != nil {
+ return assignErr
+ }
- targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId
- uploadOption := &operation.UploadOption{
- UploadUrl: targetUrl,
- Filename: fileName,
- Cipher: worker.options.cipher,
- IsInputCompressed: false,
- MimeType: mimeType,
- PairMap: nil,
- Jwt: security.EncodedJwt(assignResult.Auth),
- }
- uploadResult, err := operation.UploadData(data, uploadOption)
+ // uplaod data
+ targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId
+ uploadOption := &operation.UploadOption{
+ UploadUrl: targetUrl,
+ Filename: fileName,
+ Cipher: worker.options.cipher,
+ IsInputCompressed: false,
+ MimeType: mimeType,
+ PairMap: nil,
+ Jwt: security.EncodedJwt(assignResult.Auth),
+ }
+ uploadResult, err := operation.UploadData(data, uploadOption)
+ if err != nil {
+ return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
+ }
+ if uploadResult.Error != "" {
+ return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
+ }
+ if *worker.options.verbose {
+ fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
+ }
+
+ fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName)
+ chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
+
+ return nil
+ })
if err != nil {
- return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
- }
- if uploadResult.Error != "" {
- return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
+ return fmt.Errorf("upload %v: %v\n", fileName, err)
}
- if *worker.options.verbose {
- fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
- }
-
- chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
- fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName)
}
if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {