diff options
Diffstat (limited to 'weed/mq/broker/brokder_grpc_admin.go')
| -rw-r--r-- | weed/mq/broker/brokder_grpc_admin.go | 107 |
1 files changed, 92 insertions, 15 deletions
diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go index 7e669ef9b..5aac780fb 100644 --- a/weed/mq/broker/brokder_grpc_admin.go +++ b/weed/mq/broker/brokder_grpc_admin.go @@ -4,7 +4,7 @@ import ( "context" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/mq" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -12,6 +12,10 @@ import ( "sync" ) +const ( + MaxPartitionCount = 1024 +) + func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) { ret := &mq_pb.FindBrokerLeaderResponse{} err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error { @@ -31,21 +35,9 @@ func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *m return ret, err } -func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) { - ret := &mq_pb.CheckSegmentStatusResponse{} - // TODO add in memory active segment - return ret, nil -} - -func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) { - ret := &mq_pb.CheckBrokerLoadResponse{} - // TODO read broker's load - return ret, nil -} - func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) { ret := &mq_pb.AssignSegmentBrokersResponse{} - segment := mq.FromPbSegment(request.Segment) + segment := topic.FromPbSegment(request.Segment) // check existing segment locations on filer existingBrokers, err := broker.checkSegmentOnFiler(segment) @@ -84,7 +76,92 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques return ret, nil } -func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) { +func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) { + ret := &mq_pb.CheckSegmentStatusResponse{} + // TODO add in memory active segment + return ret, nil +} + +func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) { + ret := &mq_pb.CheckBrokerLoadResponse{} + // TODO read broker's load + return ret, nil +} + +// FindTopicBrokers returns the brokers that are serving the topic +// +// 1. lock the topic +// +// 2. find the topic partitions on the filer +// 2.1 if the topic is not found, return error +// 2.2 if the request is_for_publish, create the topic +// 2.2.1 if the request is_for_subscribe, return error not found +// 2.2.2 if the request is_for_publish, create the topic +// 2.2 if the topic is found, return the brokers +// +// 3. unlock the topic +func (broker *MessageQueueBroker) FindTopicBrokers(c context.Context, request *mq_pb.FindTopicBrokersRequest) (*mq_pb.FindTopicBrokersResponse, error) { + ret := &mq_pb.FindTopicBrokersResponse{} + // lock the topic + + // find the topic partitions on the filer + // if the topic is not found + // if the request is_for_publish + // create the topic + // if the request is_for_subscribe + // return error not found + return ret, nil +} + +// CheckTopicPartitionsStatus check the topic partitions on the broker +func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) { + ret := &mq_pb.CheckTopicPartitionsStatusResponse{} + return ret, nil +} + +// createOrUpdateTopicPartitions creates the topic partitions on the broker +// 1. check +func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment) (err error) { + // create or update each partition + if prevAssignment == nil { + broker.createOrUpdateTopicPartition(topic, nil) + } else { + for _, partitionAssignment := range prevAssignment.BrokerPartitions { + broker.createOrUpdateTopicPartition(topic, partitionAssignment) + } + } + return nil +} + +func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (newAssignment *mq_pb.BrokerPartitionsAssignment) { + shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment) + if !shouldCreate { + + } + return +} +func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (shouldCreate bool) { + if oldAssignment == nil { + return true + } + for _, b := range oldAssignment.FollowerBrokers { + pb.WithBrokerClient(false, pb.ServerAddress(b), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + _, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{ + Namespace: string(topic.Namespace), + Topic: topic.Name, + BrokerPartitionsAssignment: oldAssignment, + ShouldCancelIfNotMatch: true, + }) + if err != nil { + shouldCreate = true + } + return nil + }) + } + return +} + +func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *topic.Segment, brokers []pb.ServerAddress) (active bool, err error) { var wg sync.WaitGroup for _, candidate := range brokers { |
