aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/broker/broker_grpc_assign.go48
-rw-r--r--weed/mq/broker/broker_grpc_configure.go46
2 files changed, 48 insertions, 46 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index 323c0055c..264565b7b 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -4,9 +4,11 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "sync"
)
// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
@@ -50,3 +52,49 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
return ret, nil
}
+
+// called by broker leader to drain existing partitions.
+// new/updated partitions will be detected by broker from the filer
+func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment, isAdd bool) error {
+ // notify the brokers to create the topic partitions in parallel
+ var wg sync.WaitGroup
+ for _, bpa := range assignments {
+ wg.Add(1)
+ go func(bpa *mq_pb.BrokerPartitionAssignment) {
+ defer wg.Done()
+ if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
+ _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
+ Topic: t,
+ BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ Partition: bpa.Partition,
+ },
+ },
+ IsLeader: true,
+ IsDraining: !isAdd,
+ })
+ if doCreateErr != nil {
+ if !isAdd {
+ return fmt.Errorf("drain topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
+ } else {
+ return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
+ }
+ }
+ brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
+ if !found {
+ brokerStats = pub_balancer.NewBrokerStats()
+ if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
+ brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
+ }
+ }
+ brokerStats.RegisterAssignment(t, bpa.Partition, isAdd)
+ return nil
+ }); doCreateErr != nil {
+ glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr)
+ }
+ }(bpa)
+ }
+ wg.Wait()
+
+ return nil
+}
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index c1984c05e..f5bcceb44 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -6,11 +6,9 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "sync"
)
// ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer
@@ -62,47 +60,3 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
return resp, err
}
-
-func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment, isAdd bool) error {
- // notify the brokers to create the topic partitions in parallel
- var wg sync.WaitGroup
- for _, bpa := range assignments {
- wg.Add(1)
- go func(bpa *mq_pb.BrokerPartitionAssignment) {
- defer wg.Done()
- if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
- _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
- Topic: t,
- BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
- {
- Partition: bpa.Partition,
- },
- },
- IsLeader: true,
- IsDraining: !isAdd,
- })
- if doCreateErr != nil {
- if !isAdd {
- return fmt.Errorf("drain topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
- } else {
- return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
- }
- }
- brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
- if !found {
- brokerStats = pub_balancer.NewBrokerStats()
- if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
- brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
- }
- }
- brokerStats.RegisterAssignment(t, bpa.Partition, isAdd)
- return nil
- }); doCreateErr != nil {
- glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr)
- }
- }(bpa)
- }
- wg.Wait()
-
- return nil
-}