aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/pub_balancer/lookup.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/pub_balancer/lookup.go')
-rw-r--r--weed/mq/pub_balancer/lookup.go9
1 files changed, 6 insertions, 3 deletions
diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go
index 3e103a650..209261764 100644
--- a/weed/mq/pub_balancer/lookup.go
+++ b/weed/mq/pub_balancer/lookup.go
@@ -26,6 +26,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu
RingSize: MaxPartitionCount,
RangeStart: topicPartitionStat.RangeStart,
RangeStop: topicPartitionStat.RangeStop,
+ UnixTimeNs: topicPartitionStat.UnixTimeNs,
},
}
// TODO fix follower setting
@@ -34,8 +35,8 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu
}
}
}
- if len(assignments) > 0 && len(assignments) == int(partitionCount) || !publish {
- glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments)
+ if len(assignments) > 0 || !(publish && len(assignments) !=int(partitionCount) && partitionCount > 0) {
+ glog.V(0).Infof("existing topic partitions %d: %+v", len(assignments), assignments)
return assignments, nil
}
@@ -49,5 +50,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu
if balancer.Brokers.IsEmpty() {
return nil, ErrNoBroker
}
- return allocateTopicPartitions(balancer.Brokers, partitionCount), nil
+ assignments = allocateTopicPartitions(balancer.Brokers, partitionCount)
+ balancer.OnPartitionChange(topic, assignments)
+ return
}