aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-16 08:55:47 -0800
committerchrislu <chris.lu@gmail.com>2024-01-16 08:55:47 -0800
commitdb3670a3a5dda3fb512cb45e8d082bcf9358468b (patch)
treed26d3c59fe2056da394325d2365b723eb2a99039
parent34a78ffad00310464a8f8731b2d31d54c19d618d (diff)
downloadseaweedfs-db3670a3a5dda3fb512cb45e8d082bcf9358468b.tar.xz
seaweedfs-db3670a3a5dda3fb512cb45e8d082bcf9358468b.zip
simplify api
-rw-r--r--weed/mq/broker/broker_grpc_configure.go2
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go2
-rw-r--r--weed/mq/pub_balancer/lookup.go7
3 files changed, 7 insertions, 4 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 8e36675c1..e8b70a0ce 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -38,7 +38,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
}
ret := &mq_pb.ConfigureTopicResponse{}
- ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
+ ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, request.PartitionCount)
for _, bpa := range ret.BrokerPartitionAssignments {
fmt.Printf("create topic %s partition %+v on %s\n", request.Topic, bpa.Partition, bpa.LeaderBroker)
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index ea0c8c0b4..74456c6e3 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -35,7 +35,7 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq
ret := &mq_pb.LookupTopicBrokersResponse{}
ret.Topic = request.Topic
- ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, false, -1)
+ ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, -1)
return ret, err
}
diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go
index 5b9ca5a05..b74909729 100644
--- a/weed/mq/pub_balancer/lookup.go
+++ b/weed/mq/pub_balancer/lookup.go
@@ -10,7 +10,7 @@ var (
ErrNoBroker = errors.New("no broker")
)
-func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, alreadyExists bool, err error) {
+func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, alreadyExists bool, err error) {
if partitionCount == 0 {
partitionCount = 6
}
@@ -35,10 +35,13 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu
}
}
}
- if len(assignments) > 0 || !(publish && len(assignments) !=int(partitionCount) && partitionCount > 0) {
+ if len(assignments) > 0 {
glog.V(0).Infof("existing topic partitions %d: %+v", len(assignments), assignments)
return assignments, true, nil
}
+ if partitionCount < 0 {
+ return nil, false, nil
+ }
// find the topic partitions on the filer
// if the topic is not found