aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-05 15:35:19 -0800
committerchrislu <chris.lu@gmail.com>2024-01-05 15:35:19 -0800
commit496fc8fbbfe85547e4b6a46f153076a27fd4e9aa (patch)
tree1547ea7dba83b42852039228f15d24a7a8a1275a
parente8611ed85dd64ee2491a6beed3dd835499f7dc74 (diff)
downloadseaweedfs-496fc8fbbfe85547e4b6a46f153076a27fd4e9aa.tar.xz
seaweedfs-496fc8fbbfe85547e4b6a46f153076a27fd4e9aa.zip
refactor
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go15
1 files changed, 8 insertions, 7 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index e7749f94b..fa75f87fe 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -108,15 +108,15 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
Namespace: sub.ContentConfig.Namespace,
Name: sub.ContentConfig.Topic,
},
- Partition: &mq_pb.Partition{
- RingSize: partition.RingSize,
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
+ PartitionOffset: &mq_pb.PartitionOffset{
+ Partition: &mq_pb.Partition{
+ RingSize: partition.RingSize,
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ },
+ TsNs: sub.alreadyProcessedTsNs,
},
Filter: sub.ContentConfig.Filter,
- Offset: &mq_pb.SubscribeMessageRequest_InitMessage_StartTimestampNs{
- StartTimestampNs: sub.alreadyProcessedTsNs,
- },
},
},
})
@@ -148,6 +148,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
return fmt.Errorf("subscribe recv: %v", err)
}
if resp.Message == nil {
+ glog.V(0).Infof("subscriber %s/%s/%s received nil message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
continue
}
switch m := resp.Message.(type) {