diff options
| author | chrislu <chris.lu@gmail.com> | 2024-03-16 17:11:18 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-03-16 17:11:18 -0700 |
| commit | 3f2dd8cc3eca4a7782d0857e5c2b2ca41ee4b94f (patch) | |
| tree | 58f5dd980f85c912a91ed5404ab546244d80a9bd | |
| parent | b74e8082bac408138be99e128b8c28fd19eca7a6 (diff) | |
| download | seaweedfs-3f2dd8cc3eca4a7782d0857e5c2b2ca41ee4b94f.tar.xz seaweedfs-3f2dd8cc3eca4a7782d0857e5c2b2ca41ee4b94f.zip | |
add follower id info
| -rw-r--r-- | weed/mq/broker/broker_grpc_pub_follow.go | 10 |
1 files changed, 6 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go index 8ef85110a..f0c4a26ca 100644 --- a/weed/mq/broker/broker_grpc_pub_follow.go +++ b/weed/mq/broker/broker_grpc_pub_follow.go @@ -23,7 +23,7 @@ func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.P Message: &mq_pb.FollowInMemoryMessagesRequest_Init{ Init: &mq_pb.FollowInMemoryMessagesRequest_InitMessage{ ConsumerGroup: string(b.option.BrokerAddress()), - ConsumerId: fmt.Sprintf("followMe-%d", followerId), + ConsumerId: fmt.Sprintf("followMe@%s-%d", b.option.BrokerAddress(), followerId), FollowerId: followerId, Topic: request.Topic, PartitionOffset: &mq_pb.PartitionOffset{ @@ -52,7 +52,7 @@ func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.P } wg.Done() - b.doFollowInMemoryMessage(context.Background(), subscribeClient) + b.doFollowInMemoryMessage(context.Background(), followerId, subscribeClient) return nil }) @@ -60,7 +60,7 @@ func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.P return &mq_pb.PublishFollowMeResponse{}, ret } -func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) { +func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, followerId int32, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) { for { resp, err := client.Recv() if err != nil { @@ -87,9 +87,11 @@ func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client m } if m.Ctrl.FollowerChangedToId != 0 { // follower changed - glog.V(0).Infof("doFollowInMemoryMessage follower changed to %d", m.Ctrl.FollowerChangedToId) + glog.V(0).Infof("doFollowInMemoryMessage follower changed from %d to %d", followerId, m.Ctrl.FollowerChangedToId) return } + default: + glog.V(0).Infof("doFollowInMemoryMessage unknown message type: %v", m) } } } |
