diff options
Diffstat (limited to 'weed/mq/broker/brokder_grpc_admin.go')
| -rw-r--r-- | weed/mq/broker/brokder_grpc_admin.go | 31 |
1 files changed, 13 insertions, 18 deletions
diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go index 31b5bb84e..0dfb69c50 100644 --- a/weed/mq/broker/brokder_grpc_admin.go +++ b/weed/mq/broker/brokder_grpc_admin.go @@ -53,16 +53,19 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques if err != nil { return ret, err } - // good if the segment is still on the brokers - isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers) - if err != nil { - return ret, err - } - if isActive { - for _, broker := range existingBrokers { - ret.Brokers = append(ret.Brokers, string(broker)) + + if len(existingBrokers) > 0 { + // good if the segment is still on the brokers + isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers) + if err != nil { + return ret, err + } + if isActive { + for _, broker := range existingBrokers { + ret.Brokers = append(ret.Brokers, string(broker)) + } + return ret, nil } - return ret, nil } // randomly pick up to 10 brokers, and find the ones with the lightest load @@ -72,7 +75,7 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques } // save the allocated brokers info for this segment on the filer - if err := broker.saveSegmentOnFiler(segment, selectedBrokers); err != nil { + if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil { return ret, err } @@ -82,10 +85,6 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques return ret, nil } -func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) { - return -} - func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) { var wg sync.WaitGroup @@ -206,7 +205,3 @@ func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddres wg.Wait() return } - -func (broker *MessageQueueBroker) saveSegmentOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) { - return -} |
