diff options
| author | chrislu <chris.lu@gmail.com> | 2024-01-16 08:52:42 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-01-16 08:52:42 -0800 |
| commit | 34a78ffad00310464a8f8731b2d31d54c19d618d (patch) | |
| tree | ad87ed9dac245ec64fea93a770d6338d787592d2 /weed/mq | |
| parent | 3795d8dca8b1548263376057173aaac310e9fdd3 (diff) | |
| download | seaweedfs-34a78ffad00310464a8f8731b2d31d54c19d618d.tar.xz seaweedfs-34a78ffad00310464a8f8731b2d31d54c19d618d.zip | |
remove isForPublish from LookupTopicBrokers
also adds a return parameter: whether the topic exists or not
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker/broker_grpc_configure.go | 2 | ||||
| -rw-r--r-- | weed/mq/broker/broker_grpc_lookup.go | 2 | ||||
| -rw-r--r-- | weed/mq/client/pub_client/lookup.go | 2 | ||||
| -rw-r--r-- | weed/mq/pub_balancer/lookup.go | 6 |
4 files changed, 5 insertions, 7 deletions
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go index 8d3727b1f..8e36675c1 100644 --- a/weed/mq/broker/broker_grpc_configure.go +++ b/weed/mq/broker/broker_grpc_configure.go @@ -38,7 +38,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb. } ret := &mq_pb.ConfigureTopicResponse{} - ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount) + ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount) for _, bpa := range ret.BrokerPartitionAssignments { fmt.Printf("create topic %s partition %+v on %s\n", request.Topic, bpa.Partition, bpa.LeaderBroker) diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go index fa5f81172..ea0c8c0b4 100644 --- a/weed/mq/broker/broker_grpc_lookup.go +++ b/weed/mq/broker/broker_grpc_lookup.go @@ -35,7 +35,7 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq ret := &mq_pb.LookupTopicBrokersResponse{} ret.Topic = request.Topic - ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish, -1) + ret.BrokerPartitionAssignments, _, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, false, -1) return ret, err } diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go index ccc83b58d..ce65bbc92 100644 --- a/weed/mq/client/pub_client/lookup.go +++ b/weed/mq/client/pub_client/lookup.go @@ -38,7 +38,6 @@ func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error { Namespace: p.namespace, Name: p.topic, }, - IsForPublish: true, }) glog.V(0).Infof("lookup1 topic %s/%s: %v", p.namespace, p.topic, lookupResp) if p.config.CreateTopic && err != nil { @@ -58,7 +57,6 @@ func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error { Namespace: p.namespace, Name: p.topic, }, - IsForPublish: true, }) glog.V(0).Infof("lookup2 topic %s/%s: %v", p.namespace, p.topic, lookupResp) } diff --git a/weed/mq/pub_balancer/lookup.go b/weed/mq/pub_balancer/lookup.go index 249aaaa1f..5b9ca5a05 100644 --- a/weed/mq/pub_balancer/lookup.go +++ b/weed/mq/pub_balancer/lookup.go @@ -10,7 +10,7 @@ var ( ErrNoBroker = errors.New("no broker") ) -func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, err error) { +func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, alreadyExists bool, err error) { if partitionCount == 0 { partitionCount = 6 } @@ -37,7 +37,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu } if len(assignments) > 0 || !(publish && len(assignments) !=int(partitionCount) && partitionCount > 0) { glog.V(0).Infof("existing topic partitions %d: %+v", len(assignments), assignments) - return assignments, nil + return assignments, true, nil } // find the topic partitions on the filer @@ -48,7 +48,7 @@ func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, pu // return error not found // t := topic.FromPbTopic(request.Topic) if balancer.Brokers.IsEmpty() { - return nil, ErrNoBroker + return nil, alreadyExists, ErrNoBroker } assignments = AllocateTopicPartitions(balancer.Brokers, partitionCount) balancer.OnPartitionChange(topic, assignments) |
