diff options
Diffstat (limited to 'weed/mq/broker/broker_grpc_admin.go')
| -rw-r--r-- | weed/mq/broker/broker_grpc_admin.go | 225 |
1 files changed, 2 insertions, 223 deletions
diff --git a/weed/mq/broker/broker_grpc_admin.go b/weed/mq/broker/broker_grpc_admin.go index b24bf08a4..1313d09ec 100644 --- a/weed/mq/broker/broker_grpc_admin.go +++ b/weed/mq/broker/broker_grpc_admin.go @@ -3,18 +3,13 @@ package broker import ( "context" "github.com/seaweedfs/seaweedfs/weed/cluster" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/mq/topic" - "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" - "sort" - "sync" ) -func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) { +func (b *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) { ret := &mq_pb.FindBrokerLeaderResponse{} - err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error { + err := b.withMasterClient(false, b.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: cluster.BrokerType, FilerGroup: request.FilerGroup, @@ -30,219 +25,3 @@ func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *m }) return ret, err } - -func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) { - ret := &mq_pb.AssignSegmentBrokersResponse{} - segment := topic.FromPbSegment(request.Segment) - - // check existing segment locations on filer - existingBrokers, err := broker.checkSegmentOnFiler(segment) - if err != nil { - return ret, err - } - - if len(existingBrokers) > 0 { - // good if the segment is still on the brokers - isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers) - if err != nil { - return ret, err - } - if isActive { - for _, broker := range existingBrokers { - ret.Brokers = append(ret.Brokers, string(broker)) - } - return ret, nil - } - } - - // randomly pick up to 10 brokers, and find the ones with the lightest load - selectedBrokers, err := broker.selectBrokers() - if err != nil { - return ret, err - } - - // save the allocated brokers info for this segment on the filer - if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil { - return ret, err - } - - for _, broker := range selectedBrokers { - ret.Brokers = append(ret.Brokers, string(broker)) - } - return ret, nil -} - -func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) { - ret := &mq_pb.CheckSegmentStatusResponse{} - // TODO add in memory active segment - return ret, nil -} - -func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) { - ret := &mq_pb.CheckBrokerLoadResponse{} - // TODO read broker's load - return ret, nil -} - -// createOrUpdateTopicPartitions creates the topic partitions on the broker -// 1. check -func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignments []*mq_pb.BrokerPartitionAssignment) (err error) { - // create or update each partition - if prevAssignments == nil { - broker.createOrUpdateTopicPartition(topic, nil) - } else { - for _, brokerPartitionAssignment := range prevAssignments { - broker.createOrUpdateTopicPartition(topic, brokerPartitionAssignment) - } - } - return nil -} - -func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (newAssignment *mq_pb.BrokerPartitionAssignment) { - shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment) - if !shouldCreate { - - } - return -} -func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (shouldCreate bool) { - if oldAssignment == nil { - return true - } - for _, b := range oldAssignment.FollowerBrokers { - pb.WithBrokerGrpcClient(false, b, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { - _, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{ - Namespace: string(topic.Namespace), - Topic: topic.Name, - BrokerPartitionAssignment: oldAssignment, - ShouldCancelIfNotMatch: true, - }) - if err != nil { - shouldCreate = true - } - return nil - }) - } - return -} - -func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *topic.Segment, brokers []pb.ServerAddress) (active bool, err error) { - var wg sync.WaitGroup - - for _, candidate := range brokers { - wg.Add(1) - go func(candidate pb.ServerAddress) { - defer wg.Done() - broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error { - resp, checkErr := client.CheckSegmentStatus(context.Background(), &mq_pb.CheckSegmentStatusRequest{ - Segment: &mq_pb.Segment{ - Namespace: string(segment.Topic.Namespace), - Topic: segment.Topic.Name, - Id: segment.Id, - }, - }) - if checkErr != nil { - err = checkErr - glog.V(0).Infof("check segment status on %s: %v", candidate, checkErr) - return nil - } - if resp.IsActive == false { - active = false - } - return nil - }) - }(candidate) - } - wg.Wait() - return -} - -func (broker *MessageQueueBroker) selectBrokers() (brokers []pb.ServerAddress, err error) { - candidates, err := broker.selectCandidatesFromMaster(10) - if err != nil { - return - } - brokers, err = broker.pickLightestCandidates(candidates, 3) - return -} - -func (broker *MessageQueueBroker) selectCandidatesFromMaster(limit int32) (candidates []pb.ServerAddress, err error) { - err = broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error { - resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ - ClientType: cluster.BrokerType, - FilerGroup: broker.option.FilerGroup, - Limit: limit, - }) - if err != nil { - return err - } - if len(resp.ClusterNodes) == 0 { - return nil - } - for _, node := range resp.ClusterNodes { - candidates = append(candidates, pb.ServerAddress(node.Address)) - } - return nil - }) - return -} - -type CandidateStatus struct { - address pb.ServerAddress - messageCount int64 - bytesCount int64 - load int64 -} - -func (broker *MessageQueueBroker) pickLightestCandidates(candidates []pb.ServerAddress, limit int) (selected []pb.ServerAddress, err error) { - - if len(candidates) <= limit { - return candidates, nil - } - - candidateStatuses, err := broker.checkBrokerStatus(candidates) - if err != nil { - return nil, err - } - - sort.Slice(candidateStatuses, func(i, j int) bool { - return candidateStatuses[i].load < candidateStatuses[j].load - }) - - for i, candidate := range candidateStatuses { - if i >= limit { - break - } - selected = append(selected, candidate.address) - } - - return -} - -func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddress) (candidateStatuses []*CandidateStatus, err error) { - - candidateStatuses = make([]*CandidateStatus, len(candidates)) - var wg sync.WaitGroup - for i, candidate := range candidates { - wg.Add(1) - go func(i int, candidate pb.ServerAddress) { - defer wg.Done() - err = broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error { - resp, checkErr := client.CheckBrokerLoad(context.Background(), &mq_pb.CheckBrokerLoadRequest{}) - if checkErr != nil { - err = checkErr - return err - } - candidateStatuses[i] = &CandidateStatus{ - address: candidate, - messageCount: resp.MessageCount, - bytesCount: resp.BytesCount, - load: resp.MessageCount + resp.BytesCount/(64*1024), - } - return nil - }) - }(i, candidate) - } - wg.Wait() - return -} |
