diff options
Diffstat (limited to 'weed/mq/pub_balancer/lookup.go')
| -rw-r--r-- | weed/mq/pub_balancer/lookup.go | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go index 3e103a650..209261764 100644 --- a/weed/mq/pub_balancer/lookup.go +++ b/weed/mq/pub_balancer/lookup.go @@ -26,6 +26,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu RingSize: MaxPartitionCount, RangeStart: topicPartitionStat.RangeStart, RangeStop: topicPartitionStat.RangeStop, + UnixTimeNs: topicPartitionStat.UnixTimeNs, }, } // TODO fix follower setting @@ -34,8 +35,8 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu } } } - if len(assignments) > 0 && len(assignments) == int(partitionCount) || !publish { - glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments) + if len(assignments) > 0 || !(publish && len(assignments) !=int(partitionCount) && partitionCount > 0) { + glog.V(0).Infof("existing topic partitions %d: %+v", len(assignments), assignments) return assignments, nil } @@ -49,5 +50,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu if balancer.Brokers.IsEmpty() { return nil, ErrNoBroker } - return allocateTopicPartitions(balancer.Brokers, partitionCount), nil + assignments = allocateTopicPartitions(balancer.Brokers, partitionCount) + balancer.OnPartitionChange(topic, assignments) + return } |
