aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-08-20 18:59:57 -0700
committerchrislu <chris.lu@gmail.com>2022-08-20 18:59:57 -0700
commit6c8822f269c1dbddcc80383137417b5f071652b0 (patch)
treeb6fdd10c0c89b34237042ee48abf0036b5d57920
parenta3553da7f7ce7ecf5e7c5082026218d50b1f7230 (diff)
downloadseaweedfs-6c8822f269c1dbddcc80383137417b5f071652b0.tar.xz
seaweedfs-6c8822f269c1dbddcc80383137417b5f071652b0.zip
filer.copy: retryable file part upload
-rw-r--r--weed/command/filer_copy.go69
1 files changed, 26 insertions, 43 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index a196d25f3..5fbddc07e 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -454,58 +454,41 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
wg.Done()
<-concurrentChunks
}()
- // assign a volume
- var assignResult *filer_pb.AssignVolumeResponse
- var assignError error
- err := util.Retry("assignVolume", func() error {
- return pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- TtlSec: worker.options.ttlSec,
- DiskType: *worker.options.diskType,
- Path: task.destinationUrlPath + fileName,
- }
-
- assignResult, assignError = client.AssignVolume(context.Background(), request)
- if assignError != nil {
- return fmt.Errorf("assign volume failure %v: %v", request, assignError)
- }
- if assignResult.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
- }
- return nil
- })
- })
- if err != nil {
- uploadError = fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err)
- return
- }
- targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId
+ fileId, uploadResult, err, _ := operation.UploadWithRetry(
+ worker,
+ &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ DiskType: *worker.options.diskType,
+ Path: task.destinationUrlPath + fileName,
+ },
+ &operation.UploadOption{
+ Filename: fileName + "-" + strconv.FormatInt(i+1, 10),
+ Cipher: worker.options.cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ },
+ func(host, fileId string) string {
+ return fmt.Sprintf("http://%s/%s", host, fileId)
+ },
+ io.NewSectionReader(f, i*chunkSize, chunkSize),
+ )
- uploadOption := &operation.UploadOption{
- UploadUrl: targetUrl,
- Filename: fileName + "-" + strconv.FormatInt(i+1, 10),
- Cipher: worker.options.cipher,
- IsInputCompressed: false,
- MimeType: "",
- PairMap: nil,
- Jwt: security.EncodedJwt(assignResult.Auth),
- }
- uploadResult, err, _ := operation.Upload(io.NewSectionReader(f, i*chunkSize, chunkSize), uploadOption)
if err != nil {
- uploadError = fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
+ uploadError = fmt.Errorf("upload data %v: %v\n", fileName, err)
return
}
if uploadResult.Error != "" {
- uploadError = fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
+ uploadError = fmt.Errorf("upload %v result: %v\n", fileName, uploadResult.Error)
return
}
- chunksChan <- uploadResult.ToPbFileChunk(assignResult.FileId, i*chunkSize)
+ chunksChan <- uploadResult.ToPbFileChunk(fileId, i*chunkSize)
- fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
+ fmt.Printf("uploaded %s-%d [%d,%d)\n", fileName, i+1, i*chunkSize, i*chunkSize+int64(uploadResult.Size))
}(i)
}
wg.Wait()