aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-09-19 14:08:17 -0700
committerchrislu <chris.lu@gmail.com>2023-09-19 14:08:17 -0700
commitb18112ae10f2d342c8fc31145ad90586222290e5 (patch)
tree637e56de0a9ebe1a0b636eca0aec230d9d6d2056
parent6c6655de9d718b01d8e1caac2933261d887ecdf9 (diff)
downloadseaweedfs-b18112ae10f2d342c8fc31145ad90586222290e5.tar.xz
seaweedfs-b18112ae10f2d342c8fc31145ad90586222290e5.zip
fix compilation bugs during merge
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go41
-rw-r--r--weed/mq/broker/broker_server.go1
2 files changed, 20 insertions, 22 deletions
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index 96448be83..9fc9c9fe4 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -3,6 +3,8 @@ package broker
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
// FindTopicBrokers returns the brokers that are serving the topic
@@ -17,30 +19,25 @@ import (
// 2.2 if the topic is found, return the brokers
//
// 3. unlock the topic
-func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (*mq_pb.LookupTopicBrokersResponse, error) {
- ret := &mq_pb.LookupTopicBrokersResponse{}
- // TODO lock the topic
+func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
+ if broker.currentBalancer == "" {
+ return nil, status.Errorf(codes.Unavailable, "no balancer")
+ }
+ if !broker.lockAsBalancer.IsLocked() {
+ proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+ resp, err = client.LookupTopicBrokers(ctx, request)
+ return nil
+ })
+ if proxyErr != nil {
+ return nil, proxyErr
+ }
+ return resp, err
+ }
- // find the topic partitions on the filer
- // if the topic is not found
- // if the request is_for_publish
- // create the topic
- // if the request is_for_subscribe
- // return error not found
- // t := topic.FromPbTopic(request.Topic)
+ ret := &mq_pb.LookupTopicBrokersResponse{}
ret.Topic = request.Topic
- ret.BrokerPartitionAssignments = []*mq_pb.BrokerPartitionAssignment{
- {
- LeaderBroker: "localhost:17777",
- FollowerBrokers: []string{"localhost:17777"},
- Partition: &mq_pb.Partition{
- RingSize: MaxPartitionCount,
- RangeStart: 0,
- RangeStop: MaxPartitionCount,
- },
- },
- }
- return ret, nil
+ ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish)
+ return ret, err
}
// CheckTopicPartitionsStatus check the topic partitions on the broker
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index db8329989..2ab20ac52 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -39,6 +39,7 @@ type MessageQueueBroker struct {
localTopicManager *topic.LocalTopicManager
Balancer *balancer.Balancer
lockAsBalancer *cluster.LiveLock
+ currentBalancer pb.ServerAddress
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {