aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-11 23:03:55 -0800
committerchrislu <chris.lu@gmail.com>2024-01-11 23:03:55 -0800
commitf750a5e03b2dce678203d4cbabaccbe3b1cc8049 (patch)
tree64acc53dcd2762779a4e2b713ee9f2d3e75a883f
parent45994641e91a901d5ab826b393b09abcce3c0d81 (diff)
downloadseaweedfs-f750a5e03b2dce678203d4cbabaccbe3b1cc8049.tar.xz
seaweedfs-f750a5e03b2dce678203d4cbabaccbe3b1cc8049.zip
passing timestamp
-rw-r--r--weed/mq/sub_coordinator/consumer_group.go6
-rw-r--r--weed/mq/sub_coordinator/partition_list.go4
2 files changed, 4 insertions, 6 deletions
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
index 9b62f616e..a1279c204 100644
--- a/weed/mq/sub_coordinator/consumer_group.go
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -71,8 +71,6 @@ func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartit
func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) {
glog.V(0).Infof("rebalance consumer group %s due to %s", cg.topic.String(), reason)
- now := time.Now().UnixNano()
-
// collect current topic partitions
partitionSlotToBrokerList := knownPartitionSlotToBrokerList
if partitionSlotToBrokerList == nil {
@@ -104,7 +102,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr
if !found {
partitionSlots = make([]*PartitionSlotToConsumerInstance, 0)
}
- consumerGroupInstance.Partitions = ToPartitions(partitionSlotToBrokerList.RingSize, partitionSlots, now)
+ consumerGroupInstance.Partitions = ToPartitions(partitionSlotToBrokerList.RingSize, partitionSlots)
assignedPartitions := make([]*mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition, len(partitionSlots))
for i, partitionSlot := range partitionSlots {
assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{
@@ -112,7 +110,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr
RangeStop: partitionSlot.RangeStop,
RangeStart: partitionSlot.RangeStart,
RingSize: partitionSlotToBrokerList.RingSize,
- UnixTimeNs: now,
+ UnixTimeNs: partitionSlot.UnixTimeNs,
},
Broker: partitionSlot.Broker,
}
diff --git a/weed/mq/sub_coordinator/partition_list.go b/weed/mq/sub_coordinator/partition_list.go
index 7f02253f6..fa0e3761f 100644
--- a/weed/mq/sub_coordinator/partition_list.go
+++ b/weed/mq/sub_coordinator/partition_list.go
@@ -23,10 +23,10 @@ func NewPartitionSlotToConsumerInstanceList(ringSize int32, version int64) *Part
}
}
-func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance, unixTimeNs int64) []*topic.Partition {
+func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance) []*topic.Partition {
partitions := make([]*topic.Partition, 0, len(slots))
for _, slot := range slots {
- partitions = append(partitions, topic.NewPartition(slot.RangeStart, slot.RangeStop, ringSize, unixTimeNs))
+ partitions = append(partitions, topic.NewPartition(slot.RangeStart, slot.RangeStop, ringSize, slot.UnixTimeNs))
}
return partitions
}