diff options
Diffstat (limited to 'weed/messaging/broker')
| -rw-r--r-- | weed/messaging/broker/broker_append.go | 8 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_grpc_server_discovery.go | 6 | ||||
| -rw-r--r-- | weed/messaging/broker/broker_server.go | 10 |
3 files changed, 12 insertions, 12 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go index 9958a0752..9a31a8ac0 100644 --- a/weed/messaging/broker/broker_append.go +++ b/weed/messaging/broker/broker_append.go @@ -24,7 +24,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag dir, name := util.FullPath(targetFile).DirAndName() // append the chunk - if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AppendToEntryRequest{ Directory: dir, @@ -51,7 +51,7 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf var assignResult = &operation.AssignResult{} // assign a volume location - if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { assignErr := util.Retry("assignVolume", func() error { request := &filer_pb.AssignVolumeRequest{ @@ -108,10 +108,10 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf var _ = filer_pb.FilerClient(&MessageBroker{}) -func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) { +func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) { for _, filer := range broker.option.Filers { - if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil { + if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil { if err == io.EOF { return } diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go index 66821d404..5cd8edd33 100644 --- a/weed/messaging/broker/broker_grpc_server_discovery.go +++ b/weed/messaging/broker/broker_grpc_server_discovery.go @@ -34,7 +34,7 @@ func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition) for _, filer := range broker.option.Filers { - err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{ Resource: targetTopicPartition, }) @@ -68,7 +68,7 @@ func (broker *MessageBroker) checkFilers() { found := false for !found { for _, filer := range broker.option.Filers { - err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) if err != nil { return err @@ -93,7 +93,7 @@ func (broker *MessageBroker) checkFilers() { found = false for !found { for _, master := range masters { - err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { + err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.FilerType, }) diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go index 193c1c689..acf2d6d34 100644 --- a/weed/messaging/broker/broker_server.go +++ b/weed/messaging/broker/broker_server.go @@ -49,7 +49,7 @@ func (broker *MessageBroker) keepConnectedToOneFiler() { for { for _, filer := range broker.option.Filers { - broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error { + broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() stream, err := client.KeepConnected(ctx) @@ -101,15 +101,15 @@ func (broker *MessageBroker) keepConnectedToOneFiler() { } -func (broker *MessageBroker) withFilerClient(filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { +func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { - return pb.WithFilerClient(filer, broker.grpcDialOption, fn) + return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn) } -func (broker *MessageBroker) withMasterClient(master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { +func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { - return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { return fn(client) }) |
