diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-05 15:35:19 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-05 15:35:19 -0800 |
| commit | 496fc8fbbfe85547e4b6a46f153076a27fd4e9aa (patch) | |
| tree | 1547ea7dba83b42852039228f15d24a7a8a1275a /weed | |
| parent | e8611ed85dd64ee2491a6beed3dd835499f7dc74 (diff) | |
| download | seaweedfs-496fc8fbbfe85547e4b6a46f153076a27fd4e9aa.tar.xz seaweedfs-496fc8fbbfe85547e4b6a46f153076a27fd4e9aa.zip | |
refactor
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/mq/client/sub_client/connect_to_sub_coordinator.go | 15 |
1 files changed, 8 insertions, 7 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go index e7749f94b..fa75f87fe 100644 --- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go +++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go @@ -108,15 +108,15 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s Namespace: sub.ContentConfig.Namespace, Name: sub.ContentConfig.Topic, }, - Partition: &mq_pb.Partition{ - RingSize: partition.RingSize, - RangeStart: partition.RangeStart, - RangeStop: partition.RangeStop, + PartitionOffset: &mq_pb.PartitionOffset{ + Partition: &mq_pb.Partition{ + RingSize: partition.RingSize, + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + }, + TsNs: sub.alreadyProcessedTsNs, }, Filter: sub.ContentConfig.Filter, - Offset: &mq_pb.SubscribeMessageRequest_InitMessage_StartTimestampNs{ - StartTimestampNs: sub.alreadyProcessedTsNs, - }, }, }, }) @@ -148,6 +148,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s return fmt.Errorf("subscribe recv: %v", err) } if resp.Message == nil { + glog.V(0).Infof("subscriber %s/%s/%s received nil message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup) continue } switch m := resp.Message.(type) { |
