diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_pub.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub.go | 34 |
1 files changed, 19 insertions, 15 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go index 17d01f620..8c46ea99d 100644 --- a/weed/mq/broker/broker_grpc_pub.go +++ b/weed/mq/broker/broker_grpc_pub.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "google.golang.org/grpc/peer" "io" @@ -59,6 +60,21 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis return stream.Send(response) } ackInterval = int(initMessage.AckInterval) + for _, follower := range initMessage.FollowerBrokers { + followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error { + _, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{ + Topic: initMessage.Topic, + Partition: initMessage.Partition, + BrokerSelf: string(b.option.BrokerAddress()), + }) + return err + }) + if followErr != nil { + response.Error = fmt.Sprintf("follower %v failed: %v", follower, followErr) + glog.Errorf("follower %v failed: %v", follower, followErr) + return stream.Send(response) + } + } stream.Send(response) } else { response.Error = fmt.Sprintf("missing init message") @@ -86,21 +102,9 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis glog.V(0).Infof("topic %v partition %v published %d messges.", initMessage.Topic, initMessage.Partition, ackSequence) }() go func() { - for { - select { - case resp := <-respChan: - if resp != nil { - if err := stream.Send(resp); err != nil { - glog.Errorf("Error sending response %v: %v", resp, err) - } - } else { - return - } - case <-localTopicPartition.StopPublishersCh: - respChan <- &mq_pb.PublishMessageResponse{ - AckSequence: ackSequence, - ShouldClose: true, - } + for resp := range respChan { + if err := stream.Send(resp); err != nil { + glog.Errorf("Error sending response %v: %v", resp, err) } } }() |
