diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-05-07 07:29:26 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-05-07 07:29:26 -0700 |
| commit | 8f8738867f10c98d59b11eaf165e1c6028d7b1d0 (patch) | |
| tree | db57540c45309f15d0efdaed274f84ece94f5a05 /weed/command/filer_copy.go | |
| parent | 007401f3a0b87f0a3d07f16ea731d6eec63d6975 (diff) | |
| download | seaweedfs-8f8738867f10c98d59b11eaf165e1c6028d7b1d0.tar.xz seaweedfs-8f8738867f10c98d59b11eaf165e1c6028d7b1d0.zip | |
add retry to assign volume
fix https://github.com/chrislusf/seaweedfs/issues/2056
Diffstat (limited to 'weed/command/filer_copy.go')
| -rw-r--r-- | weed/command/filer_copy.go | 76 |
1 files changed, 40 insertions, 36 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index ad3500915..dc729ed33 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -308,25 +308,27 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err } // assign a volume - err = pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: *worker.options.replication, - Collection: *worker.options.collection, - TtlSec: worker.options.ttlSec, - DiskType: *worker.options.diskType, - Path: task.destinationUrlPath, - } + err = util.Retry("assignVolume", func() error { + return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - assignResult, assignError = client.AssignVolume(context.Background(), request) - if assignError != nil { - return fmt.Errorf("assign volume failure %v: %v", request, assignError) - } - if assignResult.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) - } - return nil + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: worker.options.ttlSec, + DiskType: *worker.options.diskType, + Path: task.destinationUrlPath, + } + + assignResult, assignError = client.AssignVolume(context.Background(), request) + if assignError != nil { + return fmt.Errorf("assign volume failure %v: %v", request, assignError) + } + if assignResult.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) + } + return nil + }) }) if err != nil { return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err) @@ -404,24 +406,26 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File, // assign a volume var assignResult *filer_pb.AssignVolumeResponse var assignError error - err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { - request := &filer_pb.AssignVolumeRequest{ - Count: 1, - Replication: *worker.options.replication, - Collection: *worker.options.collection, - TtlSec: worker.options.ttlSec, - DiskType: *worker.options.diskType, - Path: task.destinationUrlPath + fileName, - } - - assignResult, assignError = client.AssignVolume(context.Background(), request) - if assignError != nil { - return fmt.Errorf("assign volume failure %v: %v", request, assignError) - } - if assignResult.Error != "" { - return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) - } - return nil + err := util.Retry("assignVolume", func() error { + return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + request := &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: *worker.options.replication, + Collection: *worker.options.collection, + TtlSec: worker.options.ttlSec, + DiskType: *worker.options.diskType, + Path: task.destinationUrlPath + fileName, + } + + assignResult, assignError = client.AssignVolume(context.Background(), request) + if assignError != nil { + return fmt.Errorf("assign volume failure %v: %v", request, assignError) + } + if assignResult.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error) + } + return nil + }) }) if err != nil { fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err) |
