aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/broker/brokder_grpc_admin.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/broker/brokder_grpc_admin.go')
-rw-r--r--weed/mq/broker/brokder_grpc_admin.go31
1 files changed, 13 insertions, 18 deletions
diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go
index 31b5bb84e..0dfb69c50 100644
--- a/weed/mq/broker/brokder_grpc_admin.go
+++ b/weed/mq/broker/brokder_grpc_admin.go
@@ -53,16 +53,19 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques
if err != nil {
return ret, err
}
- // good if the segment is still on the brokers
- isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
- if err != nil {
- return ret, err
- }
- if isActive {
- for _, broker := range existingBrokers {
- ret.Brokers = append(ret.Brokers, string(broker))
+
+ if len(existingBrokers) > 0 {
+ // good if the segment is still on the brokers
+ isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
+ if err != nil {
+ return ret, err
+ }
+ if isActive {
+ for _, broker := range existingBrokers {
+ ret.Brokers = append(ret.Brokers, string(broker))
+ }
+ return ret, nil
}
- return ret, nil
}
// randomly pick up to 10 brokers, and find the ones with the lightest load
@@ -72,7 +75,7 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques
}
// save the allocated brokers info for this segment on the filer
- if err := broker.saveSegmentOnFiler(segment, selectedBrokers); err != nil {
+ if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil {
return ret, err
}
@@ -82,10 +85,6 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques
return ret, nil
}
-func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) {
- return
-}
-
func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) {
var wg sync.WaitGroup
@@ -206,7 +205,3 @@ func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddres
wg.Wait()
return
}
-
-func (broker *MessageQueueBroker) saveSegmentOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) {
- return
-}