diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-10-01 23:23:39 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-10-01 23:23:39 -0700 |
| commit | af207bbaf05ce441e9a87f0833c09a36c0629e85 (patch) | |
| tree | 95b2c10e7a7aa9b1218d7d7ba5d71137da1819ec /weed/command/filer_copy.go | |
| parent | ec3351a4ec21553b3d14587f101ed214ccd8fa7c (diff) | |
| download | seaweedfs-af207bbaf05ce441e9a87f0833c09a36c0629e85.tar.xz seaweedfs-af207bbaf05ce441e9a87f0833c09a36c0629e85.zip | |
retry both assign volume and uploading data
fix https://github.com/chrislusf/seaweedfs/issues/2351
Diffstat (limited to 'weed/command/filer_copy.go')
| -rw-r--r-- | weed/command/filer_copy.go | 62 |
1 files changed, 34 insertions, 28 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 0feae63b3..630f066d6 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -344,9 +344,9 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err return err } - // assign a volume - err = util.Retry("assignVolume", func() error { - return pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + err = util.Retry("upload", func() error { + // assign a volume + assignErr := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, @@ -369,35 +369,41 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err } return nil }) - }) - if err != nil { - return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err) - } + if assignErr != nil { + return assignErr + } - targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId - uploadOption := &operation.UploadOption{ - UploadUrl: targetUrl, - Filename: fileName, - Cipher: worker.options.cipher, - IsInputCompressed: false, - MimeType: mimeType, - PairMap: nil, - Jwt: security.EncodedJwt(assignResult.Auth), - } - uploadResult, err := operation.UploadData(data, uploadOption) + // uplaod data + targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId + uploadOption := &operation.UploadOption{ + UploadUrl: targetUrl, + Filename: fileName, + Cipher: worker.options.cipher, + IsInputCompressed: false, + MimeType: mimeType, + PairMap: nil, + Jwt: security.EncodedJwt(assignResult.Auth), + } + uploadResult, err := operation.UploadData(data, uploadOption) + if err != nil { + return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) + } + if uploadResult.Error != "" { + return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) + } + if *worker.options.verbose { + fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) + } + + fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName) + chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0)) + + return nil + }) if err != nil { - return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err) - } - if uploadResult.Error != "" { - return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error) + return fmt.Errorf("upload %v: %v\n", fileName, err) } - if *worker.options.verbose { - fmt.Printf("uploaded %s to %s\n", fileName, targetUrl) - } - - chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0)) - fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName) } if err := pb.WithGrpcFilerClient(worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { |
