aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_grpc_sub_follow.go6
-rw-r--r--weed/mq/client/sub_client/on_each_partition.go4
2 files changed, 8 insertions, 2 deletions
diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go
index eb041dc60..351b904c2 100644
--- a/weed/mq/broker/broker_grpc_sub_follow.go
+++ b/weed/mq/broker/broker_grpc_sub_follow.go
@@ -53,9 +53,11 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
- err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset)
+ if lastOffset > 0 {
+ err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset)
+ }
- glog.V(0).Infof("shut down follower for %v", initMessage)
+ glog.V(0).Infof("shut down follower for %v offset %d", initMessage, lastOffset)
return err
}
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
index 33db81143..14e427713 100644
--- a/weed/mq/client/sub_client/on_each_partition.go
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -93,6 +93,10 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
}
switch m := resp.Message.(type) {
case *mq_pb.SubscribeMessageResponse_Data:
+ if m.Data.Ctrl != nil {
+ glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.IsClose)
+ continue
+ }
executors.Execute(func() {
processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
if processErr == nil {