aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_copy.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-05-07 07:29:26 -0700
committerChris Lu <chris.lu@gmail.com>2021-05-07 07:29:26 -0700
commit8f8738867f10c98d59b11eaf165e1c6028d7b1d0 (patch)
treedb57540c45309f15d0efdaed274f84ece94f5a05 /weed/command/filer_copy.go
parent007401f3a0b87f0a3d07f16ea731d6eec63d6975 (diff)
downloadseaweedfs-8f8738867f10c98d59b11eaf165e1c6028d7b1d0.tar.xz
seaweedfs-8f8738867f10c98d59b11eaf165e1c6028d7b1d0.zip
add retry to assign volume
fix https://github.com/chrislusf/seaweedfs/issues/2056
Diffstat (limited to 'weed/command/filer_copy.go')
-rw-r--r--weed/command/filer_copy.go76
1 files changed, 40 insertions, 36 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index ad3500915..dc729ed33 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -308,25 +308,27 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
}
// assign a volume
- err = pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- TtlSec: worker.options.ttlSec,
- DiskType: *worker.options.diskType,
- Path: task.destinationUrlPath,
- }
+ err = util.Retry("assignVolume", func() error {
+ return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- assignResult, assignError = client.AssignVolume(context.Background(), request)
- if assignError != nil {
- return fmt.Errorf("assign volume failure %v: %v", request, assignError)
- }
- if assignResult.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
- }
- return nil
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ DiskType: *worker.options.diskType,
+ Path: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
+ })
})
if err != nil {
return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err)
@@ -404,24 +406,26 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
// assign a volume
var assignResult *filer_pb.AssignVolumeResponse
var assignError error
- err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- TtlSec: worker.options.ttlSec,
- DiskType: *worker.options.diskType,
- Path: task.destinationUrlPath + fileName,
- }
-
- assignResult, assignError = client.AssignVolume(context.Background(), request)
- if assignError != nil {
- return fmt.Errorf("assign volume failure %v: %v", request, assignError)
- }
- if assignResult.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
- }
- return nil
+ err := util.Retry("assignVolume", func() error {
+ return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ DiskType: *worker.options.diskType,
+ Path: task.destinationUrlPath + fileName,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
+ })
})
if err != nil {
fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)