aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-22 10:47:39 -0800
committerchrislu <chris.lu@gmail.com>2024-01-22 10:47:39 -0800
commite8b05ecc917464bba42c839ec2ddea7fd3a22e58 (patch)
tree2a8134f898124876c05cc7c4888e605ce8411301
parent861ad732cad60f8caa19f2524bf150e74d61aa73 (diff)
downloadseaweedfs-e8b05ecc917464bba42c839ec2ddea7fd3a22e58.tar.xz
seaweedfs-e8b05ecc917464bba42c839ec2ddea7fd3a22e58.zip
add/remove assigned partitions
-rw-r--r--weed/mq/broker/broker_grpc_assign.go52
-rw-r--r--weed/mq/broker/broker_grpc_configure.go104
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go10
-rw-r--r--weed/mq/pub_balancer/broker_stats.go8
4 files changed, 103 insertions, 71 deletions
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
new file mode 100644
index 000000000..323c0055c
--- /dev/null
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -0,0 +1,52 @@
+package broker
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
+func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
+ ret := &mq_pb.AssignTopicPartitionsResponse{}
+ self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port))
+
+ // drain existing topic partition subscriptions
+ for _, assignment := range request.BrokerPartitionAssignments {
+ t := topic.FromPbTopic(request.Topic)
+ partition := topic.FromPbPartition(assignment.Partition)
+ b.accessLock.Lock()
+ if request.IsDraining {
+ // TODO drain existing topic partition subscriptions
+ b.localTopicManager.RemoveTopicPartition(t, partition)
+ } else {
+ var localPartition *topic.LocalPartition
+ if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
+ localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+ b.localTopicManager.AddTopicPartition(t, localPartition)
+ }
+ }
+ b.accessLock.Unlock()
+ }
+
+ // if is leader, notify the followers to drain existing topic partition subscriptions
+ if request.IsLeader {
+ for _, brokerPartition := range request.BrokerPartitionAssignments {
+ for _, follower := range brokerPartition.FollowerBrokers {
+ err := pb.WithBrokerGrpcClient(false, follower, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.AssignTopicPartitions(context.Background(), request)
+ return err
+ })
+ if err != nil {
+ return ret, err
+ }
+ }
+ }
+ }
+
+ glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
+ return ret, nil
+}
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index e0f9319a4..9292a6184 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -10,6 +10,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "sync"
)
// ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer
@@ -54,7 +55,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
b.Balancer.OnPartitionChange(request.Topic, resp.BrokerPartitionAssignments)
}
- if assignErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments); assignErr != nil {
+ if assignErr := b.assignTopicPartitionsToBrokers(ctx, request.Topic, resp.BrokerPartitionAssignments, true); assignErr != nil {
return nil, assignErr
}
@@ -63,77 +64,46 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
return resp, err
}
-func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment) error {
+func (b *MessageQueueBroker) assignTopicPartitionsToBrokers(ctx context.Context, t *mq_pb.Topic, assignments []*mq_pb.BrokerPartitionAssignment, isAdd bool) error {
+ // notify the brokers to create the topic partitions in parallel
+ var wg sync.WaitGroup
for _, bpa := range assignments {
- fmt.Printf("create topic %s partition %+v on %s\n", t, bpa.Partition, bpa.LeaderBroker)
- if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
- _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
- Topic: t,
- BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
- {
- Partition: bpa.Partition,
+ wg.Add(1)
+ go func(bpa *mq_pb.BrokerPartitionAssignment) {
+ defer wg.Done()
+ if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
+ _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
+ Topic: t,
+ BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ Partition: bpa.Partition,
+ },
},
- },
- IsLeader: true,
- IsDraining: false,
- })
- if doCreateErr != nil {
- return fmt.Errorf("do create topic %s on %s: %v", t, bpa.LeaderBroker, doCreateErr)
- }
- brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
- if !found {
- brokerStats = pub_balancer.NewBrokerStats()
- if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
- brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
- }
- }
- brokerStats.RegisterAssignment(t, bpa.Partition)
- return nil
- }); doCreateErr != nil {
- return doCreateErr
- }
- }
- return nil
-}
-
-// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
-func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
- ret := &mq_pb.AssignTopicPartitionsResponse{}
- self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port))
-
- // drain existing topic partition subscriptions
- for _, assignment := range request.BrokerPartitionAssignments {
- t := topic.FromPbTopic(request.Topic)
- partition := topic.FromPbPartition(assignment.Partition)
- b.accessLock.Lock()
- if request.IsDraining {
- // TODO drain existing topic partition subscriptions
- b.localTopicManager.RemoveTopicPartition(t, partition)
- } else {
- var localPartition *topic.LocalPartition
- if localPartition = b.localTopicManager.GetTopicPartition(t, partition); localPartition == nil {
- localPartition = topic.FromPbBrokerPartitionAssignment(self, partition, assignment, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
- b.localTopicManager.AddTopicPartition(t, localPartition)
- }
- }
- b.accessLock.Unlock()
- }
-
- // if is leader, notify the followers to drain existing topic partition subscriptions
- if request.IsLeader {
- for _, brokerPartition := range request.BrokerPartitionAssignments {
- for _, follower := range brokerPartition.FollowerBrokers {
- err := pb.WithBrokerGrpcClient(false, follower, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
- _, err := client.AssignTopicPartitions(context.Background(), request)
- return err
+ IsLeader: true,
+ IsDraining: !isAdd,
})
- if err != nil {
- return ret, err
+ if doCreateErr != nil {
+ if !isAdd {
+ return fmt.Errorf("drain topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
+ } else {
+ return fmt.Errorf("create topic %s %v on %s: %v", t, bpa.LeaderBroker, bpa.Partition, doCreateErr)
+ }
+ }
+ brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
+ if !found {
+ brokerStats = pub_balancer.NewBrokerStats()
+ if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
+ brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
+ }
}
+ brokerStats.RegisterAssignment(t, bpa.Partition, isAdd)
+ return nil
+ }); doCreateErr != nil {
+ glog.Errorf("create topic %s partition %+v on %s: %v", t, bpa.Partition, bpa.LeaderBroker, doCreateErr)
}
- }
+ }(bpa)
}
+ wg.Wait()
- glog.V(0).Infof("AssignTopicPartitions: topic %s partition assignments: %v", request.Topic, request.BrokerPartitionAssignments)
- return ret, nil
+ return nil
}
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index 0ba0b628c..4ba1a0f75 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -3,6 +3,7 @@ package broker
import (
"context"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
@@ -25,12 +26,17 @@ func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq
return resp, err
}
+ t := topic.FromPbTopic(request.Topic)
ret := &mq_pb.LookupTopicBrokersResponse{}
+ conf := &mq_pb.ConfigureTopicResponse{}
ret.Topic = request.Topic
- conf, err := b.readTopicConfFromFiler(topic.FromPbTopic(request.Topic))
- if err == nil {
+ if conf, err = b.readTopicConfFromFiler(t); err != nil {
+ glog.V(0).Infof("lookup topic %s conf: %v", request.Topic, err)
ret.BrokerPartitionAssignments = conf.BrokerPartitionAssignments
+ } else {
+ err = b.ensureTopicActiveAssignments(t, conf)
}
+
return ret, err
}
diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go
index 2ae123822..45c5271df 100644
--- a/weed/mq/pub_balancer/broker_stats.go
+++ b/weed/mq/pub_balancer/broker_stats.go
@@ -63,7 +63,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
}
-func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) {
+func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition, isAdd bool) {
tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{
Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
@@ -78,5 +78,9 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti
IsLeader: true,
}
key := tps.TopicPartition.String()
- bs.TopicPartitionStats.Set(key, tps)
+ if isAdd {
+ bs.TopicPartitionStats.SetIfAbsent(key, tps)
+ } else {
+ bs.TopicPartitionStats.Remove(key)
+ }
}