aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker/broker_append.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/broker/broker_append.go')
-rw-r--r--weed/messaging/broker/broker_append.go8
1 files changed, 4 insertions, 4 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index 9958a0752..9a31a8ac0 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -24,7 +24,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag
dir, name := util.FullPath(targetFile).DirAndName()
// append the chunk
- if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AppendToEntryRequest{
Directory: dir,
@@ -51,7 +51,7 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf
var assignResult = &operation.AssignResult{}
// assign a volume location
- if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
assignErr := util.Retry("assignVolume", func() error {
request := &filer_pb.AssignVolumeRequest{
@@ -108,10 +108,10 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf
var _ = filer_pb.FilerClient(&MessageBroker{})
-func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
+func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
for _, filer := range broker.option.Filers {
- if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
+ if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil {
if err == io.EOF {
return
}