aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/sink/filersink/fetch_write.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/replication/sink/filersink/fetch_write.go')
-rw-r--r--weed/replication/sink/filersink/fetch_write.go45
1 files changed, 23 insertions, 22 deletions
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index a7392d856..b5ea3e2cb 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -71,29 +71,30 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
var auth security.EncodedJwt
if err := fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return util.Retry("assignVolume", func() error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: fs.replication,
+ Collection: fs.collection,
+ TtlSec: fs.ttlSec,
+ DataCenter: fs.dataCenter,
+ DiskType: fs.diskType,
+ Path: path,
+ }
+
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: fs.replication,
- Collection: fs.collection,
- TtlSec: fs.ttlSec,
- DataCenter: fs.dataCenter,
- DiskType: fs.diskType,
- Path: path,
- }
-
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
-
- return nil
+ return nil
+ })
}); err != nil {
return "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
}