diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-11 23:03:55 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-11 23:03:55 -0800 |
| commit | f750a5e03b2dce678203d4cbabaccbe3b1cc8049 (patch) | |
| tree | 64acc53dcd2762779a4e2b713ee9f2d3e75a883f | |
| parent | 45994641e91a901d5ab826b393b09abcce3c0d81 (diff) | |
| download | seaweedfs-f750a5e03b2dce678203d4cbabaccbe3b1cc8049.tar.xz seaweedfs-f750a5e03b2dce678203d4cbabaccbe3b1cc8049.zip | |
passing timestamp
| -rw-r--r-- | weed/mq/sub_coordinator/consumer_group.go | 6 | ||||
| -rw-r--r-- | weed/mq/sub_coordinator/partition_list.go | 4 |
2 files changed, 4 insertions, 6 deletions
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go index 9b62f616e..a1279c204 100644 --- a/weed/mq/sub_coordinator/consumer_group.go +++ b/weed/mq/sub_coordinator/consumer_group.go @@ -71,8 +71,6 @@ func (cg *ConsumerGroup) OnPartitionListChange(assignments []*mq_pb.BrokerPartit func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBrokerList *pub_balancer.PartitionSlotToBrokerList, reason string) { glog.V(0).Infof("rebalance consumer group %s due to %s", cg.topic.String(), reason) - now := time.Now().UnixNano() - // collect current topic partitions partitionSlotToBrokerList := knownPartitionSlotToBrokerList if partitionSlotToBrokerList == nil { @@ -104,7 +102,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr if !found { partitionSlots = make([]*PartitionSlotToConsumerInstance, 0) } - consumerGroupInstance.Partitions = ToPartitions(partitionSlotToBrokerList.RingSize, partitionSlots, now) + consumerGroupInstance.Partitions = ToPartitions(partitionSlotToBrokerList.RingSize, partitionSlots) assignedPartitions := make([]*mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition, len(partitionSlots)) for i, partitionSlot := range partitionSlots { assignedPartitions[i] = &mq_pb.SubscriberToSubCoordinatorResponse_AssignedPartition{ @@ -112,7 +110,7 @@ func (cg *ConsumerGroup) RebalanceConsumberGroupInstances(knownPartitionSlotToBr RangeStop: partitionSlot.RangeStop, RangeStart: partitionSlot.RangeStart, RingSize: partitionSlotToBrokerList.RingSize, - UnixTimeNs: now, + UnixTimeNs: partitionSlot.UnixTimeNs, }, Broker: partitionSlot.Broker, } diff --git a/weed/mq/sub_coordinator/partition_list.go b/weed/mq/sub_coordinator/partition_list.go index 7f02253f6..fa0e3761f 100644 --- a/weed/mq/sub_coordinator/partition_list.go +++ b/weed/mq/sub_coordinator/partition_list.go @@ -23,10 +23,10 @@ func NewPartitionSlotToConsumerInstanceList(ringSize int32, version int64) *Part } } -func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance, unixTimeNs int64) []*topic.Partition { +func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance) []*topic.Partition { partitions := make([]*topic.Partition, 0, len(slots)) for _, slot := range slots { - partitions = append(partitions, topic.NewPartition(slot.RangeStart, slot.RangeStop, ringSize, unixTimeNs)) + partitions = append(partitions, topic.NewPartition(slot.RangeStart, slot.RangeStop, ringSize, slot.UnixTimeNs)) } return partitions } |
