aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker/broker_append.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/broker/broker_append.go')
-rw-r--r--weed/messaging/broker/broker_append.go45
1 files changed, 26 insertions, 19 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index 8e5b56fd0..40c807164 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -3,6 +3,7 @@ package broker
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/security"
"io"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -10,7 +11,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -53,26 +53,33 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf
// assign a volume location
if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- request := &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: topicConfig.Replication,
- Collection: topicConfig.Collection,
- }
+ assignErr := util.Retry("assignVolume", func() error {
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: topicConfig.Replication,
+ Collection: topicConfig.Collection,
+ }
- resp, err := client.AssignVolume(context.Background(), request)
- if err != nil {
- glog.V(0).Infof("assign volume failure %v: %v", request, err)
- return err
- }
- if resp.Error != "" {
- return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
- }
+ resp, err := client.AssignVolume(context.Background(), request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ assignResult.Auth = security.EncodedJwt(resp.Auth)
+ assignResult.Fid = resp.FileId
+ assignResult.Url = resp.Url
+ assignResult.PublicUrl = resp.PublicUrl
+ assignResult.Count = uint64(resp.Count)
- assignResult.Auth = security.EncodedJwt(resp.Auth)
- assignResult.Fid = resp.FileId
- assignResult.Url = resp.Url
- assignResult.PublicUrl = resp.PublicUrl
- assignResult.Count = uint64(resp.Count)
+ return nil
+ })
+ if assignErr != nil {
+ return assignErr
+ }
return nil
}); err != nil {