aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-05-07 07:29:26 -0700
committerChris Lu <chris.lu@gmail.com>2021-05-07 07:29:26 -0700
commit8f8738867f10c98d59b11eaf165e1c6028d7b1d0 (patch)
treedb57540c45309f15d0efdaed274f84ece94f5a05
parent007401f3a0b87f0a3d07f16ea731d6eec63d6975 (diff)
downloadseaweedfs-8f8738867f10c98d59b11eaf165e1c6028d7b1d0.tar.xz
seaweedfs-8f8738867f10c98d59b11eaf165e1c6028d7b1d0.zip
add retry to assign volume
fix https://github.com/chrislusf/seaweedfs/issues/2056
-rw-r--r--weed/command/filer_copy.go76
-rw-r--r--weed/filesys/wfs_write.go53
-rw-r--r--weed/messaging/broker/broker_append.go45
-rw-r--r--weed/replication/sink/filersink/fetch_write.go45
-rw-r--r--weed/server/webdav_server.go41
5 files changed, 140 insertions, 120 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index ad3500915..dc729ed33 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -308,25 +308,27 @@ func (worker *FileCopyWorker) uploadFileAsOne(task FileCopyTask, f *os.File) err
}
// assign a volume
- err = pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- TtlSec: worker.options.ttlSec,
- DiskType: *worker.options.diskType,
- Path: task.destinationUrlPath,
- }
+ err = util.Retry("assignVolume", func() error {
+ return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- assignResult, assignError = client.AssignVolume(context.Background(), request)
- if assignError != nil {
- return fmt.Errorf("assign volume failure %v: %v", request, assignError)
- }
- if assignResult.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
- }
- return nil
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ DiskType: *worker.options.diskType,
+ Path: task.destinationUrlPath,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
+ })
})
if err != nil {
return fmt.Errorf("Failed to assign from %v: %v\n", worker.options.masters, err)
@@ -404,24 +406,26 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
// assign a volume
var assignResult *filer_pb.AssignVolumeResponse
var assignError error
- err := pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: *worker.options.replication,
- Collection: *worker.options.collection,
- TtlSec: worker.options.ttlSec,
- DiskType: *worker.options.diskType,
- Path: task.destinationUrlPath + fileName,
- }
-
- assignResult, assignError = client.AssignVolume(context.Background(), request)
- if assignError != nil {
- return fmt.Errorf("assign volume failure %v: %v", request, assignError)
- }
- if assignResult.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
- }
- return nil
+ err := util.Retry("assignVolume", func() error {
+ return pb.WithGrpcFilerClient(worker.filerGrpcAddress, worker.options.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: *worker.options.replication,
+ Collection: *worker.options.collection,
+ TtlSec: worker.options.ttlSec,
+ DiskType: *worker.options.diskType,
+ Path: task.destinationUrlPath + fileName,
+ }
+
+ assignResult, assignError = client.AssignVolume(context.Background(), request)
+ if assignError != nil {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignError)
+ }
+ if assignResult.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, assignResult.Error)
+ }
+ return nil
+ })
})
if err != nil {
fmt.Printf("Failed to assign from %v: %v\n", worker.options.masters, err)
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
index 9d2ce26ec..730578202 100644
--- a/weed/filesys/wfs_write.go
+++ b/weed/filesys/wfs_write.go
@@ -20,35 +20,36 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.Sa
var auth security.EncodedJwt
if err := wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return util.Retry("assignVolume", func() error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: wfs.option.Replication,
+ Collection: wfs.option.Collection,
+ TtlSec: wfs.option.TtlSec,
+ DiskType: string(wfs.option.DiskType),
+ DataCenter: wfs.option.DataCenter,
+ Path: string(fullPath),
+ }
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: wfs.option.Replication,
- Collection: wfs.option.Collection,
- TtlSec: wfs.option.TtlSec,
- DiskType: string(wfs.option.DiskType),
- DataCenter: wfs.option.DataCenter,
- Path: string(fullPath),
- }
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
+ fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
+ loc := &filer_pb.Location{
+ Url: resp.Url,
+ PublicUrl: resp.PublicUrl,
+ }
+ host = wfs.AdjustedUrl(loc)
+ collection, replication = resp.Collection, resp.Replication
- fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
- loc := &filer_pb.Location{
- Url: resp.Url,
- PublicUrl: resp.PublicUrl,
- }
- host = wfs.AdjustedUrl(loc)
- collection, replication = resp.Collection, resp.Replication
-
- return nil
+ return nil
+ })
}); err != nil {
return nil, "", "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
}
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index 8e5b56fd0..40c807164 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -3,6 +3,7 @@ package broker
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
"io"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -10,7 +11,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -53,26 +53,33 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf
// assign a volume location
if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: topicConfig.Replication,
- Collection: topicConfig.Collection,
- }
+ assignErr := util.Retry("assignVolume", func() error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: topicConfig.Replication,
+ Collection: topicConfig.Collection,
+ }
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ assignResult.Auth = security.EncodedJwt(resp.Auth)
+ assignResult.Fid = resp.FileId
+ assignResult.Url = resp.Url
+ assignResult.PublicUrl = resp.PublicUrl
+ assignResult.Count = uint64(resp.Count)
- assignResult.Auth = security.EncodedJwt(resp.Auth)
- assignResult.Fid = resp.FileId
- assignResult.Url = resp.Url
- assignResult.PublicUrl = resp.PublicUrl
- assignResult.Count = uint64(resp.Count)
+ return nil
+ })
+ if assignErr != nil {
+ return assignErr
+ }
return nil
}); err != nil {
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index a7392d856..b5ea3e2cb 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -71,29 +71,30 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string)
var auth security.EncodedJwt
if err := fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return util.Retry("assignVolume", func() error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: fs.replication,
+ Collection: fs.collection,
+ TtlSec: fs.ttlSec,
+ DataCenter: fs.dataCenter,
+ DiskType: fs.diskType,
+ Path: path,
+ }
+
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: fs.replication,
- Collection: fs.collection,
- TtlSec: fs.ttlSec,
- DataCenter: fs.dataCenter,
- DiskType: fs.diskType,
- Path: path,
- }
-
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
-
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
-
- return nil
+ return nil
+ })
}); err != nil {
return "", fmt.Errorf("filerGrpcAddress assign volume: %v", err)
}
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index c3f68fdee..c6550a36f 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -380,25 +380,32 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
ctx := context.Background()
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: f.fs.option.Replication,
- Collection: f.fs.option.Collection,
- DiskType: f.fs.option.DiskType,
- Path: name,
- }
+ assignErr := util.Retry("assignVolume", func() error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: f.fs.option.Replication,
+ Collection: f.fs.option.Collection,
+ DiskType: f.fs.option.DiskType,
+ Path: name,
+ }
- resp, err := client.AssignVolume(ctx, request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
+ resp, err := client.AssignVolume(ctx, request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ f.collection, f.replication = resp.Collection, resp.Replication
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
- f.collection, f.replication = resp.Collection, resp.Replication
+ return nil
+ })
+ if assignErr != nil {
+ return assignErr
+ }
return nil
}); flushErr != nil {