aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/brokder_grpc_admin.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/brokder_grpc_admin.go')
-rw-r--r--weed/mq/broker/brokder_grpc_admin.go107
1 files changed, 92 insertions, 15 deletions
diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go
index 7e669ef9b..5aac780fb 100644
--- a/weed/mq/broker/brokder_grpc_admin.go
+++ b/weed/mq/broker/brokder_grpc_admin.go
@@ -4,7 +4,7 @@ import (
"context"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/mq"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -12,6 +12,10 @@ import (
"sync"
)
+const (
+ MaxPartitionCount = 1024
+)
+
func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
ret := &mq_pb.FindBrokerLeaderResponse{}
err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
@@ -31,21 +35,9 @@ func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *m
return ret, err
}
-func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
- ret := &mq_pb.CheckSegmentStatusResponse{}
- // TODO add in memory active segment
- return ret, nil
-}
-
-func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
- ret := &mq_pb.CheckBrokerLoadResponse{}
- // TODO read broker's load
- return ret, nil
-}
-
func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) {
ret := &mq_pb.AssignSegmentBrokersResponse{}
- segment := mq.FromPbSegment(request.Segment)
+ segment := topic.FromPbSegment(request.Segment)
// check existing segment locations on filer
existingBrokers, err := broker.checkSegmentOnFiler(segment)
@@ -84,7 +76,92 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques
return ret, nil
}
-func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) {
+func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
+ ret := &mq_pb.CheckSegmentStatusResponse{}
+ // TODO add in memory active segment
+ return ret, nil
+}
+
+func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
+ ret := &mq_pb.CheckBrokerLoadResponse{}
+ // TODO read broker's load
+ return ret, nil
+}
+
+// FindTopicBrokers returns the brokers that are serving the topic
+//
+// 1. lock the topic
+//
+// 2. find the topic partitions on the filer
+// 2.1 if the topic is not found, return error
+// 2.2 if the request is_for_publish, create the topic
+// 2.2.1 if the request is_for_subscribe, return error not found
+// 2.2.2 if the request is_for_publish, create the topic
+// 2.2 if the topic is found, return the brokers
+//
+// 3. unlock the topic
+func (broker *MessageQueueBroker) FindTopicBrokers(c context.Context, request *mq_pb.FindTopicBrokersRequest) (*mq_pb.FindTopicBrokersResponse, error) {
+ ret := &mq_pb.FindTopicBrokersResponse{}
+ // lock the topic
+
+ // find the topic partitions on the filer
+ // if the topic is not found
+ // if the request is_for_publish
+ // create the topic
+ // if the request is_for_subscribe
+ // return error not found
+ return ret, nil
+}
+
+// CheckTopicPartitionsStatus check the topic partitions on the broker
+func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) {
+ ret := &mq_pb.CheckTopicPartitionsStatusResponse{}
+ return ret, nil
+}
+
+// createOrUpdateTopicPartitions creates the topic partitions on the broker
+// 1. check
+func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment) (err error) {
+ // create or update each partition
+ if prevAssignment == nil {
+ broker.createOrUpdateTopicPartition(topic, nil)
+ } else {
+ for _, partitionAssignment := range prevAssignment.BrokerPartitions {
+ broker.createOrUpdateTopicPartition(topic, partitionAssignment)
+ }
+ }
+ return nil
+}
+
+func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (newAssignment *mq_pb.BrokerPartitionsAssignment) {
+ shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment)
+ if !shouldCreate {
+
+ }
+ return
+}
+func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (shouldCreate bool) {
+ if oldAssignment == nil {
+ return true
+ }
+ for _, b := range oldAssignment.FollowerBrokers {
+ pb.WithBrokerClient(false, pb.ServerAddress(b), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{
+ Namespace: string(topic.Namespace),
+ Topic: topic.Name,
+ BrokerPartitionsAssignment: oldAssignment,
+ ShouldCancelIfNotMatch: true,
+ })
+ if err != nil {
+ shouldCreate = true
+ }
+ return nil
+ })
+ }
+ return
+}
+
+func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *topic.Segment, brokers []pb.ServerAddress) (active bool, err error) {
var wg sync.WaitGroup
for _, candidate := range brokers {