aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/broker_grpc_pub.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/broker_grpc_pub.go')
-rw-r--r--weed/mq/broker/broker_grpc_pub.go34
1 files changed, 19 insertions, 15 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 17d01f620..8c46ea99d 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/peer"
"io"
@@ -59,6 +60,21 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
return stream.Send(response)
}
ackInterval = int(initMessage.AckInterval)
+ for _, follower := range initMessage.FollowerBrokers {
+ followErr := b.withBrokerClient(false, pb.ServerAddress(follower), func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.PublishFollowMe(context.Background(), &mq_pb.PublishFollowMeRequest{
+ Topic: initMessage.Topic,
+ Partition: initMessage.Partition,
+ BrokerSelf: string(b.option.BrokerAddress()),
+ })
+ return err
+ })
+ if followErr != nil {
+ response.Error = fmt.Sprintf("follower %v failed: %v", follower, followErr)
+ glog.Errorf("follower %v failed: %v", follower, followErr)
+ return stream.Send(response)
+ }
+ }
stream.Send(response)
} else {
response.Error = fmt.Sprintf("missing init message")
@@ -86,21 +102,9 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
glog.V(0).Infof("topic %v partition %v published %d messges.", initMessage.Topic, initMessage.Partition, ackSequence)
}()
go func() {
- for {
- select {
- case resp := <-respChan:
- if resp != nil {
- if err := stream.Send(resp); err != nil {
- glog.Errorf("Error sending response %v: %v", resp, err)
- }
- } else {
- return
- }
- case <-localTopicPartition.StopPublishersCh:
- respChan <- &mq_pb.PublishMessageResponse{
- AckSequence: ackSequence,
- ShouldClose: true,
- }
+ for resp := range respChan {
+ if err := stream.Send(resp); err != nil {
+ glog.Errorf("Error sending response %v: %v", resp, err)
}
}
}()