aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging/broker/broker_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging/broker/broker_server.go')
-rw-r--r--weed/messaging/broker/broker_server.go10
1 files changed, 5 insertions, 5 deletions
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
index 193c1c689..acf2d6d34 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -49,7 +49,7 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
for {
for _, filer := range broker.option.Filers {
- broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.KeepConnected(ctx)
@@ -101,15 +101,15 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
}
-func (broker *MessageBroker) withFilerClient(filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(filer, broker.grpcDialOption, fn)
+ return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)
}
-func (broker *MessageBroker) withMasterClient(master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
+func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
- return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
return fn(client)
})