aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/mq/broker/broker_grpc_pub_follow.go10
1 files changed, 6 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index 8ef85110a..f0c4a26ca 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -23,7 +23,7 @@ func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.P
Message: &mq_pb.FollowInMemoryMessagesRequest_Init{
Init: &mq_pb.FollowInMemoryMessagesRequest_InitMessage{
ConsumerGroup: string(b.option.BrokerAddress()),
- ConsumerId: fmt.Sprintf("followMe-%d", followerId),
+ ConsumerId: fmt.Sprintf("followMe@%s-%d", b.option.BrokerAddress(), followerId),
FollowerId: followerId,
Topic: request.Topic,
PartitionOffset: &mq_pb.PartitionOffset{
@@ -52,7 +52,7 @@ func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.P
}
wg.Done()
- b.doFollowInMemoryMessage(context.Background(), subscribeClient)
+ b.doFollowInMemoryMessage(context.Background(), followerId, subscribeClient)
return nil
})
@@ -60,7 +60,7 @@ func (b *MessageQueueBroker) PublishFollowMe(c context.Context, request *mq_pb.P
return &mq_pb.PublishFollowMeResponse{}, ret
}
-func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) {
+func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, followerId int32, client mq_pb.SeaweedMessaging_FollowInMemoryMessagesClient) {
for {
resp, err := client.Recv()
if err != nil {
@@ -87,9 +87,11 @@ func (b *MessageQueueBroker) doFollowInMemoryMessage(c context.Context, client m
}
if m.Ctrl.FollowerChangedToId != 0 {
// follower changed
- glog.V(0).Infof("doFollowInMemoryMessage follower changed to %d", m.Ctrl.FollowerChangedToId)
+ glog.V(0).Infof("doFollowInMemoryMessage follower changed from %d to %d", followerId, m.Ctrl.FollowerChangedToId)
return
}
+ default:
+ glog.V(0).Infof("doFollowInMemoryMessage unknown message type: %v", m)
}
}
}