aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-08-20 22:38:15 -0700
committerchrislu <chris.lu@gmail.com>2022-08-20 22:38:15 -0700
commit973f6dd1627996176bec3996372cfa48ed5a5240 (patch)
tree196b7304ae82d6ecf1d8ca1ed37351ea1ddb7b22
parent65ff7198feb0725346e3c1dcf7eaab7c34140bc0 (diff)
downloadseaweedfs-973f6dd1627996176bec3996372cfa48ed5a5240.tar.xz
seaweedfs-973f6dd1627996176bec3996372cfa48ed5a5240.zip
refactoring
-rw-r--r--weed/command/filer_copy.go76
1 files changed, 20 insertions, 56 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 5fbddc07e..977e2a6b8 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -331,8 +331,6 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
var mimeType string
var chunks []*filer_pb.FileChunk
- var assignResult *filer_pb.AssignVolumeResponse
- var assignError error
if task.fileMode&os.ModeDir == 0 && task.fileSize > 0 {
@@ -342,66 +340,32 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
return err
}
- err = util.Retry("upload", func() error {
- // assign a volume
- assignErr := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- TtlSec: worker.options.ttlSec,
- DiskType: *worker.options.diskType,
- Path: task.destinationUrlPath,
- }
-
- assignResult, assignError = client.AssignVolume(context.Background(), request)
- if assignError != nil {
- return fmt.Errorf("assign volume failure %v: %v", request, assignError)
- }
- if assignResult.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
- }
- if assignResult.Location.Url == "" {
- return fmt.Errorf("assign volume failure %v: %v", request, assignResult)
- }
- return nil
- })
- if assignErr != nil {
- return assignErr
- }
-
- // upload data
- targetUrl := "http://" + assignResult.Location.Url + "/" + assignResult.FileId
- uploadOption := &operation.UploadOption{
- UploadUrl: targetUrl,
+ finalFileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
+ worker,
+ &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ DiskType: *worker.options.diskType,
+ Path: task.destinationUrlPath,
+ },
+ &operation.UploadOption{
Filename: fileName,
Cipher: worker.options.cipher,
IsInputCompressed: false,
MimeType: mimeType,
PairMap: nil,
- Jwt: security.EncodedJwt(assignResult.Auth),
- }
- uploadResult, err := operation.UploadData(data, uploadOption)
- if err != nil {
- return fmt.Errorf("upload data %v to %s: %v\n", fileName, targetUrl, err)
- }
- if uploadResult.Error != "" {
- return fmt.Errorf("upload %v to %s result: %v\n", fileName, targetUrl, uploadResult.Error)
- }
- if *worker.options.verbose {
- fmt.Printf("uploaded %s to %s\n", fileName, targetUrl)
- }
-
- fmt.Printf("copied %s => http://%s%s%s\n", f.Name(), worker.filerAddress.ToHttpAddress(), task.destinationUrlPath, fileName)
- chunks = append(chunks, uploadResult.ToPbFileChunk(assignResult.FileId, 0))
-
- return nil
- })
- if err != nil {
- return fmt.Errorf("upload %v: %v\n", fileName, err)
+ },
+ func(host, fileId string) string {
+ return fmt.Sprintf("http://%s/%s", host, fileId)
+ },
+ util.NewBytesReader(data),
+ )
+ if flushErr != nil {
+ return flushErr
}
-
+ chunks = append(chunks, uploadResult.ToPbFileChunk(finalFileId, 0))
}
if err := pb.WithGrpcFilerClient(false, worker.filerAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {