diff options
| author | chrislu <chris.lu@gmail.com> | 2024-05-29 23:34:39 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-05-29 23:34:39 -0700 |
| commit | af19256dbf95e91476bf182203ca5df34fd25ab3 (patch) | |
| tree | 5b9f4067327f0164c9495727b953aa07edda6517 | |
| parent | ad2b87813062f3227ff7d537a10306bbcfc6a7d4 (diff) | |
| download | seaweedfs-af19256dbf95e91476bf182203ca5df34fd25ab3.tar.xz seaweedfs-af19256dbf95e91476bf182203ca5df34fd25ab3.zip | |
skip control messages
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub_follow.go | 6 | ||||
| -rw-r--r-- | weed/mq/client/sub_client/on_each_partition.go | 4 |
2 files changed, 8 insertions, 2 deletions
diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go index eb041dc60..351b904c2 100644 --- a/weed/mq/broker/broker_grpc_sub_follow.go +++ b/weed/mq/broker/broker_grpc_sub_follow.go @@ -53,9 +53,11 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition) - err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset) + if lastOffset > 0 { + err = b.saveConsumerGroupOffset(t, p, initMessage.ConsumerGroup, lastOffset) + } - glog.V(0).Infof("shut down follower for %v", initMessage) + glog.V(0).Infof("shut down follower for %v offset %d", initMessage, lastOffset) return err } diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index 33db81143..14e427713 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -93,6 +93,10 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig } switch m := resp.Message.(type) { case *mq_pb.SubscribeMessageResponse_Data: + if m.Data.Ctrl != nil { + glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.IsClose) + continue + } executors.Execute(func() { processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) if processErr == nil { |
