aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/client/sub_client/connect_to_sub_coordinator.go15
1 files changed, 8 insertions, 7 deletions
diff --git a/weed/mq/client/sub_client/connect_to_sub_coordinator.go b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
index e7749f94b..fa75f87fe 100644
--- a/weed/mq/client/sub_client/connect_to_sub_coordinator.go
+++ b/weed/mq/client/sub_client/connect_to_sub_coordinator.go
@@ -108,15 +108,15 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
Namespace: sub.ContentConfig.Namespace,
Name: sub.ContentConfig.Topic,
},
- Partition: &mq_pb.Partition{
- RingSize: partition.RingSize,
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
+ PartitionOffset: &mq_pb.PartitionOffset{
+ Partition: &mq_pb.Partition{
+ RingSize: partition.RingSize,
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ },
+ TsNs: sub.alreadyProcessedTsNs,
},
Filter: sub.ContentConfig.Filter,
- Offset: &mq_pb.SubscribeMessageRequest_InitMessage_StartTimestampNs{
- StartTimestampNs: sub.alreadyProcessedTsNs,
- },
},
},
})
@@ -148,6 +148,7 @@ func (sub *TopicSubscriber) onEachPartition(partition *mq_pb.Partition, broker s
return fmt.Errorf("subscribe recv: %v", err)
}
if resp.Message == nil {
+ glog.V(0).Infof("subscriber %s/%s/%s received nil message", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup)
continue
}
switch m := resp.Message.(type) {