aboutsummaryrefslogtreecommitdiff
path: root/weed/messaging
diff options
context:
space:
mode:
Diffstat (limited to 'weed/messaging')
-rw-r--r--weed/messaging/broker/broker_append.go8
-rw-r--r--weed/messaging/broker/broker_grpc_server_discovery.go6
-rw-r--r--weed/messaging/broker/broker_server.go10
3 files changed, 12 insertions, 12 deletions
diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go
index 9958a0752..9a31a8ac0 100644
--- a/weed/messaging/broker/broker_append.go
+++ b/weed/messaging/broker/broker_append.go
@@ -24,7 +24,7 @@ func (broker *MessageBroker) appendToFile(targetFile string, topicConfig *messag
dir, name := util.FullPath(targetFile).DirAndName()
// append the chunk
- if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AppendToEntryRequest{
Directory: dir,
@@ -51,7 +51,7 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf
var assignResult = &operation.AssignResult{}
// assign a volume location
- if err := broker.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
assignErr := util.Retry("assignVolume", func() error {
request := &filer_pb.AssignVolumeRequest{
@@ -108,10 +108,10 @@ func (broker *MessageBroker) assignAndUpload(topicConfig *messaging_pb.TopicConf
var _ = filer_pb.FilerClient(&MessageBroker{})
-func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) (err error) {
+func (broker *MessageBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) (err error) {
for _, filer := range broker.option.Filers {
- if err = pb.WithFilerClient(filer, broker.grpcDialOption, fn); err != nil {
+ if err = pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn); err != nil {
if err == io.EOF {
return
}
diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go
index 66821d404..5cd8edd33 100644
--- a/weed/messaging/broker/broker_grpc_server_discovery.go
+++ b/weed/messaging/broker/broker_grpc_server_discovery.go
@@ -34,7 +34,7 @@ func (broker *MessageBroker) FindBroker(c context.Context, request *messaging_pb
targetTopicPartition := fmt.Sprintf(TopicPartitionFmt, request.Namespace, request.Topic, request.Parition)
for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.LocateBroker(context.Background(), &filer_pb.LocateBrokerRequest{
Resource: targetTopicPartition,
})
@@ -68,7 +68,7 @@ func (broker *MessageBroker) checkFilers() {
found := false
for !found {
for _, filer := range broker.option.Filers {
- err := broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
@@ -93,7 +93,7 @@ func (broker *MessageBroker) checkFilers() {
found = false
for !found {
for _, master := range masters {
- err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error {
+ err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
diff --git a/weed/messaging/broker/broker_server.go b/weed/messaging/broker/broker_server.go
index 193c1c689..acf2d6d34 100644
--- a/weed/messaging/broker/broker_server.go
+++ b/weed/messaging/broker/broker_server.go
@@ -49,7 +49,7 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
for {
for _, filer := range broker.option.Filers {
- broker.withFilerClient(filer, func(client filer_pb.SeaweedFilerClient) error {
+ broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.KeepConnected(ctx)
@@ -101,15 +101,15 @@ func (broker *MessageBroker) keepConnectedToOneFiler() {
}
-func (broker *MessageBroker) withFilerClient(filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (broker *MessageBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(filer, broker.grpcDialOption, fn)
+ return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)
}
-func (broker *MessageBroker) withMasterClient(master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
+func (broker *MessageBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
- return pb.WithMasterClient(master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error {
return fn(client)
})