diff options
Diffstat (limited to 'weed/messaging/broker/broker_append.go')
| -rw-r--r-- | weed/messaging/broker/broker_append.go | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go index 9958a0752..9a31a8ac0 100644 --- a/weed/messaging/broker/broker_append.go +++ b/weed/messaging/broker/broker_append.go @@ -24,7 +24,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag dir, name := util.FullPath(targetFile).DirAndName() // append the chunk - if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AppendToEntryRequest{ Directory: dir, @@ -51,7 +51,7 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf var assignResult = &operation.AssignResult{} // assign a volume location - if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { assignErr := util.Retry("assignVolume", func() error { request := &filer_pb.AssignVolumeRequest{ @@ -108,10 +108,10 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf var _ = filer_pb.FilerClient(&MessageBroker{}) -func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) { +func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { for _, filer := range broker.option.Filers { - if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil { + if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil { if err == io.EOF { return } |
