Diffstat (limited to 'weed/mq')
-rw-r--r--  weed/mq/broker/broker_grpc_admin.go  225
-rw-r--r--  weed/mq/broker/broker_grpc_balance.go  31
-rw-r--r--  weed/mq/broker/broker_grpc_configure.go  107
-rw-r--r--  weed/mq/broker/broker_grpc_create.go  72
-rw-r--r--  weed/mq/broker/broker_grpc_lookup.go  28
-rw-r--r--  weed/mq/broker/broker_grpc_pub.go  134
-rw-r--r--  weed/mq/broker/broker_grpc_pub_balancer.go (renamed from weed/mq/broker/broker_grpc_balancer.go)  26
-rw-r--r--  weed/mq/broker/broker_grpc_sub.go  56
-rw-r--r--  weed/mq/broker/broker_grpc_sub_coordinator.go  77
-rw-r--r--  weed/mq/broker/broker_grpc_topic_partition_control.go  28
-rw-r--r--  weed/mq/broker/broker_segment_serde.go  89
-rw-r--r--  weed/mq/broker/broker_server.go  51
-rw-r--r--  weed/mq/broker/broker_stats.go  30
-rw-r--r--  weed/mq/client/cmd/weed_pub/publisher.go  14
-rw-r--r--  weed/mq/client/cmd/weed_sub/subscriber.go  18
-rw-r--r--  weed/mq/client/pub_client/connect.go  73
-rw-r--r--  weed/mq/client/pub_client/lookup.go  116
-rw-r--r--  weed/mq/client/pub_client/publish.go  4
-rw-r--r--  weed/mq/client/pub_client/publisher.go  26
-rw-r--r--  weed/mq/client/sub_client/process.go  4
-rw-r--r--  weed/mq/client/sub_client/subscribe.go  17
-rw-r--r--  weed/mq/client/sub_client/subscriber.go  20
-rw-r--r--  weed/mq/coordinator/consumer_group.go  92
-rw-r--r--  weed/mq/coordinator/coordinator.go  36
-rw-r--r--  weed/mq/pub_balancer/allocate.go (renamed from weed/mq/balancer/allocate.go)  4
-rw-r--r--  weed/mq/pub_balancer/allocate_test.go (renamed from weed/mq/balancer/allocate_test.go)  2
-rw-r--r--  weed/mq/pub_balancer/balance.go  73
-rw-r--r--  weed/mq/pub_balancer/balance_action.go  58
-rw-r--r--  weed/mq/pub_balancer/balance_action_split.go  43
-rw-r--r--  weed/mq/pub_balancer/balance_brokers.go  52
-rw-r--r--  weed/mq/pub_balancer/balance_brokers_test.go  75
-rw-r--r--  weed/mq/pub_balancer/balancer.go  83
-rw-r--r--  weed/mq/pub_balancer/broker_stats.go (renamed from weed/mq/balancer/balancer.go)  77
-rw-r--r--  weed/mq/pub_balancer/lookup.go (renamed from weed/mq/balancer/lookup.go)  19
-rw-r--r--  weed/mq/pub_balancer/partition_list_broker.go  50
-rw-r--r--  weed/mq/pub_balancer/repair.go  127
-rw-r--r--  weed/mq/pub_balancer/repair_test.go  97
-rw-r--r--  weed/mq/sub_coordinator/consumer_group.go  41
-rw-r--r--  weed/mq/sub_coordinator/coordinator.go  86
-rw-r--r--  weed/mq/sub_coordinator/partition_consumer_mapping.go  119
-rw-r--r--  weed/mq/sub_coordinator/partition_consumer_mapping_test.go  312
-rw-r--r--  weed/mq/sub_coordinator/partition_list.go  32
-rw-r--r--  weed/mq/topic/local_manager.go  29
-rw-r--r--  weed/mq/topic/local_partition.go  59
-rw-r--r--  weed/mq/topic/local_partition_publishers.go  52
-rw-r--r--  weed/mq/topic/local_partition_subscribers.go  49
-rw-r--r--  weed/mq/topic/local_topic.go  58
-rw-r--r--  weed/mq/topic/partition.go  17
-rw-r--r--  weed/mq/topic/topic.go  48
49 files changed, 2134 insertions, 902 deletions
diff --git a/weed/mq/broker/broker_grpc_admin.go b/weed/mq/broker/broker_grpc_admin.go
index b24bf08a4..1313d09ec 100644
--- a/weed/mq/broker/broker_grpc_admin.go
+++ b/weed/mq/broker/broker_grpc_admin.go
@@ -3,18 +3,13 @@ package broker
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/cluster"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "sort"
- "sync"
)
-func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
+func (b *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
ret := &mq_pb.FindBrokerLeaderResponse{}
- err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
+ err := b.withMasterClient(false, b.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.BrokerType,
FilerGroup: request.FilerGroup,
@@ -30,219 +25,3 @@ func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *m
})
return ret, err
}
-
-func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) {
- ret := &mq_pb.AssignSegmentBrokersResponse{}
- segment := topic.FromPbSegment(request.Segment)
-
- // check existing segment locations on filer
- existingBrokers, err := broker.checkSegmentOnFiler(segment)
- if err != nil {
- return ret, err
- }
-
- if len(existingBrokers) > 0 {
- // good if the segment is still on the brokers
- isActive, err := broker.checkSegmentsOnBrokers(segment, existingBrokers)
- if err != nil {
- return ret, err
- }
- if isActive {
- for _, broker := range existingBrokers {
- ret.Brokers = append(ret.Brokers, string(broker))
- }
- return ret, nil
- }
- }
-
- // randomly pick up to 10 brokers, and find the ones with the lightest load
- selectedBrokers, err := broker.selectBrokers()
- if err != nil {
- return ret, err
- }
-
- // save the allocated brokers info for this segment on the filer
- if err := broker.saveSegmentBrokersOnFiler(segment, selectedBrokers); err != nil {
- return ret, err
- }
-
- for _, broker := range selectedBrokers {
- ret.Brokers = append(ret.Brokers, string(broker))
- }
- return ret, nil
-}
-
-func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
- ret := &mq_pb.CheckSegmentStatusResponse{}
- // TODO add in memory active segment
- return ret, nil
-}
-
-func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
- ret := &mq_pb.CheckBrokerLoadResponse{}
- // TODO read broker's load
- return ret, nil
-}
-
-// createOrUpdateTopicPartitions creates the topic partitions on the broker
-// 1. check
-func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignments []*mq_pb.BrokerPartitionAssignment) (err error) {
- // create or update each partition
- if prevAssignments == nil {
- broker.createOrUpdateTopicPartition(topic, nil)
- } else {
- for _, brokerPartitionAssignment := range prevAssignments {
- broker.createOrUpdateTopicPartition(topic, brokerPartitionAssignment)
- }
- }
- return nil
-}
-
-func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (newAssignment *mq_pb.BrokerPartitionAssignment) {
- shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment)
- if !shouldCreate {
-
- }
- return
-}
-func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionAssignment) (shouldCreate bool) {
- if oldAssignment == nil {
- return true
- }
- for _, b := range oldAssignment.FollowerBrokers {
- pb.WithBrokerGrpcClient(false, b, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
- _, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{
- Namespace: string(topic.Namespace),
- Topic: topic.Name,
- BrokerPartitionAssignment: oldAssignment,
- ShouldCancelIfNotMatch: true,
- })
- if err != nil {
- shouldCreate = true
- }
- return nil
- })
- }
- return
-}
-
-func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *topic.Segment, brokers []pb.ServerAddress) (active bool, err error) {
- var wg sync.WaitGroup
-
- for _, candidate := range brokers {
- wg.Add(1)
- go func(candidate pb.ServerAddress) {
- defer wg.Done()
- broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
- resp, checkErr := client.CheckSegmentStatus(context.Background(), &mq_pb.CheckSegmentStatusRequest{
- Segment: &mq_pb.Segment{
- Namespace: string(segment.Topic.Namespace),
- Topic: segment.Topic.Name,
- Id: segment.Id,
- },
- })
- if checkErr != nil {
- err = checkErr
- glog.V(0).Infof("check segment status on %s: %v", candidate, checkErr)
- return nil
- }
- if resp.IsActive == false {
- active = false
- }
- return nil
- })
- }(candidate)
- }
- wg.Wait()
- return
-}
-
-func (broker *MessageQueueBroker) selectBrokers() (brokers []pb.ServerAddress, err error) {
- candidates, err := broker.selectCandidatesFromMaster(10)
- if err != nil {
- return
- }
- brokers, err = broker.pickLightestCandidates(candidates, 3)
- return
-}
-
-func (broker *MessageQueueBroker) selectCandidatesFromMaster(limit int32) (candidates []pb.ServerAddress, err error) {
- err = broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
- resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
- ClientType: cluster.BrokerType,
- FilerGroup: broker.option.FilerGroup,
- Limit: limit,
- })
- if err != nil {
- return err
- }
- if len(resp.ClusterNodes) == 0 {
- return nil
- }
- for _, node := range resp.ClusterNodes {
- candidates = append(candidates, pb.ServerAddress(node.Address))
- }
- return nil
- })
- return
-}
-
-type CandidateStatus struct {
- address pb.ServerAddress
- messageCount int64
- bytesCount int64
- load int64
-}
-
-func (broker *MessageQueueBroker) pickLightestCandidates(candidates []pb.ServerAddress, limit int) (selected []pb.ServerAddress, err error) {
-
- if len(candidates) <= limit {
- return candidates, nil
- }
-
- candidateStatuses, err := broker.checkBrokerStatus(candidates)
- if err != nil {
- return nil, err
- }
-
- sort.Slice(candidateStatuses, func(i, j int) bool {
- return candidateStatuses[i].load < candidateStatuses[j].load
- })
-
- for i, candidate := range candidateStatuses {
- if i >= limit {
- break
- }
- selected = append(selected, candidate.address)
- }
-
- return
-}
-
-func (broker *MessageQueueBroker) checkBrokerStatus(candidates []pb.ServerAddress) (candidateStatuses []*CandidateStatus, err error) {
-
- candidateStatuses = make([]*CandidateStatus, len(candidates))
- var wg sync.WaitGroup
- for i, candidate := range candidates {
- wg.Add(1)
- go func(i int, candidate pb.ServerAddress) {
- defer wg.Done()
- err = broker.withBrokerClient(false, candidate, func(client mq_pb.SeaweedMessagingClient) error {
- resp, checkErr := client.CheckBrokerLoad(context.Background(), &mq_pb.CheckBrokerLoadRequest{})
- if checkErr != nil {
- err = checkErr
- return err
- }
- candidateStatuses[i] = &CandidateStatus{
- address: candidate,
- messageCount: resp.MessageCount,
- bytesCount: resp.BytesCount,
- load: resp.MessageCount + resp.BytesCount/(64*1024),
- }
- return nil
- })
- }(i, candidate)
- }
- wg.Wait()
- return
-}
diff --git a/weed/mq/broker/broker_grpc_balance.go b/weed/mq/broker/broker_grpc_balance.go
new file mode 100644
index 000000000..c09161ff9
--- /dev/null
+++ b/weed/mq/broker/broker_grpc_balance.go
@@ -0,0 +1,31 @@
+package broker
+
+import (
+ "context"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error) {
+ if b.currentBalancer == "" {
+ return nil, status.Errorf(codes.Unavailable, "no balancer")
+ }
+ if !b.lockAsBalancer.IsLocked() {
+ proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+ resp, err = client.BalanceTopics(ctx, request)
+ return nil
+ })
+ if proxyErr != nil {
+ return nil, proxyErr
+ }
+ return resp, err
+ }
+
+ ret := &mq_pb.BalanceTopicsResponse{}
+
+ actions := b.Balancer.BalancePublishers()
+ err = b.Balancer.ExecuteBalanceAction(actions, b.grpcDialOption)
+
+ return ret, err
+}
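BalanceTopics, ConfigureTopic, and LookupTopicBrokers all follow the same pattern: only the broker holding the balancer lock handles the request, everyone else forwards it to the current balancer. A simplified sketch of that proxy-if-not-balancer shape, with placeholder functions standing in for the gRPC calls:

package main

import (
	"errors"
	"fmt"
)

// balancerLock abstracts the cluster.LiveLock used in the diff: only the lock
// holder acts as the balancer; every other broker proxies to it.
type balancerLock struct{ locked bool }

func (l balancerLock) IsLocked() bool { return l.locked }

// handleOrProxy mirrors the shape of the balancer RPCs: if this broker is not
// the balancer, forward the request to currentBalancer; otherwise handle it
// locally. proxy and local are placeholders for the real gRPC calls.
func handleOrProxy(lock balancerLock, currentBalancer string,
	proxy func(balancer string) (string, error),
	local func() (string, error)) (string, error) {

	if currentBalancer == "" {
		return "", errors.New("no balancer")
	}
	if !lock.IsLocked() {
		return proxy(currentBalancer)
	}
	return local()
}

func main() {
	resp, err := handleOrProxy(
		balancerLock{locked: false},
		"broker-0:17777",
		func(balancer string) (string, error) {
			return fmt.Sprintf("proxied to %s", balancer), nil
		},
		func() (string, error) { return "handled locally", nil },
	)
	fmt.Println(resp, err)
}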
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
new file mode 100644
index 000000000..7f7c8f84b
--- /dev/null
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -0,0 +1,107 @@
+package broker
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer
+// It generates an assignments based on existing allocations,
+// and then assign the partitions to the brokers.
+func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) {
+ if b.currentBalancer == "" {
+ return nil, status.Errorf(codes.Unavailable, "no balancer")
+ }
+ if !b.lockAsBalancer.IsLocked() {
+ proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+ resp, err = client.ConfigureTopic(ctx, request)
+ return nil
+ })
+ if proxyErr != nil {
+ return nil, proxyErr
+ }
+ return resp, err
+ }
+
+ ret := &mq_pb.ConfigureTopicResponse{}
+ ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
+
+ for _, bpa := range ret.BrokerPartitionAssignments {
+ // fmt.Printf("create topic %s on %s\n", request.Topic, bpa.LeaderBroker)
+ if doCreateErr := b.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
+ _, doCreateErr := client.AssignTopicPartitions(ctx, &mq_pb.AssignTopicPartitionsRequest{
+ Topic: request.Topic,
+ BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ Partition: bpa.Partition,
+ },
+ },
+ IsLeader: true,
+ IsDraining: false,
+ })
+ if doCreateErr != nil {
+ return fmt.Errorf("do create topic %s on %s: %v", request.Topic, bpa.LeaderBroker, doCreateErr)
+ }
+ brokerStats, found := b.Balancer.Brokers.Get(bpa.LeaderBroker)
+ if !found {
+ brokerStats = pub_balancer.NewBrokerStats()
+ if !b.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
+ brokerStats, _ = b.Balancer.Brokers.Get(bpa.LeaderBroker)
+ }
+ }
+ brokerStats.RegisterAssignment(request.Topic, bpa.Partition)
+ return nil
+ }); doCreateErr != nil {
+ return nil, doCreateErr
+ }
+ }
+
+ // TODO revert if some error happens in the middle of the assignments
+
+ return ret, err
+}
+
+// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
+func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
+ ret := &mq_pb.AssignTopicPartitionsResponse{}
+ self := pb.ServerAddress(fmt.Sprintf("%s:%d", b.option.Ip, b.option.Port))
+
+ // drain existing topic partition subscriptions
+ for _, brokerPartition := range request.BrokerPartitionAssignments {
+ localPartition := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
+ if request.IsDraining {
+ // TODO drain existing topic partition subscriptions
+
+ b.localTopicManager.RemoveTopicPartition(
+ topic.FromPbTopic(request.Topic),
+ localPartition.Partition)
+ } else {
+ b.localTopicManager.AddTopicPartition(
+ topic.FromPbTopic(request.Topic),
+ localPartition)
+ }
+ }
+
+ // if is leader, notify the followers to drain existing topic partition subscriptions
+ if request.IsLeader {
+ for _, brokerPartition := range request.BrokerPartitionAssignments {
+ for _, follower := range brokerPartition.FollowerBrokers {
+ err := pb.WithBrokerGrpcClient(false, follower, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.AssignTopicPartitions(context.Background(), request)
+ return err
+ })
+ if err != nil {
+ return ret, err
+ }
+ }
+ }
+ }
+
+ return ret, nil
+}
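For context, each BrokerPartitionAssignment carries a Partition with RingSize/RangeStart/RangeStop fields (see connect.go later in this diff). The sketch below is only an illustration of one plausible way a balancer could cut a ring into equal partition ranges; the actual allocation lives in weed/mq/pub_balancer/allocate.go, which is not shown here:

package main

import "fmt"

// partitionRange mirrors the RingSize/RangeStart/RangeStop fields of
// mq_pb.Partition seen elsewhere in this diff.
type partitionRange struct {
	RingSize   int32
	RangeStart int32
	RangeStop  int32
}

// splitRing divides a hash ring of ringSize slots into partitionCount
// contiguous half-open ranges. Illustrative only; not the committed code.
func splitRing(ringSize, partitionCount int32) []partitionRange {
	ranges := make([]partitionRange, 0, partitionCount)
	for i := int32(0); i < partitionCount; i++ {
		start := ringSize * i / partitionCount
		stop := ringSize * (i + 1) / partitionCount
		ranges = append(ranges, partitionRange{RingSize: ringSize, RangeStart: start, RangeStop: stop})
	}
	return ranges
}

func main() {
	for _, r := range splitRing(1024, 6) {
		fmt.Printf("[%d, %d) of %d\n", r.RangeStart, r.RangeStop, r.RingSize)
	}
}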
diff --git a/weed/mq/broker/broker_grpc_create.go b/weed/mq/broker/broker_grpc_create.go
deleted file mode 100644
index 15b3efd26..000000000
--- a/weed/mq/broker/broker_grpc_create.go
+++ /dev/null
@@ -1,72 +0,0 @@
-package broker
-
-import (
- "context"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-)
-
-// ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer
-func (broker *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error) {
- if broker.currentBalancer == "" {
- return nil, status.Errorf(codes.Unavailable, "no balancer")
- }
- if !broker.lockAsBalancer.IsLocked() {
- proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
- resp, err = client.ConfigureTopic(ctx, request)
- return nil
- })
- if proxyErr != nil {
- return nil, proxyErr
- }
- return resp, err
- }
-
- ret := &mq_pb.ConfigureTopicResponse{}
- ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(request.Topic, true, request.PartitionCount)
-
- for _, bpa := range ret.BrokerPartitionAssignments {
- // fmt.Printf("create topic %s on %s\n", request.Topic, bpa.LeaderBroker)
- if doCreateErr := broker.withBrokerClient(false, pb.ServerAddress(bpa.LeaderBroker), func(client mq_pb.SeaweedMessagingClient) error {
- _, doCreateErr := client.DoConfigureTopic(ctx, &mq_pb.DoConfigureTopicRequest{
- Topic: request.Topic,
- Partition: bpa.Partition,
- })
- if doCreateErr != nil {
- return fmt.Errorf("do create topic %s on %s: %v", request.Topic, bpa.LeaderBroker, doCreateErr)
- }
- brokerStats, found := broker.Balancer.Brokers.Get(bpa.LeaderBroker)
- if !found {
- brokerStats = balancer.NewBrokerStats()
- if !broker.Balancer.Brokers.SetIfAbsent(bpa.LeaderBroker, brokerStats) {
- brokerStats, _ = broker.Balancer.Brokers.Get(bpa.LeaderBroker)
- }
- }
- brokerStats.RegisterAssignment(request.Topic, bpa.Partition)
- return nil
- }); doCreateErr != nil {
- return nil, doCreateErr
- }
- }
-
- // TODO revert if some error happens in the middle of the assignments
-
- return ret, err
-}
-
-func (broker *MessageQueueBroker) DoConfigureTopic(ctx context.Context, req *mq_pb.DoConfigureTopicRequest) (resp *mq_pb.DoConfigureTopicResponse, err error) {
- ret := &mq_pb.DoConfigureTopicResponse{}
- t, p := topic.FromPbTopic(req.Topic), topic.FromPbPartition(req.Partition)
- localTopicPartition := broker.localTopicManager.GetTopicPartition(t, p)
- if localTopicPartition == nil {
- localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
- broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
- }
-
- return ret, err
-}
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index 74a3a9822..4e9c9e441 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -20,12 +20,12 @@ import (
// 2.2 if the topic is found, return the brokers
//
// 3. unlock the topic
-func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
- if broker.currentBalancer == "" {
+func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error) {
+ if b.currentBalancer == "" {
return nil, status.Errorf(codes.Unavailable, "no balancer")
}
- if !broker.lockAsBalancer.IsLocked() {
- proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+ if !b.lockAsBalancer.IsLocked() {
+ proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.LookupTopicBrokers(ctx, request)
return nil
})
@@ -37,22 +37,16 @@ func (broker *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, reques
ret := &mq_pb.LookupTopicBrokersResponse{}
ret.Topic = request.Topic
- ret.BrokerPartitionAssignments, err = broker.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish, 6)
+ ret.BrokerPartitionAssignments, err = b.Balancer.LookupOrAllocateTopicPartitions(ret.Topic, request.IsForPublish, 6)
return ret, err
}
-// CheckTopicPartitionsStatus check the topic partitions on the broker
-func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) {
- ret := &mq_pb.CheckTopicPartitionsStatusResponse{}
- return ret, nil
-}
-
-func (broker *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
- if broker.currentBalancer == "" {
+func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error) {
+ if b.currentBalancer == "" {
return nil, status.Errorf(codes.Unavailable, "no balancer")
}
- if !broker.lockAsBalancer.IsLocked() {
- proxyErr := broker.withBrokerClient(false, broker.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
+ if !b.lockAsBalancer.IsLocked() {
+ proxyErr := b.withBrokerClient(false, b.currentBalancer, func(client mq_pb.SeaweedMessagingClient) error {
resp, err = client.ListTopics(ctx, request)
return nil
})
@@ -64,9 +58,9 @@ func (broker *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb
ret := &mq_pb.ListTopicsResponse{}
knownTopics := make(map[string]struct{})
- for brokerStatsItem := range broker.Balancer.Brokers.IterBuffered() {
+ for brokerStatsItem := range b.Balancer.Brokers.IterBuffered() {
_, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
- for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
+ for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
topicPartitionStat := topicPartitionStatsItem.Val
topic := &mq_pb.Topic{
Namespace: topicPartitionStat.TopicPartition.Namespace,
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index acbffefba..43280e9be 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -5,71 +5,36 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc/peer"
+ "math/rand"
+ "net"
"sync/atomic"
"time"
)
-// For a new or re-configured topic, or one of the broker went offline,
-// the pub clients ask one broker what are the brokers for all the topic partitions.
-// The broker will lock the topic on write.
-// 1. if the topic is not found, create the topic, and allocate the topic partitions to the brokers
-// 2. if the topic is found, return the brokers for the topic partitions
-// For a topic to read from, the sub clients ask one broker what are the brokers for all the topic partitions.
-// The broker will lock the topic on read.
-// 1. if the topic is not found, return error
-// 2. if the topic is found, return the brokers for the topic partitions
-//
-// If the topic needs to be re-balanced, the admin client will lock the topic,
-// 1. collect throughput information for all the brokers
-// 2. adjust the topic partitions to the brokers
-// 3. notify the brokers to add/remove partitions to host
-// 3.1 When locking the topic, the partitions and brokers should be remembered in the lock.
-// 4. the brokers will stop process incoming messages if not the right partition
-// 4.1 the pub clients will need to re-partition the messages and publish to the right brokers for the partition3
-// 4.2 the sub clients will need to change the brokers to read from
-//
-// The following is from each individual component's perspective:
-// For a pub client
-// For current topic/partition, ask one broker for the brokers for the topic partitions
-// 1. connect to the brokers and keep sending, until the broker returns error, or the broker leader is moved.
-// For a sub client
-// For current topic/partition, ask one broker for the brokers for the topic partitions
-// 1. connect to the brokers and keep reading, until the broker returns error, or the broker leader is moved.
-// For a broker
-// Upon a pub client lookup:
-// 1. lock the topic
-// 2. if already has topic partition assignment, check all brokers are healthy
-// 3. if not, create topic partition assignment
-// 2. return the brokers for the topic partitions
-// 3. unlock the topic
-// Upon a sub client lookup:
-// 1. lock the topic
-// 2. if already has topic partition assignment, check all brokers are healthy
-// 3. if not, return error
-// 2. return the brokers for the topic partitions
-// 3. unlock the topic
-// For an admin tool
-// 0. collect stats from all the brokers, and find the topic worth moving
-// 1. lock the topic
-// 2. collect throughput information for all the brokers
-// 3. adjust the topic partitions to the brokers
-// 4. notify the brokers to add/remove partitions to host
-// 5. the brokers will stop process incoming messages if not the right partition
-// 6. unlock the topic
+// PUB
+// 1. gRPC API to configure a topic
+// 1.1 create a topic with existing partition count
+// 1.2 assign partitions to brokers
+// 2. gRPC API to lookup topic partitions
+// 3. gRPC API to publish by topic partitions
-/*
-The messages are buffered in memory, and saved to filer under
- /topics/<topic>/<date>/<hour>/<segment>/*.msg
- /topics/<topic>/<date>/<hour>/segment
- /topics/<topic>/info/segment_<id>.meta
+// SUB
+// 1. gRPC API to look up a topic's partitions
+// Re-balance topic partitions for publishing
+// 1. collect stats from all the brokers
+// 2. Rebalance and configure new generation of partitions on brokers
+// 3. Tell brokers to close the current generation of publishing.
+// Publishers need to look up again and publish to the new generation of partitions.
+// Re-balance topic partitions for subscribing
+// 1. collect stats from all the brokers
+// Subscribers need to listen for new partitions and connect to the brokers.
+// Each subscription may not get data. It can act as a backup.
-*/
-
-func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
+func (b *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
// 1. write to the volume server
// 2. find the topic metadata owning filer
// 3. write to the filer
@@ -85,19 +50,23 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
initMessage := req.GetInit()
if initMessage != nil {
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
- localTopicPartition = broker.localTopicManager.GetTopicPartition(t, p)
+ localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil {
- localTopicPartition = topic.NewLocalPartition(t, p, true, nil)
- broker.localTopicManager.AddTopicPartition(t, localTopicPartition)
+ response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
+ glog.Errorf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
+ return stream.Send(response)
}
ackInterval = int(initMessage.AckInterval)
stream.Send(response)
} else {
- response.Error = fmt.Sprintf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
- glog.Errorf("topic %v partition %v not found", initMessage.Topic, initMessage.Partition)
+ response.Error = fmt.Sprintf("missing init message")
+ glog.Errorf("missing init message")
return stream.Send(response)
}
+ clientName := fmt.Sprintf("%v-%4d/%s/%v", findClientAddress(stream.Context()), rand.Intn(10000), initMessage.Topic, initMessage.Partition)
+ localTopicPartition.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
+
ackCounter := 0
var ackSequence int64
var isStopping int32
@@ -105,6 +74,7 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
defer func() {
atomic.StoreInt32(&isStopping, 1)
close(respChan)
+ localTopicPartition.Publishers.RemovePublisher(clientName)
}()
go func() {
ticker := time.NewTicker(1 * time.Second)
@@ -127,6 +97,11 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
} else {
return
}
+ case <-localTopicPartition.StopPublishersCh:
+ respChan <- &mq_pb.PublishResponse{
+ AckSequence: ackSequence,
+ ShouldClose: true,
+ }
}
}
}()
@@ -156,33 +131,22 @@ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishS
}
}
- glog.Infof("publish stream closed")
+ glog.V(0).Infof("topic %v partition %v publish stream closed.", initMessage.Topic, initMessage.Partition)
return nil
}
-// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
-func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
- ret := &mq_pb.AssignTopicPartitionsResponse{}
- self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
-
- for _, brokerPartition := range request.BrokerPartitionAssignments {
- localPartiton := topic.FromPbBrokerPartitionAssignment(self, brokerPartition)
- broker.localTopicManager.AddTopicPartition(
- topic.FromPbTopic(request.Topic),
- localPartiton)
- if request.IsLeader {
- for _, follower := range localPartiton.FollowerBrokers {
- err := pb.WithBrokerGrpcClient(false, follower.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
- _, err := client.AssignTopicPartitions(context.Background(), request)
- return err
- })
- if err != nil {
- return ret, err
- }
- }
- }
+// duplicated from master_grpc_server.go
+func findClientAddress(ctx context.Context) string {
+ // fmt.Printf("FromContext %+v\n", ctx)
+ pr, ok := peer.FromContext(ctx)
+ if !ok {
+ glog.Error("failed to get peer from ctx")
+ return ""
}
-
- return ret, nil
+ if pr.Addr == net.Addr(nil) {
+ glog.Error("failed to get peer address")
+ return ""
+ }
+ return pr.Addr.String()
}
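The Publish handler above runs an acknowledgment goroutine: a one-second ticker reports the latest acked sequence, and the new StopPublishersCh case tells the publisher to close. A loose, self-contained sketch of that loop, with ackResponse standing in for mq_pb.PublishResponse (the real goroutine also tracks per-tick changes slightly differently):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// ackResponse stands in for mq_pb.PublishResponse.
type ackResponse struct {
	AckSequence int64
	ShouldClose bool
}

// ackLoop sketches the acknowledgment goroutine in Publish: once per tick it
// reports the latest acked sequence, and when stopCh fires (the
// StopPublishersCh case in the diff) it tells the publisher to close.
func ackLoop(ackSequence *int64, isStopping *int32, stopCh <-chan struct{}, respChan chan<- ackResponse) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	var lastAcked int64
	for {
		select {
		case <-ticker.C:
			if atomic.LoadInt32(isStopping) == 1 {
				return
			}
			current := atomic.LoadInt64(ackSequence)
			if current > lastAcked {
				lastAcked = current
				respChan <- ackResponse{AckSequence: current}
			}
		case <-stopCh:
			respChan <- ackResponse{AckSequence: atomic.LoadInt64(ackSequence), ShouldClose: true}
			return
		}
	}
}

func main() {
	var ackSequence int64 = 42
	var isStopping int32
	stopCh := make(chan struct{})
	respChan := make(chan ackResponse, 4)
	go ackLoop(&ackSequence, &isStopping, stopCh, respChan)
	close(stopCh) // simulate ClosePublishers
	fmt.Printf("%+v\n", <-respChan)
}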
diff --git a/weed/mq/broker/broker_grpc_balancer.go b/weed/mq/broker/broker_grpc_pub_balancer.go
index 5e23f89df..e3d49f816 100644
--- a/weed/mq/broker/broker_grpc_balancer.go
+++ b/weed/mq/broker/broker_grpc_pub_balancer.go
@@ -2,15 +2,15 @@ package broker
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
-// ConnectToBalancer receives connections from brokers and collects stats
-func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessaging_ConnectToBalancerServer) error {
- if !broker.lockAsBalancer.IsLocked() {
+// PublisherToPubBalancer receives connections from brokers and collects stats
+func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessaging_PublisherToPubBalancerServer) error {
+ if !b.lockAsBalancer.IsLocked() {
return status.Errorf(codes.Unavailable, "not current broker balancer")
}
req, err := stream.Recv()
@@ -20,21 +20,14 @@ func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessagin
// process init message
initMessage := req.GetInit()
- var brokerStats *balancer.BrokerStats
+ var brokerStats *pub_balancer.BrokerStats
if initMessage != nil {
- var found bool
- brokerStats, found = broker.Balancer.Brokers.Get(initMessage.Broker)
- if !found {
- brokerStats = balancer.NewBrokerStats()
- if !broker.Balancer.Brokers.SetIfAbsent(initMessage.Broker, brokerStats) {
- brokerStats, _ = broker.Balancer.Brokers.Get(initMessage.Broker)
- }
- }
+ brokerStats = b.Balancer.OnBrokerConnected(initMessage.Broker)
} else {
return status.Errorf(codes.InvalidArgument, "balancer init message is empty")
}
defer func() {
- broker.Balancer.Brokers.Remove(initMessage.Broker)
+ b.Balancer.OnBrokerDisconnected(initMessage.Broker, brokerStats)
}()
// process stats message
@@ -43,12 +36,11 @@ func (broker *MessageQueueBroker) ConnectToBalancer(stream mq_pb.SeaweedMessagin
if err != nil {
return err
}
- if !broker.lockAsBalancer.IsLocked() {
+ if !b.lockAsBalancer.IsLocked() {
return status.Errorf(codes.Unavailable, "not current broker balancer")
}
if receivedStats := req.GetStats(); receivedStats != nil {
- brokerStats.UpdateStats(receivedStats)
-
+ b.Balancer.OnBrokerStatsUpdated(initMessage.Broker, brokerStats, receivedStats)
glog.V(4).Infof("broker %s stats: %+v", initMessage.Broker, brokerStats)
glog.V(4).Infof("received stats: %+v", receivedStats)
}
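Both the old ConnectToBalancer code above and ConfigureTopic earlier use the same get-or-create pattern on a concurrent map of broker stats: Get, SetIfAbsent, and re-Get if another goroutine won the race. A minimal sketch of that pattern; the real code uses a third-party concurrent map, sync.Map is used here only to keep the example self-contained:

package main

import (
	"fmt"
	"sync"
)

// brokerStats stands in for pub_balancer.BrokerStats.
type brokerStats struct{ topicPartitionCount int32 }

// getOrCreateStats mirrors the Get / SetIfAbsent / Get pattern: create the
// entry only if it is missing, and if another goroutine won the race, reuse
// its entry instead of overwriting it.
func getOrCreateStats(brokers *sync.Map, broker string) *brokerStats {
	if existing, found := brokers.Load(broker); found {
		return existing.(*brokerStats)
	}
	actual, _ := brokers.LoadOrStore(broker, &brokerStats{})
	return actual.(*brokerStats)
}

func main() {
	var brokers sync.Map
	s := getOrCreateStats(&brokers, "broker-1:17777")
	s.topicPartitionCount++
	fmt.Println(getOrCreateStats(&brokers, "broker-1:17777").topicPartitionCount) // 1
}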
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 376ced03d..c98ce4684 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -1,6 +1,7 @@
package broker
import (
+ "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
@@ -9,10 +10,11 @@ import (
"time"
)
-func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {
+func (b *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream mq_pb.SeaweedMessaging_SubscribeServer) error {
- localTopicPartition := broker.localTopicManager.GetTopicPartition(topic.FromPbTopic(req.GetInit().Topic),
- topic.FromPbPartition(req.GetInit().Partition))
+ t := topic.FromPbTopic(req.GetInit().Topic)
+ partition := topic.FromPbPartition(req.GetInit().Partition)
+ localTopicPartition := b.localTopicManager.GetTopicPartition(t, partition)
if localTopicPartition == nil {
stream.Send(&mq_pb.SubscribeResponse{
Message: &mq_pb.SubscribeResponse_Ctrl{
@@ -25,13 +27,59 @@ func (broker *MessageQueueBroker) Subscribe(req *mq_pb.SubscribeRequest, stream
}
clientName := fmt.Sprintf("%s/%s-%s", req.GetInit().ConsumerGroup, req.GetInit().ConsumerId, req.GetInit().ClientId)
+ localTopicPartition.Subscribers.AddSubscriber(clientName, topic.NewLocalSubscriber())
+ glog.V(0).Infof("Subscriber %s connected on %v %v", clientName, t, partition)
+ isConnected := true
+ sleepIntervalCount := 0
+ defer func() {
+ isConnected = false
+ localTopicPartition.Subscribers.RemoveSubscriber(clientName)
+ glog.V(0).Infof("Subscriber %s on %v %v disconnected", clientName, t, partition)
+ }()
+
+ ctx := stream.Context()
+ var startTime time.Time
+ if startTs := req.GetInit().GetStartTimestampNs(); startTs > 0 {
+ startTime = time.Unix(0, startTs)
+ } else {
+ startTime = time.Now()
+ }
+
+ localTopicPartition.Subscribe(clientName, startTime, func() bool {
+ if !isConnected {
+ return false
+ }
+ sleepIntervalCount++
+ if sleepIntervalCount > 10 {
+ sleepIntervalCount = 10
+ }
+ time.Sleep(time.Duration(sleepIntervalCount) * 2339 * time.Millisecond)
+
+ // Check if the client has disconnected by monitoring the context
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err == context.Canceled {
+ // Client disconnected
+ return false
+ }
+ glog.V(0).Infof("Subscriber %s disconnected: %v", clientName, err)
+ return false
+ default:
+ // Continue processing the request
+ }
+
+ return true
+ }, func(logEntry *filer_pb.LogEntry) error {
+ // reset the sleep interval count
+ sleepIntervalCount = 0
- localTopicPartition.Subscribe(clientName, time.Now(), func(logEntry *filer_pb.LogEntry) error {
value := logEntry.GetData()
if err := stream.Send(&mq_pb.SubscribeResponse{Message: &mq_pb.SubscribeResponse_Data{
Data: &mq_pb.DataMessage{
Key: []byte(fmt.Sprintf("key-%d", logEntry.PartitionKeyHash)),
Value: value,
+ TsNs: logEntry.TsNs,
},
}}); err != nil {
glog.Errorf("Error sending setup response: %v", err)
diff --git a/weed/mq/broker/broker_grpc_sub_coordinator.go b/weed/mq/broker/broker_grpc_sub_coordinator.go
new file mode 100644
index 000000000..349db3178
--- /dev/null
+++ b/weed/mq/broker/broker_grpc_sub_coordinator.go
@@ -0,0 +1,77 @@
+package broker
+
+import (
+ "context"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// SubscriberToSubCoordinator coordinates the subscribers
+func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMessaging_SubscriberToSubCoordinatorServer) error {
+ if !b.lockAsBalancer.IsLocked() {
+ return status.Errorf(codes.Unavailable, "not current broker balancer")
+ }
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+
+ var cgi *sub_coordinator.ConsumerGroupInstance
+ // process init message
+ initMessage := req.GetInit()
+ if initMessage != nil {
+ cgi = b.Coordinator.AddSubscriber(initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic)
+ glog.V(0).Infof("subscriber %s/%s/%s connected", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic)
+ } else {
+ return status.Errorf(codes.InvalidArgument, "subscriber init message is empty")
+ }
+ defer func() {
+ b.Coordinator.RemoveSubscriber(initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic)
+ glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
+ }()
+
+ ctx := stream.Context()
+
+ // process ack messages
+ go func() {
+ for {
+ _, err := stream.Recv()
+ if err != nil {
+ glog.V(0).Infof("subscriber %s/%s/%s receive: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
+ }
+
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err == context.Canceled {
+ // Client disconnected
+ return
+ }
+ return
+ default:
+ // Continue processing the request
+ }
+ }
+ }()
+
+ // send commands to subscriber
+ for {
+ select {
+ case <-ctx.Done():
+ err := ctx.Err()
+ if err == context.Canceled {
+ // Client disconnected
+ return err
+ }
+ glog.V(0).Infof("subscriber %s/%s/%s disconnected: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
+ return err
+ case message := <- cgi.ResponseChan:
+ if err := stream.Send(message); err != nil {
+ glog.V(0).Infof("subscriber %s/%s/%s send: %v", initMessage.ConsumerGroup, initMessage.ConsumerInstanceId, initMessage.Topic, err)
+ }
+ }
+ }
+}
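The final loop in SubscriberToSubCoordinator simply drains the consumer group instance's response channel into the gRPC stream until the subscriber's context ends. A simplified sketch of that forwarding loop, with a plain send function standing in for stream.Send:

package main

import (
	"context"
	"fmt"
)

// forwardUntilDone sketches the command loop at the end of
// SubscriberToSubCoordinator: keep draining the response channel into the
// stream until the subscriber's context ends or a send fails.
func forwardUntilDone(ctx context.Context, responses <-chan string, send func(string) error) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case message := <-responses:
			if err := send(message); err != nil {
				return err
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	responses := make(chan string, 1)
	responses <- "assign partition [0, 170)"

	done := make(chan error, 1)
	sent := make(chan struct{}, 1)
	go func() {
		done <- forwardUntilDone(ctx, responses, func(m string) error {
			fmt.Println("sent:", m)
			sent <- struct{}{}
			return nil
		})
	}()
	<-sent
	cancel() // simulate the subscriber disconnecting
	fmt.Println("loop ended:", <-done)
}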
diff --git a/weed/mq/broker/broker_grpc_topic_partition_control.go b/weed/mq/broker/broker_grpc_topic_partition_control.go
new file mode 100644
index 000000000..66547b010
--- /dev/null
+++ b/weed/mq/broker/broker_grpc_topic_partition_control.go
@@ -0,0 +1,28 @@
+package broker
+
+import (
+ "context"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+func (b *MessageQueueBroker) ClosePublishers(ctx context.Context, request *mq_pb.ClosePublishersRequest) (resp *mq_pb.ClosePublishersResponse, err error) {
+ resp = &mq_pb.ClosePublishersResponse{}
+
+ t := topic.FromPbTopic(request.Topic)
+
+ b.localTopicManager.ClosePublishers(t, request.UnixTimeNs)
+
+ // wait until all publishers are closed
+ b.localTopicManager.WaitUntilNoPublishers(t)
+
+ return
+}
+
+func (b *MessageQueueBroker) CloseSubscribers(ctx context.Context, request *mq_pb.CloseSubscribersRequest) (resp *mq_pb.CloseSubscribersResponse, err error) {
+ resp = &mq_pb.CloseSubscribersResponse{}
+
+ b.localTopicManager.CloseSubscribers(topic.FromPbTopic(request.Topic), request.UnixTimeNs)
+
+ return
+}
diff --git a/weed/mq/broker/broker_segment_serde.go b/weed/mq/broker/broker_segment_serde.go
deleted file mode 100644
index bb9aecc0b..000000000
--- a/weed/mq/broker/broker_segment_serde.go
+++ /dev/null
@@ -1,89 +0,0 @@
-package broker
-
-import (
- "bytes"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- jsonpb "google.golang.org/protobuf/encoding/protojson"
- "time"
-)
-
-func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *topic.Segment) (brokers []pb.ServerAddress, err error) {
- info, found, err := broker.readSegmentInfoOnFiler(segment)
- if err != nil {
- return
- }
- if !found {
- return
- }
- for _, b := range info.Brokers {
- brokers = append(brokers, pb.ServerAddress(b))
- }
-
- return
-}
-
-func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *topic.Segment, brokers []pb.ServerAddress) (err error) {
- var nodes []string
- for _, b := range brokers {
- nodes = append(nodes, string(b))
- }
- broker.saveSegmentInfoToFiler(segment, &mq_pb.SegmentInfo{
- Segment: segment.ToPbSegment(),
- StartTsNs: time.Now().UnixNano(),
- Brokers: nodes,
- StopTsNs: 0,
- PreviousSegments: nil,
- NextSegments: nil,
- })
- return
-}
-
-func (broker *MessageQueueBroker) readSegmentInfoOnFiler(segment *topic.Segment) (info *mq_pb.SegmentInfo, found bool, err error) {
- dir, name := segment.DirAndName()
-
- found, err = filer_pb.Exists(broker, dir, name, false)
- if !found || err != nil {
- return
- }
-
- err = pb.WithFilerClient(false, 0, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- // read filer conf first
- data, err := filer.ReadInsideFiler(client, dir, name)
- if err != nil {
- return fmt.Errorf("ReadEntry: %v", err)
- }
-
- // parse into filer conf object
- info = &mq_pb.SegmentInfo{}
- if err = jsonpb.Unmarshal(data, info); err != nil {
- return err
- }
- found = true
- return nil
- })
-
- return
-}
-
-func (broker *MessageQueueBroker) saveSegmentInfoToFiler(segment *topic.Segment, info *mq_pb.SegmentInfo) (err error) {
- dir, name := segment.DirAndName()
-
- var buf bytes.Buffer
- filer.ProtoToText(&buf, info)
-
- err = pb.WithFilerClient(false, 0, broker.GetFiler(), broker.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
- // read filer conf first
- err := filer.SaveInsideFiler(client, dir, name, buf.Bytes())
- if err != nil {
- return fmt.Errorf("save segment info: %v", err)
- }
- return nil
- })
-
- return
-}
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 94888e14c..52b34ddbc 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -3,7 +3,8 @@ package broker
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"time"
@@ -37,20 +38,24 @@ type MessageQueueBroker struct {
filers map[pb.ServerAddress]struct{}
currentFiler pb.ServerAddress
localTopicManager *topic.LocalTopicManager
- Balancer *balancer.Balancer
+ Balancer *pub_balancer.Balancer
lockAsBalancer *cluster.LiveLock
currentBalancer pb.ServerAddress
+ Coordinator *sub_coordinator.Coordinator
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
+ pub_broker_balancer := pub_balancer.NewBalancer()
+
mqBroker = &MessageQueueBroker{
option: option,
grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
filers: make(map[pb.ServerAddress]struct{}),
localTopicManager: topic.NewLocalTopicManager(),
- Balancer: balancer.NewBalancer(),
+ Balancer: pub_broker_balancer,
+ Coordinator: sub_coordinator.NewCoordinator(pub_broker_balancer),
}
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
@@ -67,10 +72,10 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
time.Sleep(time.Millisecond * 237)
}
self := fmt.Sprintf("%s:%d", option.Ip, option.Port)
- glog.V(1).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
+ glog.V(0).Infof("broker %s found filer %s", self, mqBroker.currentFiler)
lockClient := cluster.NewLockClient(grpcDialOption, mqBroker.currentFiler)
- mqBroker.lockAsBalancer = lockClient.StartLock(balancer.LockBrokerBalancer, self)
+ mqBroker.lockAsBalancer = lockClient.StartLock(pub_balancer.LockBrokerBalancer, self)
for {
err := mqBroker.BrokerConnectToBalancer(self)
if err != nil {
@@ -83,22 +88,22 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
return mqBroker, nil
}
-func (broker *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
+func (b *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
if update.NodeType != cluster.FilerType {
return
}
address := pb.ServerAddress(update.Address)
if update.IsAdd {
- broker.filers[address] = struct{}{}
- if broker.currentFiler == "" {
- broker.currentFiler = address
+ b.filers[address] = struct{}{}
+ if b.currentFiler == "" {
+ b.currentFiler = address
}
} else {
- delete(broker.filers, address)
- if broker.currentFiler == address {
- for filer := range broker.filers {
- broker.currentFiler = filer
+ delete(b.filers, address)
+ if b.currentFiler == address {
+ for filer := range b.filers {
+ b.currentFiler = filer
break
}
}
@@ -106,39 +111,39 @@ func (broker *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUp
}
-func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress {
- return broker.currentFiler
+func (b *MessageQueueBroker) GetFiler() pb.ServerAddress {
+ return b.currentFiler
}
-func (broker *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+func (b *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(streamingMode, 0, broker.GetFiler(), broker.grpcDialOption, fn)
+ return pb.WithFilerClient(streamingMode, 0, b.GetFiler(), b.grpcDialOption, fn)
}
-func (broker *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string {
+func (b *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
-func (broker *MessageQueueBroker) GetDataCenter() string {
+func (b *MessageQueueBroker) GetDataCenter() string {
return ""
}
-func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
+func (b *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error {
- return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, master, b.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return fn(client)
})
}
-func (broker *MessageQueueBroker) withBrokerClient(streamingMode bool, server pb.ServerAddress, fn func(client mq_pb.SeaweedMessagingClient) error) error {
+func (b *MessageQueueBroker) withBrokerClient(streamingMode bool, server pb.ServerAddress, fn func(client mq_pb.SeaweedMessagingClient) error) error {
- return pb.WithBrokerGrpcClient(streamingMode, server.String(), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ return pb.WithBrokerGrpcClient(streamingMode, server.String(), b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
return fn(client)
})
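NewMessageBroker above waits for a filer, acquires the balancer lock, and then keeps (re)connecting to whichever broker holds that lock, retrying on error (the retry body is truncated in this excerpt). A rough sketch of that retry shape, with connect standing in for BrokerConnectToBalancer and a bounded retry count added only so the example terminates:

package main

import (
	"errors"
	"fmt"
	"time"
)

// keepConnectedToBalancer sketches the startup loop at the end of
// NewMessageBroker: reconnect to the balancer lock holder, pausing briefly
// between failed attempts. The real loop retries indefinitely.
func keepConnectedToBalancer(self string, connect func(self string) error, maxRetries int) {
	for i := 0; i < maxRetries; i++ {
		if err := connect(self); err != nil {
			fmt.Printf("broker %s failed to connect to balancer: %v, retrying\n", self, err)
			time.Sleep(time.Millisecond * 237) // same short-pause style as the filer wait loop
			continue
		}
		return
	}
}

func main() {
	attempts := 0
	keepConnectedToBalancer("localhost:17777", func(self string) error {
		attempts++
		if attempts < 3 {
			return errors.New("balancer not found yet")
		}
		return nil
	}, 10)
	fmt.Println("connected after", attempts, "attempts")
}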
diff --git a/weed/mq/broker/broker_stats.go b/weed/mq/broker/broker_stats.go
index 0cc23837b..3cd217519 100644
--- a/weed/mq/broker/broker_stats.go
+++ b/weed/mq/broker/broker_stats.go
@@ -4,21 +4,22 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "io"
"math/rand"
"time"
)
// BrokerConnectToBalancer connects to the broker balancer and sends stats
-func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
+func (b *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
// find the lock owner
var brokerBalancer string
- err := broker.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ err := b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.FindLockOwner(context.Background(), &filer_pb.FindLockOwnerRequest{
- Name: balancer.LockBrokerBalancer,
+ Name: pub_balancer.LockBrokerBalancer,
})
if err != nil {
return err
@@ -29,7 +30,7 @@ func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
if err != nil {
return err
}
- broker.currentBalancer = pb.ServerAddress(brokerBalancer)
+ b.currentBalancer = pb.ServerAddress(brokerBalancer)
glog.V(0).Infof("broker %s found balancer %s", self, brokerBalancer)
if brokerBalancer == "" {
@@ -37,15 +38,15 @@ func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
}
// connect to the lock owner
- err = pb.WithBrokerGrpcClient(false, brokerBalancer, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
- stream, err := client.ConnectToBalancer(context.Background())
+ err = pb.WithBrokerGrpcClient(false, brokerBalancer, b.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ stream, err := client.PublisherToPubBalancer(context.Background())
if err != nil {
return fmt.Errorf("connect to balancer %v: %v", brokerBalancer, err)
}
defer stream.CloseSend()
- err = stream.Send(&mq_pb.ConnectToBalancerRequest{
- Message: &mq_pb.ConnectToBalancerRequest_Init{
- Init: &mq_pb.ConnectToBalancerRequest_InitMessage{
+ err = stream.Send(&mq_pb.PublisherToPubBalancerRequest{
+ Message: &mq_pb.PublisherToPubBalancerRequest_Init{
+ Init: &mq_pb.PublisherToPubBalancerRequest_InitMessage{
Broker: self,
},
},
@@ -55,13 +56,16 @@ func (broker *MessageQueueBroker) BrokerConnectToBalancer(self string) error {
}
for {
- stats := broker.localTopicManager.CollectStats(time.Second * 5)
- err = stream.Send(&mq_pb.ConnectToBalancerRequest{
- Message: &mq_pb.ConnectToBalancerRequest_Stats{
+ stats := b.localTopicManager.CollectStats(time.Second * 5)
+ err = stream.Send(&mq_pb.PublisherToPubBalancerRequest{
+ Message: &mq_pb.PublisherToPubBalancerRequest_Stats{
Stats: stats,
},
})
if err != nil {
+ if err == io.EOF {
+ return err
+ }
return fmt.Errorf("send stats message: %v", err)
}
glog.V(3).Infof("sent stats: %+v", stats)
diff --git a/weed/mq/client/cmd/weed_pub/publisher.go b/weed/mq/client/cmd/weed_pub/publisher.go
index 03674db3f..ee00be9f8 100644
--- a/weed/mq/client/cmd/weed_pub/publisher.go
+++ b/weed/mq/client/cmd/weed_pub/publisher.go
@@ -5,6 +5,7 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
"log"
+ "strings"
"sync"
"time"
)
@@ -12,6 +13,10 @@ import (
var (
messageCount = flag.Int("n", 1000, "message count")
concurrency = flag.Int("c", 4, "concurrency count")
+
+ namespace = flag.String("ns", "test", "namespace")
+ topic = flag.String("topic", "test", "topic")
+ seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
)
func doPublish(publisher *pub_client.TopicPublisher, id int) {
@@ -29,9 +34,12 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) {
func main() {
flag.Parse()
- publisher := pub_client.NewTopicPublisher(
- "test", "test")
- if err := publisher.Connect("localhost:17777"); err != nil {
+ config := &pub_client.PublisherConfiguration{
+ CreateTopic: true,
+ }
+ publisher := pub_client.NewTopicPublisher(*namespace, *topic, config)
+ brokers := strings.Split(*seedBrokers, ",")
+ if err := publisher.Connect(brokers); err != nil {
fmt.Println(err)
return
}
diff --git a/weed/mq/client/cmd/weed_sub/subscriber.go b/weed/mq/client/cmd/weed_sub/subscriber.go
index 6cb18c574..d5bd8f12d 100644
--- a/weed/mq/client/cmd/weed_sub/subscriber.go
+++ b/weed/mq/client/cmd/weed_sub/subscriber.go
@@ -1,13 +1,23 @@
package main
import (
+ "flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
+ "strings"
+ "time"
+)
+
+var (
+ namespace = flag.String("ns", "test", "namespace")
+ topic = flag.String("topic", "test", "topic")
+ seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
)
func main() {
+ flag.Parse()
subscriberConfig := &sub_client.SubscriberConfiguration{
ClientId: "testSubscriber",
@@ -17,12 +27,14 @@ func main() {
}
contentConfig := &sub_client.ContentConfiguration{
- Namespace: "test",
- Topic: "test",
+ Namespace: *namespace,
+ Topic: *topic,
Filter: "",
+ StartTime: time.Now(),
}
- subscriber := sub_client.NewTopicSubscriber("localhost:17777", subscriberConfig, contentConfig)
+ brokers := strings.Split(*seedBrokers, ",")
+ subscriber := sub_client.NewTopicSubscriber(brokers, subscriberConfig, contentConfig)
subscriber.SetEachMessageFunc(func(key, value []byte) bool {
println(string(key), "=>", string(value))
diff --git a/weed/mq/client/pub_client/connect.go b/weed/mq/client/pub_client/connect.go
new file mode 100644
index 000000000..fc7ff4d77
--- /dev/null
+++ b/weed/mq/client/pub_client/connect.go
@@ -0,0 +1,73 @@
+package pub_client
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "log"
+)
+
+// broker => publish client
+// send init message
+// save the publishing client
+func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, err error) {
+ log.Printf("connecting to %v for topic partition %+v", brokerAddress, partition)
+
+ grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
+ if err != nil {
+ return publishClient, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
+ }
+ brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
+ stream, err := brokerClient.Publish(context.Background())
+ if err != nil {
+ return publishClient, fmt.Errorf("create publish client: %v", err)
+ }
+ publishClient = &PublishClient{
+ SeaweedMessaging_PublishClient: stream,
+ Broker: brokerAddress,
+ }
+ if err = publishClient.Send(&mq_pb.PublishRequest{
+ Message: &mq_pb.PublishRequest_Init{
+ Init: &mq_pb.PublishRequest_InitMessage{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ Partition: &mq_pb.Partition{
+ RingSize: partition.RingSize,
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ },
+ AckInterval: 128,
+ },
+ },
+ }); err != nil {
+ return publishClient, fmt.Errorf("send init message: %v", err)
+ }
+ resp, err := stream.Recv()
+ if err != nil {
+ return publishClient, fmt.Errorf("recv init response: %v", err)
+ }
+ if resp.Error != "" {
+ return publishClient, fmt.Errorf("init response error: %v", resp.Error)
+ }
+
+ go func() {
+ for {
+ _, err := publishClient.Recv()
+ if err != nil {
+ e, ok := status.FromError(err)
+ if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
+ return
+ }
+ publishClient.Err = err
+ fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
+ return
+ }
+ }
+ }()
+ return publishClient, nil
+}
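The receive goroutine in doConnect treats a gRPC status of code Unknown with message "EOF" as a normal stream close rather than a publish failure. That check, extracted into a small runnable example using the same grpc status/codes packages:

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isNormalStreamClose mirrors the check in the receive goroutine of doConnect:
// a clean end of the publish stream surfaces as a gRPC status with code
// Unknown and message "EOF", and should not be treated as a publish failure.
func isNormalStreamClose(err error) bool {
	s, ok := status.FromError(err)
	return ok && s.Code() == codes.Unknown && s.Message() == "EOF"
}

func main() {
	fmt.Println(isNormalStreamClose(status.Error(codes.Unknown, "EOF")))                    // true
	fmt.Println(isNormalStreamClose(status.Error(codes.Unavailable, "connection refused"))) // false
}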
diff --git a/weed/mq/client/pub_client/lookup.go b/weed/mq/client/pub_client/lookup.go
index 28cb29015..e55bfd256 100644
--- a/weed/mq/client/pub_client/lookup.go
+++ b/weed/mq/client/pub_client/lookup.go
@@ -5,11 +5,28 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
-func (p *TopicPublisher) doLookup(brokerAddress string) error {
+func (p *TopicPublisher) doLookupAndConnect(brokerAddress string) error {
+ if p.config.CreateTopic {
+ err := pb.WithBrokerGrpcClient(true,
+ brokerAddress,
+ p.grpcDialOption,
+ func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ PartitionCount: p.config.CreateTopicPartitionCount,
+ })
+ return err
+ })
+ if err != nil {
+ return fmt.Errorf("configure topic %s/%s: %v", p.namespace, p.topic, err)
+ }
+ }
+
err := pb.WithBrokerGrpcClient(true,
brokerAddress,
p.grpcDialOption,
@@ -22,21 +39,36 @@ func (p *TopicPublisher) doLookup(brokerAddress string) error {
},
IsForPublish: true,
})
+ if p.config.CreateTopic && err != nil {
+ _, err = client.ConfigureTopic(context.Background(), &mq_pb.ConfigureTopicRequest{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ PartitionCount: p.config.CreateTopicPartitionCount,
+ })
+ if err != nil {
+ return err
+ }
+ lookupResp, err = client.LookupTopicBrokers(context.Background(),
+ &mq_pb.LookupTopicBrokersRequest{
+ Topic: &mq_pb.Topic{
+ Namespace: p.namespace,
+ Name: p.topic,
+ },
+ IsForPublish: true,
+ })
+ }
if err != nil {
return err
}
+
for _, brokerPartitionAssignment := range lookupResp.BrokerPartitionAssignments {
// partition => publishClient
- publishClient, redirectTo, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker)
+ publishClient, err := p.doConnect(brokerPartitionAssignment.Partition, brokerPartitionAssignment.LeaderBroker)
if err != nil {
return err
}
- for redirectTo != "" {
- publishClient, redirectTo, err = p.doConnect(brokerPartitionAssignment.Partition, redirectTo)
- if err != nil {
- return err
- }
- }
p.partition2Broker.Insert(
brokerPartitionAssignment.Partition.RangeStart,
brokerPartitionAssignment.Partition.RangeStop,
@@ -50,67 +82,3 @@ func (p *TopicPublisher) doLookup(brokerAddress string) error {
}
return nil
}
-
-// broker => publish client
-// send init message
-// save the publishing client
-func (p *TopicPublisher) doConnect(partition *mq_pb.Partition, brokerAddress string) (publishClient *PublishClient, redirectTo string, err error) {
- grpcConnection, err := pb.GrpcDial(context.Background(), brokerAddress, true, p.grpcDialOption)
- if err != nil {
- return publishClient, redirectTo, fmt.Errorf("dial broker %s: %v", brokerAddress, err)
- }
- brokerClient := mq_pb.NewSeaweedMessagingClient(grpcConnection)
- stream, err := brokerClient.Publish(context.Background())
- if err != nil {
- return publishClient, redirectTo, fmt.Errorf("create publish client: %v", err)
- }
- publishClient = &PublishClient{
- SeaweedMessaging_PublishClient: stream,
- Broker: brokerAddress,
- }
- if err = publishClient.Send(&mq_pb.PublishRequest{
- Message: &mq_pb.PublishRequest_Init{
- Init: &mq_pb.PublishRequest_InitMessage{
- Topic: &mq_pb.Topic{
- Namespace: p.namespace,
- Name: p.topic,
- },
- Partition: &mq_pb.Partition{
- RingSize: partition.RingSize,
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
- },
- AckInterval: 128,
- },
- },
- }); err != nil {
- return publishClient, redirectTo, fmt.Errorf("send init message: %v", err)
- }
- resp, err := stream.Recv()
- if err != nil {
- return publishClient, redirectTo, fmt.Errorf("recv init response: %v", err)
- }
- if resp.Error != "" {
- return publishClient, redirectTo, fmt.Errorf("init response error: %v", resp.Error)
- }
- if resp.RedirectToBroker != "" {
- redirectTo = resp.RedirectToBroker
- return publishClient, redirectTo, nil
- }
-
- go func() {
- for {
- _, err := publishClient.Recv()
- if err != nil {
- e, ok := status.FromError(err)
- if ok && e.Code() == codes.Unknown && e.Message() == "EOF" {
- return
- }
- publishClient.Err = err
- fmt.Printf("publish to %s error: %v\n", publishClient.Broker, err)
- return
- }
- }
- }()
- return publishClient, redirectTo, nil
-}
diff --git a/weed/mq/client/pub_client/publish.go b/weed/mq/client/pub_client/publish.go
index 9495e380c..1e250ede3 100644
--- a/weed/mq/client/pub_client/publish.go
+++ b/weed/mq/client/pub_client/publish.go
@@ -2,13 +2,13 @@ package pub_client
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
func (p *TopicPublisher) Publish(key, value []byte) error {
- hashKey := util.HashToInt32(key) % balancer.MaxPartitionCount
+ hashKey := util.HashToInt32(key) % pub_balancer.MaxPartitionCount
if hashKey < 0 {
hashKey = -hashKey
}
diff --git a/weed/mq/client/pub_client/publisher.go b/weed/mq/client/pub_client/publisher.go
index bf1711e38..a0c26db36 100644
--- a/weed/mq/client/pub_client/publisher.go
+++ b/weed/mq/client/pub_client/publisher.go
@@ -1,8 +1,9 @@
package pub_client
import (
+ "fmt"
"github.com/rdleal/intervalst/interval"
- "github.com/seaweedfs/seaweedfs/weed/mq/balancer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
@@ -11,6 +12,8 @@ import (
)
type PublisherConfiguration struct {
+ CreateTopic bool
+ CreateTopicPartitionCount int32
}
type PublishClient struct {
@@ -24,9 +27,10 @@ type TopicPublisher struct {
partition2Broker *interval.SearchTree[*PublishClient, int32]
grpcDialOption grpc.DialOption
sync.Mutex // protects grpc
+ config *PublisherConfiguration
}
-func NewTopicPublisher(namespace, topic string) *TopicPublisher {
+func NewTopicPublisher(namespace, topic string, config *PublisherConfiguration) *TopicPublisher {
return &TopicPublisher{
namespace: namespace,
topic: topic,
@@ -34,19 +38,27 @@ func NewTopicPublisher(namespace, topic string) *TopicPublisher {
return int(a - b)
}),
grpcDialOption: grpc.WithTransportCredentials(insecure.NewCredentials()),
+ config: config,
}
}
-func (p *TopicPublisher) Connect(bootstrapBroker string) error {
- if err := p.doLookup(bootstrapBroker); err != nil {
- return err
+func (p *TopicPublisher) Connect(bootstrapBrokers []string) (err error) {
+ if len(bootstrapBrokers) == 0 {
+ return nil
}
- return nil
+ for _, b := range bootstrapBrokers {
+ err = p.doLookupAndConnect(b)
+ if err == nil {
+ return nil
+ }
+ fmt.Printf("failed to connect to %s: %v\n\n", b, err)
+ }
+ return err
}
func (p *TopicPublisher) Shutdown() error {
- if clients, found := p.partition2Broker.AllIntersections(0, balancer.MaxPartitionCount); found {
+ if clients, found := p.partition2Broker.AllIntersections(0, pub_balancer.MaxPartitionCount); found {
for _, client := range clients {
client.CloseSend()
}
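
A minimal publisher sketch against the updated API above, assuming a broker reachable at localhost:17777 and a test/test namespace/topic (all placeholders for illustration); error handling is kept short:

    package main

    import (
        "log"

        "github.com/seaweedfs/seaweedfs/weed/mq/client/pub_client"
    )

    func main() {
        // create the topic with 6 partitions on first use if it does not exist yet
        publisher := pub_client.NewTopicPublisher("test", "test", &pub_client.PublisherConfiguration{
            CreateTopic:               true,
            CreateTopicPartitionCount: 6,
        })
        // Connect tries each bootstrap broker in order until one lookup+connect succeeds
        if err := publisher.Connect([]string{"localhost:17777"}); err != nil {
            log.Fatalf("connect: %v", err)
        }
        defer publisher.Shutdown()

        // messages with the same key hash to the same partition
        if err := publisher.Publish([]byte("key1"), []byte("value1")); err != nil {
            log.Fatalf("publish: %v", err)
        }
    }
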
diff --git a/weed/mq/client/sub_client/process.go b/weed/mq/client/sub_client/process.go
index 7717a101f..b6bdb14ee 100644
--- a/weed/mq/client/sub_client/process.go
+++ b/weed/mq/client/sub_client/process.go
@@ -32,6 +32,9 @@ func (sub *TopicSubscriber) doProcess() error {
RangeStop: brokerPartitionAssignment.Partition.RangeStop,
},
Filter: sub.ContentConfig.Filter,
+ Offset: &mq_pb.SubscribeRequest_InitMessage_StartTimestampNs{
+ StartTimestampNs: sub.alreadyProcessedTsNs,
+ },
},
},
})
@@ -68,6 +71,7 @@ func (sub *TopicSubscriber) doProcess() error {
if !sub.OnEachMessageFunc(m.Data.Key, m.Data.Value) {
return
}
+ sub.alreadyProcessedTsNs = m.Data.TsNs
case *mq_pb.SubscribeResponse_Ctrl:
if m.Ctrl.IsEndOfStream || m.Ctrl.IsEndOfTopic {
return
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index 0803b2c79..370f5aa3c 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -4,17 +4,30 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
+ "log"
+ "time"
)
// Subscribe subscribes to a topic's specified partitions.
// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
func (sub *TopicSubscriber) Subscribe() error {
+ index := -1
util.RetryUntil("subscribe", func() error {
+ index++
+ index = index % len(sub.bootstrapBrokers)
// ask balancer for brokers of the topic
- if err := sub.doLookup(sub.bootstrapBroker); err != nil {
+ if err := sub.doLookup(sub.bootstrapBrokers[index]); err != nil {
return fmt.Errorf("lookup topic %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
}
+ if len(sub.brokerPartitionAssignments) == 0 {
+ if sub.waitForMoreMessage {
+ time.Sleep(1 * time.Second)
+ return fmt.Errorf("no broker partition assignments")
+ } else {
+ return nil
+ }
+ }
// treat the first broker as the topic leader
// connect to the leader broker
@@ -25,6 +38,8 @@ func (sub *TopicSubscriber) Subscribe() error {
return nil
}, func(err error) bool {
if err == io.EOF {
+ log.Printf("subscriber %s/%s: %v", sub.ContentConfig.Namespace, sub.ContentConfig.Topic, err)
+ sub.waitForMoreMessage = false
return false
}
return true
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index 809673de1..9b96b14cb 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -3,6 +3,7 @@ package sub_client
import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
+ "time"
)
type SubscriberConfiguration struct {
@@ -19,6 +20,7 @@ type ContentConfiguration struct {
Namespace string
Topic string
Filter string
+ StartTime time.Time
}
type OnEachMessageFunc func(key, value []byte) (shouldContinue bool)
@@ -30,14 +32,18 @@ type TopicSubscriber struct {
brokerPartitionAssignments []*mq_pb.BrokerPartitionAssignment
OnEachMessageFunc OnEachMessageFunc
OnCompletionFunc OnCompletionFunc
- bootstrapBroker string
+ bootstrapBrokers []string
+ waitForMoreMessage bool
+ alreadyProcessedTsNs int64
}
-func NewTopicSubscriber(bootstrapBroker string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
+func NewTopicSubscriber(bootstrapBrokers []string, subscriber *SubscriberConfiguration, content *ContentConfiguration) *TopicSubscriber {
return &TopicSubscriber{
- SubscriberConfig: subscriber,
- ContentConfig: content,
- bootstrapBroker: bootstrapBroker,
+ SubscriberConfig: subscriber,
+ ContentConfig: content,
+ bootstrapBrokers: bootstrapBrokers,
+ waitForMoreMessage: true,
+ alreadyProcessedTsNs: content.StartTime.UnixNano(),
}
}
@@ -45,6 +51,6 @@ func (sub *TopicSubscriber) SetEachMessageFunc(onEachMessageFn OnEachMessageFunc
sub.OnEachMessageFunc = onEachMessageFn
}
-func (sub *TopicSubscriber) SetCompletionFunc(onCompeletionFn OnCompletionFunc) {
- sub.OnCompletionFunc = onCompeletionFn
+func (sub *TopicSubscriber) SetCompletionFunc(onCompletionFn OnCompletionFunc) {
+ sub.OnCompletionFunc = onCompletionFn
}
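
A minimal subscriber sketch using the new multi-broker bootstrap list and StartTime; the broker address and namespace/topic are placeholders, and the empty SubscriberConfiguration stands in for fields not shown in this diff:

    package main

    import (
        "time"

        "github.com/seaweedfs/seaweedfs/weed/mq/client/sub_client"
    )

    func main() {
        // bootstrap brokers are tried round-robin by Subscribe() on retry
        brokers := []string{"localhost:17777"}

        contentConfig := &sub_client.ContentConfiguration{
            Namespace: "test",
            Topic:     "test",
            // StartTime seeds alreadyProcessedTsNs, so reconnects resume after this timestamp
            StartTime: time.Now().Add(-1 * time.Hour),
        }
        subscriber := sub_client.NewTopicSubscriber(brokers, &sub_client.SubscriberConfiguration{}, contentConfig)

        subscriber.SetEachMessageFunc(func(key, value []byte) bool {
            println(string(key), "=>", string(value))
            return true // keep consuming
        })

        if err := subscriber.Subscribe(); err != nil {
            panic(err)
        }
    }
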
diff --git a/weed/mq/coordinator/consumer_group.go b/weed/mq/coordinator/consumer_group.go
deleted file mode 100644
index e3dec493c..000000000
--- a/weed/mq/coordinator/consumer_group.go
+++ /dev/null
@@ -1,92 +0,0 @@
-package coordinator
-
-import (
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "sync"
-)
-
-func (cg *ConsumerGroup) SetMinMaxActiveInstances(min, max int32) {
- cg.MinimumActiveInstances = min
- cg.MaximumActiveInstances = max
-}
-
-func (cg *ConsumerGroup) AddConsumerGroupInstance(clientId string) *ConsumerGroupInstance {
- cgi := &ConsumerGroupInstance{
- ClientId: clientId,
- }
- cg.ConsumerGroupInstances.Set(clientId, cgi)
- return cgi
-}
-
-func (cg *ConsumerGroup) RemoveConsumerGroupInstance(clientId string) {
- cg.ConsumerGroupInstances.Remove(clientId)
-}
-
-func (cg *ConsumerGroup) CoordinateIfNeeded() {
- emptyInstanceCount, activeInstanceCount := int32(0), int32(0)
- for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
- if cgi.Val.Partition == nil {
- // this consumer group instance is not assigned a partition
- // need to assign one
- emptyInstanceCount++
- } else {
- activeInstanceCount++
- }
- }
-
- var delta int32
- if emptyInstanceCount > 0 {
- if cg.MinimumActiveInstances <= 0 {
- // need to assign more partitions
- delta = emptyInstanceCount
- } else if activeInstanceCount < cg.MinimumActiveInstances && activeInstanceCount+emptyInstanceCount >= cg.MinimumActiveInstances {
- // need to assign more partitions
- delta = cg.MinimumActiveInstances - activeInstanceCount
- }
- }
-
- if cg.MaximumActiveInstances > 0 {
- if activeInstanceCount > cg.MaximumActiveInstances {
- // need to remove some partitions
- delta = cg.MaximumActiveInstances - activeInstanceCount
- }
- }
- if delta == 0 {
- return
- }
- cg.doCoordinate(activeInstanceCount + delta)
-}
-
-func (cg *ConsumerGroup) doCoordinate(target int32) {
- // stop existing instances from processing
- var wg sync.WaitGroup
- for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
- if cgi.Val.Partition != nil {
- wg.Add(1)
- go func(cgi *ConsumerGroupInstance) {
- defer wg.Done()
- // stop processing
- // flush internal state
- // wait for all messages to be processed
- // close the connection
- }(cgi.Val)
- }
- }
- wg.Wait()
-
- partitions := topic.SplitPartitions(target)
-
- // assign partitions to new instances
- i := 0
- for cgi := range cg.ConsumerGroupInstances.IterBuffered() {
- cgi.Val.Partition = partitions[i]
- i++
- wg.Add(1)
- go func(cgi *ConsumerGroupInstance) {
- defer wg.Done()
- // start processing
- // start consuming from the last offset
- }(cgi.Val)
- }
- wg.Wait()
-}
diff --git a/weed/mq/coordinator/coordinator.go b/weed/mq/coordinator/coordinator.go
deleted file mode 100644
index e94ac3371..000000000
--- a/weed/mq/coordinator/coordinator.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package coordinator
-
-import (
- cmap "github.com/orcaman/concurrent-map/v2"
- "github.com/seaweedfs/seaweedfs/weed/mq/topic"
-)
-
-type ConsumerGroupInstance struct {
- ClientId string
- // the consumer group instance may not have an active partition
- Partition *topic.Partition
- // processed message count
- ProcessedMessageCount int64
-}
-type ConsumerGroup struct {
- // map a client id to a consumer group instance
- ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
- MinimumActiveInstances int32
- MaximumActiveInstances int32
-}
-type TopicConsumerGroups struct {
- // map a consumer group name to a consumer group
- ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
-}
-
-// Coordinator coordinates the instances in the consumer group for one topic.
-// It is responsible for:
-// 1. Assigning partitions to consumer instances.
-// 2. Reassigning partitions when a consumer instance is down.
-// 3. Reassigning partitions when a consumer instance is up.
-type Coordinator struct {
- // map client id to subscriber
- Subscribers cmap.ConcurrentMap[string, *ConsumerGroupInstance]
- // map topic name to consumer groups
- TopicSubscribers cmap.ConcurrentMap[string, map[string]TopicConsumerGroups]
-}
diff --git a/weed/mq/balancer/allocate.go b/weed/mq/pub_balancer/allocate.go
index 96a7bcb81..9b2113162 100644
--- a/weed/mq/balancer/allocate.go
+++ b/weed/mq/pub_balancer/allocate.go
@@ -1,7 +1,8 @@
-package balancer
+package pub_balancer
import (
cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"math/rand"
)
@@ -30,6 +31,7 @@ func allocateTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats], p
for i, assignment := range assignments {
assignment.LeaderBroker = pickedBrokers[i]
}
+ glog.V(0).Infof("allocate topic partitions %d: %v", len(assignments), assignments)
return
}
diff --git a/weed/mq/balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go
index f5f9d49e3..298b9ebc1 100644
--- a/weed/mq/balancer/allocate_test.go
+++ b/weed/mq/pub_balancer/allocate_test.go
@@ -1,4 +1,4 @@
-package balancer
+package pub_balancer
import (
cmap "github.com/orcaman/concurrent-map/v2"
diff --git a/weed/mq/pub_balancer/balance.go b/weed/mq/pub_balancer/balance.go
new file mode 100644
index 000000000..87fc5739b
--- /dev/null
+++ b/weed/mq/pub_balancer/balance.go
@@ -0,0 +1,73 @@
+package pub_balancer
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "google.golang.org/grpc"
+)
+
+/*
+* Assume a topic is configured to have between x and y partitions when publishing, there are b brokers,
+* and p is the actual number of partitions for the topic.
+* If b <= x, then p = x.
+* If x < b < y, then x <= p <= b.
+* If b >= y, then x <= p <= y.
+
+Balance topic partitions to brokers
+===================================
+
+When the goal is to merge low-traffic partitions (and p >= x, and the last rebalance interval has passed):
+1. Calculate the average load(throughput) of partitions per topic.
+2. If any two neighboring partitions have a load that is less than the average load, merge them.
+3. If min(b, y) < p, then merge two neighboring partitions that have the least combined load.
+
+When the goal is to split high-traffic partitions (and p < y and p < b, and the last rebalance interval has passed):
+1. Calculate the average load(throughput) of partitions per topic.
+2. If any partition has a load that is more than the average load, split it into two partitions.
+
+When the goal is to make sure that each broker has the same number of partitions:
+1. Calculate the average number of partitions per broker.
+2. For the brokers that have more than the average number of partitions, move the partitions to the brokers that have less than the average number of partitions.
+
+*/
+
+type BalanceAction interface {
+}
+type BalanceActionMerge struct {
+ Before []topic.TopicPartition
+ After topic.TopicPartition
+}
+type BalanceActionSplit struct {
+ Before topic.TopicPartition
+ After []topic.TopicPartition
+}
+
+type BalanceActionMove struct {
+ TopicPartition topic.TopicPartition
+ SourceBroker string
+ TargetBroker string
+}
+
+type BalanceActionCreate struct {
+ TopicPartition topic.TopicPartition
+ TargetBroker string
+}
+
+// BalancePublishers checks the stats of all brokers
+// and balances the publishers across the brokers.
+func (balancer *Balancer) BalancePublishers() []BalanceAction {
+ action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
+ return []BalanceAction{action}
+}
+
+func (balancer *Balancer) ExecuteBalanceAction(actions []BalanceAction, grpcDialOption grpc.DialOption) (err error) {
+ for _, action := range actions {
+ switch action.(type) {
+ case *BalanceActionMove:
+ err = balancer.ExecuteBalanceActionMove(action.(*BalanceActionMove), grpcDialOption)
+ }
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
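
A sketch of one balance pass as the broker leader might run it; when and how often this is invoked is not shown in this change and is assumed here:

    package example

    import (
        "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
        "google.golang.org/grpc"
    )

    // runBalanceCycle computes the desired actions from the collected broker
    // stats and applies them; only *BalanceActionMove is currently executed.
    func runBalanceCycle(balancer *pub_balancer.Balancer, grpcDialOption grpc.DialOption) error {
        actions := balancer.BalancePublishers()
        return balancer.ExecuteBalanceAction(actions, grpcDialOption)
    }
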
diff --git a/weed/mq/pub_balancer/balance_action.go b/weed/mq/pub_balancer/balance_action.go
new file mode 100644
index 000000000..c29ec3469
--- /dev/null
+++ b/weed/mq/pub_balancer/balance_action.go
@@ -0,0 +1,58 @@
+package pub_balancer
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "google.golang.org/grpc"
+)
+
+// Balancer <= PublisherToPubBalancer() <= Broker <=> Publish()
+// ExecuteBalanceActionMove from Balancer => AssignTopicPartitions() => Broker => Publish()
+
+func (balancer *Balancer) ExecuteBalanceActionMove(move *BalanceActionMove, grpcDialOption grpc.DialOption) error {
+ if _, found := balancer.Brokers.Get(move.SourceBroker); !found {
+ return fmt.Errorf("source broker %s not found", move.SourceBroker)
+ }
+ if _, found := balancer.Brokers.Get(move.TargetBroker); !found {
+ return fmt.Errorf("target broker %s not found", move.TargetBroker)
+ }
+
+ err := pb.WithBrokerGrpcClient(false, move.TargetBroker, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.AssignTopicPartitions(context.Background(), &mq_pb.AssignTopicPartitionsRequest{
+ Topic: move.TopicPartition.Topic.ToPbTopic(),
+ BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ Partition: move.TopicPartition.ToPbPartition(),
+ },
+ },
+ IsLeader: true,
+ IsDraining: false,
+ })
+ return err
+ })
+ if err != nil {
+ return fmt.Errorf("assign topic partition %v to %s: %v", move.TopicPartition, move.TargetBroker, err)
+ }
+
+ err = pb.WithBrokerGrpcClient(false, move.SourceBroker, grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.AssignTopicPartitions(context.Background(), &mq_pb.AssignTopicPartitionsRequest{
+ Topic: move.TopicPartition.Topic.ToPbTopic(),
+ BrokerPartitionAssignments: []*mq_pb.BrokerPartitionAssignment{
+ {
+ Partition: move.TopicPartition.ToPbPartition(),
+ },
+ },
+ IsLeader: true,
+ IsDraining: true,
+ })
+ return err
+ })
+ if err != nil {
+ return fmt.Errorf("assign topic partition %v to %s: %v", move.TopicPartition, move.SourceBroker, err)
+ }
+
+ return nil
+
+}
diff --git a/weed/mq/pub_balancer/balance_action_split.go b/weed/mq/pub_balancer/balance_action_split.go
new file mode 100644
index 000000000..6d317ffb9
--- /dev/null
+++ b/weed/mq/pub_balancer/balance_action_split.go
@@ -0,0 +1,43 @@
+package pub_balancer
+
+/*
+Sequence of operations to ensure ordering
+
+Assume Publisher P10 is publishing to Topic Partition TP10, and Subscriber S10 is subscribing to Topic Partition TP10.
+After splitting Topic Partition TP10 into Topic Partition TP11 and Topic Partition TP21,
+Publisher P11 is publishing to Topic Partition TP11, and Publisher P21 is publishing to Topic Partition TP21.
+Subscriber S11 is subscribing to Topic Partition TP11, and Subscriber S21 is subscribing to Topic Partition TP21.
+
+(The last digit is the epoch generation number, which increases when the topic partitioning changes.)
+
+The diagram is as follows:
+P10 -> TP10 -> S10
+ ||
+ \/
+P11 -> TP11 -> S11
+P21 -> TP21 -> S21
+
+The following is the sequence of events:
+1. Create Topic Partition TP11 and TP21
+2. Close Publisher(s) P10
+3. Close Subscriber(s) S10
+4. Close Topic Partition TP10
+5. Start Publisher P11, P21
+6. Start Subscriber S11, S21
+
+The dependency is as follows:
+ 2 => 3 => 4
+ | |
+ v v
+ 1 => (5 | 6)
+
+And also:
+2 => 5
+3 => 6
+
+For brokers:
+1. Close all publishers for a topic partition
+2. Close all subscribers for a topic partition
+3. Close the topic partition
+
+*/
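
A sketch of the broker-side ordering listed above; the three closures are hypothetical stand-ins for whatever the broker uses to close publishers, subscribers, and the partition, not actual APIs from this change:

    package example

    // drainTopicPartition applies steps 1-3: stop publishers first so no new
    // messages arrive, let subscribers finish what was already published, and
    // only then close the topic partition itself.
    func drainTopicPartition(closePublishers, closeSubscribers, closePartition func() error) error {
        if err := closePublishers(); err != nil { // step 1
            return err
        }
        if err := closeSubscribers(); err != nil { // step 2
            return err
        }
        return closePartition() // step 3
    }
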
diff --git a/weed/mq/pub_balancer/balance_brokers.go b/weed/mq/pub_balancer/balance_brokers.go
new file mode 100644
index 000000000..a6b25b7ca
--- /dev/null
+++ b/weed/mq/pub_balancer/balance_brokers.go
@@ -0,0 +1,52 @@
+package pub_balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "math/rand"
+)
+
+func BalanceTopicPartitionOnBrokers(brokers cmap.ConcurrentMap[string, *BrokerStats]) BalanceAction {
+ // 1. calculate the average number of partitions per broker
+ var totalPartitionCount int32
+ var totalBrokerCount int32
+ for brokerStats := range brokers.IterBuffered() {
+ totalBrokerCount++
+ totalPartitionCount += brokerStats.Val.TopicPartitionCount
+ }
+ averagePartitionCountPerBroker := totalPartitionCount / totalBrokerCount
+ minPartitionCountPerBroker := averagePartitionCountPerBroker
+ maxPartitionCountPerBroker := averagePartitionCountPerBroker
+ var sourceBroker, targetBroker string
+ var candidatePartition *topic.TopicPartition
+ for brokerStats := range brokers.IterBuffered() {
+ if minPartitionCountPerBroker > brokerStats.Val.TopicPartitionCount {
+ minPartitionCountPerBroker = brokerStats.Val.TopicPartitionCount
+ targetBroker = brokerStats.Key
+ }
+ if maxPartitionCountPerBroker < brokerStats.Val.TopicPartitionCount {
+ maxPartitionCountPerBroker = brokerStats.Val.TopicPartitionCount
+ sourceBroker = brokerStats.Key
+ // select a random partition from the source broker
+			randomPartitionIndex := rand.Intn(int(brokerStats.Val.TopicPartitionCount))
+			index := 0
+			for topicPartitionStats := range brokerStats.Val.TopicPartitionStats.IterBuffered() {
+				if index == randomPartitionIndex {
+ candidatePartition = &topicPartitionStats.Val.TopicPartition
+ break
+ } else {
+ index++
+ }
+ }
+ }
+ }
+ if minPartitionCountPerBroker >= maxPartitionCountPerBroker-1 {
+ return nil
+ }
+ // 2. move the partitions from the source broker to the target broker
+ return &BalanceActionMove{
+ TopicPartition: *candidatePartition,
+ SourceBroker: sourceBroker,
+ TargetBroker: targetBroker,
+ }
+}
diff --git a/weed/mq/pub_balancer/balance_brokers_test.go b/weed/mq/pub_balancer/balance_brokers_test.go
new file mode 100644
index 000000000..54667d154
--- /dev/null
+++ b/weed/mq/pub_balancer/balance_brokers_test.go
@@ -0,0 +1,75 @@
+package pub_balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "reflect"
+ "testing"
+)
+
+func TestBalanceTopicPartitionOnBrokers(t *testing.T) {
+
+ brokers := cmap.New[*BrokerStats]()
+ broker1Stats := &BrokerStats{
+ TopicPartitionCount: 1,
+ ConsumerCount: 1,
+ CpuUsagePercent: 1,
+ TopicPartitionStats: cmap.New[*TopicPartitionStats](),
+ }
+ broker1Stats.TopicPartitionStats.Set("topic1:0", &TopicPartitionStats{
+ TopicPartition: topic.TopicPartition{
+ Topic: topic.Topic{Namespace: "topic1", Name: "topic1"},
+ Partition: topic.Partition{RangeStart: 0, RangeStop: 512, RingSize: 1024},
+ },
+ ConsumerCount: 1,
+ IsLeader: true,
+ })
+ broker2Stats := &BrokerStats{
+ TopicPartitionCount: 2,
+ ConsumerCount: 1,
+ CpuUsagePercent: 1,
+ TopicPartitionStats: cmap.New[*TopicPartitionStats](),
+ }
+ broker2Stats.TopicPartitionStats.Set("topic1:1", &TopicPartitionStats{
+ TopicPartition: topic.TopicPartition{
+ Topic: topic.Topic{Namespace: "topic1", Name: "topic1"},
+ Partition: topic.Partition{RangeStart: 512, RangeStop: 1024, RingSize: 1024},
+ },
+ ConsumerCount: 1,
+ IsLeader: true,
+ })
+ broker2Stats.TopicPartitionStats.Set("topic2:0", &TopicPartitionStats{
+ TopicPartition: topic.TopicPartition{
+ Topic: topic.Topic{Namespace: "topic2", Name: "topic2"},
+ Partition: topic.Partition{RangeStart: 0, RangeStop: 1024, RingSize: 1024},
+ },
+ ConsumerCount: 1,
+ IsLeader: true,
+ })
+ brokers.Set("broker1", broker1Stats)
+ brokers.Set("broker2", broker2Stats)
+
+ type args struct {
+ brokers cmap.ConcurrentMap[string, *BrokerStats]
+ }
+ tests := []struct {
+ name string
+ args args
+ want BalanceAction
+ }{
+ {
+ name: "test",
+ args: args{
+ brokers: brokers,
+ },
+ want: nil,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := BalanceTopicPartitionOnBrokers(tt.args.brokers); !reflect.DeepEqual(got, tt.want) {
+ t.Errorf("BalanceTopicPartitionOnBrokers() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
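
Why the test above expects nil: broker1 and broker2 host 1 + 2 = 3 partitions, so averagePartitionCountPerBroker = 3 / 2 = 1 with integer division; the scan then finds minPartitionCountPerBroker = 1 (broker1) and maxPartitionCountPerBroker = 2 (broker2), and since 1 >= 2 - 1 the brokers are treated as already balanced and no BalanceActionMove is returned.
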
diff --git a/weed/mq/pub_balancer/balancer.go b/weed/mq/pub_balancer/balancer.go
new file mode 100644
index 000000000..988b971af
--- /dev/null
+++ b/weed/mq/pub_balancer/balancer.go
@@ -0,0 +1,83 @@
+package pub_balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+const (
+ MaxPartitionCount = 8 * 9 * 5 * 7 //2520
+ LockBrokerBalancer = "broker_balancer"
+)
+
+// Balancer collects stats from all brokers.
+//
+// When a publisher wants to create a topic, the balancer picks brokers to assign the topic partitions.
+// When a consumer wants to subscribe to a topic, the balancer tells it which brokers are serving the topic partitions.
+//
+// When a partition needs to be split or merged, or a partition needs to be moved to another broker,
+// the balancer will let the broker tell the consumer instance to stop processing the partition.
+// The existing consumer instance will flush the internal state, and then stop processing.
+// Then the balancer will tell the brokers to start sending new messages in the new/moved partition to the consumer instances.
+//
+// Failover to standby consumer instances:
+//
+// A consumer group can have a minimum and a maximum number of consumer instances.
+// Consumer instances that join after the maximum is reached are kept in standby mode.
+//
+// When a consumer instance is down, the broker will notice this and inform the balancer.
+// The balancer will then tell the broker to send the partition to another standby consumer instance.
+type Balancer struct {
+ Brokers cmap.ConcurrentMap[string, *BrokerStats] // key: broker address
+ // Collected from all brokers when they connect to the broker leader
+ TopicToBrokers cmap.ConcurrentMap[string, *PartitionSlotToBrokerList] // key: topic name
+}
+func NewBalancer() *Balancer {
+ return &Balancer{
+ Brokers: cmap.New[*BrokerStats](),
+ TopicToBrokers: cmap.New[*PartitionSlotToBrokerList](),
+ }
+}
+
+func (balancer *Balancer) OnBrokerConnected(broker string) (brokerStats *BrokerStats) {
+ var found bool
+ brokerStats, found = balancer.Brokers.Get(broker)
+ if !found {
+ brokerStats = NewBrokerStats()
+ if !balancer.Brokers.SetIfAbsent(broker, brokerStats) {
+ brokerStats, _ = balancer.Brokers.Get(broker)
+ }
+ }
+ return brokerStats
+}
+
+func (balancer *Balancer) OnBrokerDisconnected(broker string, stats *BrokerStats) {
+ balancer.Brokers.Remove(broker)
+
+ // update TopicToBrokers
+ for _, topic := range stats.Topics {
+ partitionSlotToBrokerList, found := balancer.TopicToBrokers.Get(topic.String())
+ if !found {
+ continue
+ }
+ partitionSlotToBrokerList.RemoveBroker(broker)
+ }
+}
+
+func (balancer *Balancer) OnBrokerStatsUpdated(broker string, brokerStats *BrokerStats, receivedStats *mq_pb.BrokerStats) {
+ brokerStats.UpdateStats(receivedStats)
+
+ // update TopicToBrokers
+ for _, topicPartitionStats := range receivedStats.Stats {
+ topic := topicPartitionStats.Topic
+ partition := topicPartitionStats.Partition
+ partitionSlotToBrokerList, found := balancer.TopicToBrokers.Get(topic.String())
+ if !found {
+ partitionSlotToBrokerList = NewPartitionSlotToBrokerList(MaxPartitionCount)
+ if !balancer.TopicToBrokers.SetIfAbsent(topic.String(), partitionSlotToBrokerList) {
+ partitionSlotToBrokerList, _ = balancer.TopicToBrokers.Get(topic.String())
+ }
+ }
+ partitionSlotToBrokerList.AddBroker(partition, broker)
+ }
+}
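
A sketch of how the broker leader's stats stream handler might drive these callbacks; the channel-based plumbing is an assumption for illustration, not code from this change:

    package example

    import (
        "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
        "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
    )

    // trackBroker registers a broker, folds every received stats message into
    // the balancer's view, and removes the broker when the stream ends.
    func trackBroker(balancer *pub_balancer.Balancer, broker string, statsCh <-chan *mq_pb.BrokerStats) {
        brokerStats := balancer.OnBrokerConnected(broker)
        defer balancer.OnBrokerDisconnected(broker, brokerStats)

        for received := range statsCh {
            balancer.OnBrokerStatsUpdated(broker, brokerStats, received)
        }
    }
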
diff --git a/weed/mq/balancer/balancer.go b/weed/mq/pub_balancer/broker_stats.go
index d93cc8de8..461e93c61 100644
--- a/weed/mq/balancer/balancer.go
+++ b/weed/mq/pub_balancer/broker_stats.go
@@ -1,4 +1,4 @@
-package balancer
+package pub_balancer
import (
"fmt"
@@ -7,37 +7,27 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
-const (
- MaxPartitionCount = 8 * 9 * 5 * 7 //2520
- LockBrokerBalancer = "broker_balancer"
-)
-
-// Balancer collects stats from all brokers.
-//
-// When publishers wants to create topics, it picks brokers to assign the topic partitions.
-// When consumers wants to subscribe topics, it tells which brokers are serving the topic partitions.
-//
-// When a partition needs to be split or merged, or a partition needs to be moved to another broker,
-// the balancer will let the broker tell the consumer instance to stop processing the partition.
-// The existing consumer instance will flush the internal state, and then stop processing.
-// Then the balancer will tell the brokers to start sending new messages in the new/moved partition to the consumer instances.
-//
-// Failover to standby consumer instances:
-//
-// A consumer group can have min and max number of consumer instances.
-// For consumer instances joined after the max number, they will be in standby mode.
-//
-// When a consumer instance is down, the broker will notice this and inform the balancer.
-// The balancer will then tell the broker to send the partition to another standby consumer instance.
-type Balancer struct {
- Brokers cmap.ConcurrentMap[string, *BrokerStats]
-}
-
type BrokerStats struct {
TopicPartitionCount int32
ConsumerCount int32
CpuUsagePercent int32
- Stats cmap.ConcurrentMap[string, *TopicPartitionStats]
+ TopicPartitionStats cmap.ConcurrentMap[string, *TopicPartitionStats] // key: topic_partition
+ Topics []topic.Topic
+}
+type TopicPartitionStats struct {
+ topic.TopicPartition
+ ConsumerCount int32
+ IsLeader bool
+}
+
+func NewBrokerStats() *BrokerStats {
+ return &BrokerStats{
+ TopicPartitionStats: cmap.New[*TopicPartitionStats](),
+ }
+}
+func (bs *BrokerStats) String() string {
+ return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}",
+ bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.TopicPartitionStats.Items())
}
func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
@@ -45,7 +35,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
bs.CpuUsagePercent = stats.CpuUsagePercent
var consumerCount int32
- currentTopicPartitions := bs.Stats.Items()
+ currentTopicPartitions := bs.TopicPartitionStats.Items()
for _, topicPartitionStats := range stats.Stats {
tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{
@@ -57,12 +47,12 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
}
consumerCount += topicPartitionStats.ConsumerCount
key := tps.TopicPartition.String()
- bs.Stats.Set(key, tps)
+ bs.TopicPartitionStats.Set(key, tps)
delete(currentTopicPartitions, key)
}
// remove the topic partitions that are not in the stats
for key := range currentTopicPartitions {
- bs.Stats.Remove(key)
+ bs.TopicPartitionStats.Remove(key)
}
bs.ConsumerCount = consumerCount
@@ -78,28 +68,5 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti
IsLeader: true,
}
key := tps.TopicPartition.String()
- bs.Stats.Set(key, tps)
-}
-
-func (bs *BrokerStats) String() string {
- return fmt.Sprintf("BrokerStats{TopicPartitionCount:%d, ConsumerCount:%d, CpuUsagePercent:%d, Stats:%+v}",
- bs.TopicPartitionCount, bs.ConsumerCount, bs.CpuUsagePercent, bs.Stats.Items())
-}
-
-type TopicPartitionStats struct {
- topic.TopicPartition
- ConsumerCount int32
- IsLeader bool
-}
-
-func NewBalancer() *Balancer {
- return &Balancer{
- Brokers: cmap.New[*BrokerStats](),
- }
-}
-
-func NewBrokerStats() *BrokerStats {
- return &BrokerStats{
- Stats: cmap.New[*TopicPartitionStats](),
- }
+ bs.TopicPartitionStats.Set(key, tps)
}
diff --git a/weed/mq/balancer/lookup.go b/weed/mq/pub_balancer/lookup.go
index d5b78fc45..3e103a650 100644
--- a/weed/mq/balancer/lookup.go
+++ b/weed/mq/pub_balancer/lookup.go
@@ -1,7 +1,8 @@
-package balancer
+package pub_balancer
import (
"errors"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
@@ -9,11 +10,14 @@ var (
ErrNoBroker = errors.New("no broker")
)
-func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
+func (balancer *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish bool, partitionCount int32) (assignments []*mq_pb.BrokerPartitionAssignment, err error) {
+ if partitionCount == 0 {
+ partitionCount = 6
+ }
// find existing topic partition assignments
- for brokerStatsItem := range b.Brokers.IterBuffered() {
+ for brokerStatsItem := range balancer.Brokers.IterBuffered() {
broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
- for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
+ for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
topicPartitionStat := topicPartitionStatsItem.Val
if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
topicPartitionStat.TopicPartition.Name == topic.Name {
@@ -30,7 +34,8 @@ func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish b
}
}
}
- if len(assignments) > 0 {
+ if len(assignments) > 0 && len(assignments) == int(partitionCount) || !publish {
+ glog.V(0).Infof("existing topic partitions %d: %v", len(assignments), assignments)
return assignments, nil
}
@@ -41,8 +46,8 @@ func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish b
// if the request is_for_subscribe
// return error not found
// t := topic.FromPbTopic(request.Topic)
- if b.Brokers.IsEmpty() {
+ if balancer.Brokers.IsEmpty() {
return nil, ErrNoBroker
}
- return allocateTopicPartitions(b.Brokers, partitionCount), nil
+ return allocateTopicPartitions(balancer.Brokers, partitionCount), nil
}
diff --git a/weed/mq/pub_balancer/partition_list_broker.go b/weed/mq/pub_balancer/partition_list_broker.go
new file mode 100644
index 000000000..7ceb2a9fc
--- /dev/null
+++ b/weed/mq/pub_balancer/partition_list_broker.go
@@ -0,0 +1,50 @@
+package pub_balancer
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+type PartitionSlotToBroker struct {
+ RangeStart int32
+ RangeStop int32
+ AssignedBroker string
+}
+
+type PartitionSlotToBrokerList struct {
+ PartitionSlots []*PartitionSlotToBroker
+ RingSize int32
+}
+
+func NewPartitionSlotToBrokerList(ringSize int32) *PartitionSlotToBrokerList {
+ return &PartitionSlotToBrokerList{
+ RingSize: ringSize,
+ }
+}
+
+func (ps *PartitionSlotToBrokerList) AddBroker(partition *mq_pb.Partition, broker string) {
+ for _, partitionSlot := range ps.PartitionSlots {
+ if partitionSlot.RangeStart == partition.RangeStart && partitionSlot.RangeStop == partition.RangeStop {
+ if partitionSlot.AssignedBroker == broker {
+ return
+ }
+ if partitionSlot.AssignedBroker != "" {
+ glog.V(0).Infof("partition %s broker change: %s => %s", partition, partitionSlot.AssignedBroker, broker)
+ }
+ partitionSlot.AssignedBroker = broker
+ return
+ }
+ }
+ ps.PartitionSlots = append(ps.PartitionSlots, &PartitionSlotToBroker{
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ AssignedBroker: broker,
+ })
+}
+func (ps *PartitionSlotToBrokerList) RemoveBroker(broker string) {
+ for _, partitionSlot := range ps.PartitionSlots {
+ if partitionSlot.AssignedBroker == broker {
+ partitionSlot.AssignedBroker = ""
+ }
+ }
+}
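
A small usage sketch of the slot list; the half-ring partition and the broker names are made up for illustration:

    package example

    import (
        "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
        "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
    )

    func partitionSlotExample() *pub_balancer.PartitionSlotToBrokerList {
        list := pub_balancer.NewPartitionSlotToBrokerList(pub_balancer.MaxPartitionCount)
        half := &mq_pb.Partition{
            RingSize:   pub_balancer.MaxPartitionCount,
            RangeStart: 0,
            RangeStop:  pub_balancer.MaxPartitionCount / 2,
        }
        list.AddBroker(half, "broker1") // creates the slot and assigns broker1
        list.AddBroker(half, "broker2") // same slot: logs the change and reassigns to broker2
        list.RemoveBroker("broker2")    // leaves the slot unassigned
        return list
    }
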
diff --git a/weed/mq/pub_balancer/repair.go b/weed/mq/pub_balancer/repair.go
new file mode 100644
index 000000000..0ab1a5ea9
--- /dev/null
+++ b/weed/mq/pub_balancer/repair.go
@@ -0,0 +1,127 @@
+package pub_balancer
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "math/rand"
+ "modernc.org/mathutil"
+ "sort"
+)
+
+func (balancer *Balancer) RepairTopics() []BalanceAction {
+ action := BalanceTopicPartitionOnBrokers(balancer.Brokers)
+ return []BalanceAction{action}
+}
+
+type TopicPartitionInfo struct {
+ Leader string
+ Followers []string
+}
+
+// RepairMissingTopicPartitions checks the stats of all brokers
+// and repairs the missing topic partitions on the brokers.
+func RepairMissingTopicPartitions(brokers cmap.ConcurrentMap[string, *BrokerStats]) (actions []BalanceAction) {
+
+ // find all topic partitions
+ topicToTopicPartitions := make(map[topic.Topic]map[topic.Partition]*TopicPartitionInfo)
+ for brokerStatsItem := range brokers.IterBuffered() {
+ broker, brokerStats := brokerStatsItem.Key, brokerStatsItem.Val
+ for topicPartitionStatsItem := range brokerStats.TopicPartitionStats.IterBuffered() {
+ topicPartitionStat := topicPartitionStatsItem.Val
+ topicPartitionToInfo, found := topicToTopicPartitions[topicPartitionStat.Topic]
+ if !found {
+ topicPartitionToInfo = make(map[topic.Partition]*TopicPartitionInfo)
+ topicToTopicPartitions[topicPartitionStat.Topic] = topicPartitionToInfo
+ }
+ tpi, found := topicPartitionToInfo[topicPartitionStat.Partition]
+ if !found {
+ tpi = &TopicPartitionInfo{}
+ topicPartitionToInfo[topicPartitionStat.Partition] = tpi
+ }
+ if topicPartitionStat.IsLeader {
+ tpi.Leader = broker
+ } else {
+ tpi.Followers = append(tpi.Followers, broker)
+ }
+ }
+ }
+
+ // collect all brokers as candidates
+ candidates := make([]string, 0, brokers.Count())
+ for brokerStatsItem := range brokers.IterBuffered() {
+ candidates = append(candidates, brokerStatsItem.Key)
+ }
+
+ // find the missing topic partitions
+ for t, topicPartitionToInfo := range topicToTopicPartitions {
+ missingPartitions := EachTopicRepairMissingTopicPartitions(t, topicPartitionToInfo)
+ for _, partition := range missingPartitions {
+ actions = append(actions, BalanceActionCreate{
+ TopicPartition: topic.TopicPartition{
+ Topic: t,
+ Partition: partition,
+ },
+ TargetBroker: candidates[rand.Intn(len(candidates))],
+ })
+ }
+ }
+
+ return actions
+}
+
+func EachTopicRepairMissingTopicPartitions(t topic.Topic, info map[topic.Partition]*TopicPartitionInfo) (missingPartitions []topic.Partition) {
+
+ // find the missing topic partitions
+ var partitions []topic.Partition
+ for partition := range info {
+ partitions = append(partitions, partition)
+ }
+ return findMissingPartitions(partitions, MaxPartitionCount)
+}
+
+// findMissingPartitions find the missing partitions
+func findMissingPartitions(partitions []topic.Partition, ringSize int32) (missingPartitions []topic.Partition) {
+ // sort the partitions by range start
+ sort.Slice(partitions, func(i, j int) bool {
+ return partitions[i].RangeStart < partitions[j].RangeStart
+ })
+
+ // calculate the average partition size
+ var covered int32
+ for _, partition := range partitions {
+ covered += partition.RangeStop - partition.RangeStart
+ }
+ averagePartitionSize := covered / int32(len(partitions))
+
+ // find the missing partitions
+ var coveredWatermark int32
+ i := 0
+ for i < len(partitions) {
+ partition := partitions[i]
+ if partition.RangeStart > coveredWatermark {
+ upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, partition.RangeStart)
+ missingPartitions = append(missingPartitions, topic.Partition{
+ RangeStart: coveredWatermark,
+ RangeStop: upperBound,
+ RingSize: ringSize,
+ })
+ coveredWatermark = upperBound
+ if coveredWatermark == partition.RangeStop {
+ i++
+ }
+ } else {
+ coveredWatermark = partition.RangeStop
+ i++
+ }
+ }
+ for coveredWatermark < ringSize {
+ upperBound := mathutil.MinInt32(coveredWatermark+averagePartitionSize, ringSize)
+ missingPartitions = append(missingPartitions, topic.Partition{
+ RangeStart: coveredWatermark,
+ RangeStop: upperBound,
+ RingSize: ringSize,
+ })
+ coveredWatermark = upperBound
+ }
+ return missingPartitions
+}
diff --git a/weed/mq/pub_balancer/repair_test.go b/weed/mq/pub_balancer/repair_test.go
new file mode 100644
index 000000000..08465c7e8
--- /dev/null
+++ b/weed/mq/pub_balancer/repair_test.go
@@ -0,0 +1,97 @@
+package pub_balancer
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "reflect"
+ "testing"
+)
+
+func Test_findMissingPartitions(t *testing.T) {
+ type args struct {
+ partitions []topic.Partition
+ }
+ tests := []struct {
+ name string
+ args args
+ wantMissingPartitions []topic.Partition
+ }{
+ {
+ name: "one partition",
+ args: args{
+ partitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 0, RangeStop: 1024},
+ },
+ },
+ wantMissingPartitions: nil,
+ },
+ {
+ name: "two partitions",
+ args: args{
+ partitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 0, RangeStop: 512},
+ {RingSize: 1024, RangeStart: 512, RangeStop: 1024},
+ },
+ },
+ wantMissingPartitions: nil,
+ },
+ {
+ name: "four partitions, missing last two",
+ args: args{
+ partitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 0, RangeStop: 256},
+ {RingSize: 1024, RangeStart: 256, RangeStop: 512},
+ },
+ },
+ wantMissingPartitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 512, RangeStop: 768},
+ {RingSize: 1024, RangeStart: 768, RangeStop: 1024},
+ },
+ },
+ {
+ name: "four partitions, missing first two",
+ args: args{
+ partitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 512, RangeStop: 768},
+ {RingSize: 1024, RangeStart: 768, RangeStop: 1024},
+ },
+ },
+ wantMissingPartitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 0, RangeStop: 256},
+ {RingSize: 1024, RangeStart: 256, RangeStop: 512},
+ },
+ },
+ {
+ name: "four partitions, missing middle two",
+ args: args{
+ partitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 0, RangeStop: 256},
+ {RingSize: 1024, RangeStart: 768, RangeStop: 1024},
+ },
+ },
+ wantMissingPartitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 256, RangeStop: 512},
+ {RingSize: 1024, RangeStart: 512, RangeStop: 768},
+ },
+ },
+ {
+ name: "four partitions, missing three",
+ args: args{
+ partitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 512, RangeStop: 768},
+ },
+ },
+ wantMissingPartitions: []topic.Partition{
+ {RingSize: 1024, RangeStart: 0, RangeStop: 256},
+ {RingSize: 1024, RangeStart: 256, RangeStop: 512},
+ {RingSize: 1024, RangeStart: 768, RangeStop: 1024},
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if gotMissingPartitions := findMissingPartitions(tt.args.partitions, 1024); !reflect.DeepEqual(gotMissingPartitions, tt.wantMissingPartitions) {
+ t.Errorf("findMissingPartitions() = %v, want %v", gotMissingPartitions, tt.wantMissingPartitions)
+ }
+ })
+ }
+}
diff --git a/weed/mq/sub_coordinator/consumer_group.go b/weed/mq/sub_coordinator/consumer_group.go
new file mode 100644
index 000000000..be06a01f8
--- /dev/null
+++ b/weed/mq/sub_coordinator/consumer_group.go
@@ -0,0 +1,41 @@
+package sub_coordinator
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+type ConsumerGroupInstance struct {
+ InstanceId string
+ // the consumer group instance may not have an active partition
+ Partitions []*topic.Partition
+ ResponseChan chan *mq_pb.SubscriberToSubCoordinatorResponse
+}
+type ConsumerGroup struct {
+ // map a consumer group instance id to a consumer group instance
+ ConsumerGroupInstances cmap.ConcurrentMap[string, *ConsumerGroupInstance]
+ mapping *PartitionConsumerMapping
+}
+
+func NewConsumerGroup() *ConsumerGroup {
+ return &ConsumerGroup{
+ ConsumerGroupInstances: cmap.New[*ConsumerGroupInstance](),
+ mapping: NewPartitionConsumerMapping(pub_balancer.MaxPartitionCount),
+ }
+}
+
+func NewConsumerGroupInstance(instanceId string) *ConsumerGroupInstance {
+ return &ConsumerGroupInstance{
+ InstanceId: instanceId,
+ ResponseChan: make(chan *mq_pb.SubscriberToSubCoordinatorResponse, 1),
+ }
+}
+func (cg *ConsumerGroup) OnAddConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
+}
+func (cg *ConsumerGroup) OnRemoveConsumerGroupInstance(consumerGroupInstance string, topic *mq_pb.Topic) {
+
+}
+func (cg *ConsumerGroup) OnPartitionListChange() {
+
+}
diff --git a/weed/mq/sub_coordinator/coordinator.go b/weed/mq/sub_coordinator/coordinator.go
new file mode 100644
index 000000000..f4d65ea5b
--- /dev/null
+++ b/weed/mq/sub_coordinator/coordinator.go
@@ -0,0 +1,86 @@
+package sub_coordinator
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+ "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+
+type TopicConsumerGroups struct {
+ // map a consumer group name to a consumer group
+ ConsumerGroups cmap.ConcurrentMap[string, *ConsumerGroup]
+}
+
+// Coordinator coordinates the instances in the consumer group for one topic.
+// It is responsible for:
+// 1. (Maybe) assigning partitions when a consumer instance comes up or goes down.
+
+type Coordinator struct {
+ // map topic name to consumer groups
+ TopicSubscribers cmap.ConcurrentMap[string, *TopicConsumerGroups]
+ balancer *pub_balancer.Balancer
+}
+
+func NewCoordinator(balancer *pub_balancer.Balancer) *Coordinator {
+ return &Coordinator{
+ TopicSubscribers: cmap.New[*TopicConsumerGroups](),
+ balancer: balancer,
+ }
+}
+
+func (c *Coordinator) GetTopicConsumerGroups(topic *mq_pb.Topic) *TopicConsumerGroups {
+ topicName := toTopicName(topic)
+ tcg, _ := c.TopicSubscribers.Get(topicName)
+ if tcg == nil {
+ tcg = &TopicConsumerGroups{
+ ConsumerGroups: cmap.New[*ConsumerGroup](),
+ }
+ c.TopicSubscribers.Set(topicName, tcg)
+ }
+ return tcg
+}
+func (c *Coordinator) RemoveTopic(topic *mq_pb.Topic) {
+ topicName := toTopicName(topic)
+ c.TopicSubscribers.Remove(topicName)
+}
+
+func toTopicName(topic *mq_pb.Topic) string {
+ topicName := topic.Namespace + "." + topic.Name
+ return topicName
+}
+
+func (c *Coordinator) AddSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) *ConsumerGroupInstance {
+ tcg := c.GetTopicConsumerGroups(topic)
+ cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
+ if cg == nil {
+ cg = NewConsumerGroup()
+ tcg.ConsumerGroups.Set(consumerGroup, cg)
+ }
+ cgi, _ := cg.ConsumerGroupInstances.Get(consumerGroupInstance)
+ if cgi == nil {
+ cgi = NewConsumerGroupInstance(consumerGroupInstance)
+ cg.ConsumerGroupInstances.Set(consumerGroupInstance, cgi)
+ }
+ cg.OnAddConsumerGroupInstance(consumerGroupInstance, topic)
+ return cgi
+}
+
+func (c *Coordinator) RemoveSubscriber(consumerGroup, consumerGroupInstance string, topic *mq_pb.Topic) {
+ tcg, _ := c.TopicSubscribers.Get(toTopicName(topic))
+ if tcg == nil {
+ return
+ }
+ cg, _ := tcg.ConsumerGroups.Get(consumerGroup)
+ if cg == nil {
+ return
+ }
+ cg.ConsumerGroupInstances.Remove(consumerGroupInstance)
+ cg.OnRemoveConsumerGroupInstance(consumerGroupInstance, topic)
+ if cg.ConsumerGroupInstances.Count() == 0 {
+ tcg.ConsumerGroups.Remove(consumerGroup)
+ }
+ if tcg.ConsumerGroups.Count() == 0 {
+ c.RemoveTopic(topic)
+ }
+}
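
A sketch of registering and unregistering a consumer group instance, roughly what the broker-side subscriber-to-coordinator stream handler might do; the names and the surrounding plumbing are assumptions:

    package example

    import (
        "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
        "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
        "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
    )

    func handleSubscriber(balancer *pub_balancer.Balancer) {
        coordinator := sub_coordinator.NewCoordinator(balancer)
        t := &mq_pb.Topic{Namespace: "test", Name: "test"}

        // register the instance; partition assignment updates for it would
        // arrive on cgi.ResponseChan
        cgi := coordinator.AddSubscriber("my-group", "instance-1", t)
        defer coordinator.RemoveSubscriber("my-group", "instance-1", t)

        _ = cgi.ResponseChan
    }
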
diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping.go b/weed/mq/sub_coordinator/partition_consumer_mapping.go
new file mode 100644
index 000000000..9c80b09c6
--- /dev/null
+++ b/weed/mq/sub_coordinator/partition_consumer_mapping.go
@@ -0,0 +1,119 @@
+package sub_coordinator
+
+import (
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "time"
+)
+
+type PartitionConsumerMapping struct {
+ currentMapping *PartitionSlotToConsumerInstanceList
+ prevMappings []*PartitionSlotToConsumerInstanceList
+}
+
+func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping {
+ newVersion := time.Now().UnixNano()
+ return &PartitionConsumerMapping{
+ currentMapping: NewPartitionSlotToConsumerInstanceList(ringSize, newVersion),
+ }
+}
+
+// Balance goals:
+// 1. maximize processing power utilization
+// 2. allow one consumer instance to go down unexpectedly
+//    without affecting processing power utilization
+
+func (pcm *PartitionConsumerMapping) BalanceToConsumerInstanceIds(partitions []*topic.Partition, consumerInstanceIds []string) {
+ if len(partitions) == 0 || len(consumerInstanceIds) == 0 {
+ return
+ }
+ newVersion := time.Now().UnixNano()
+ newMapping := NewPartitionSlotToConsumerInstanceList(partitions[0].RingSize, newVersion)
+	newMapping.PartitionSlots = doBalanceSticky(partitions, consumerInstanceIds, pcm.currentMapping) // stick to the mapping currently in effect; prevMappings may still be empty
+ if pcm.currentMapping != nil {
+ pcm.prevMappings = append(pcm.prevMappings, pcm.currentMapping)
+ }
+ pcm.currentMapping = newMapping
+}
+
+func doBalanceSticky(partitions []*topic.Partition, consumerInstanceIds []string, prevMapping *PartitionSlotToConsumerInstanceList) (partitionSlots []*PartitionSlotToConsumerInstance) {
+ // collect previous consumer instance ids
+ prevConsumerInstanceIds := make(map[string]struct{})
+ if prevMapping != nil {
+ for _, prevPartitionSlot := range prevMapping.PartitionSlots {
+ if prevPartitionSlot.AssignedInstanceId != "" {
+ prevConsumerInstanceIds[prevPartitionSlot.AssignedInstanceId] = struct{}{}
+ }
+ }
+ }
+ // collect current consumer instance ids
+ currConsumerInstanceIds := make(map[string]struct{})
+ for _, consumerInstanceId := range consumerInstanceIds {
+ currConsumerInstanceIds[consumerInstanceId] = struct{}{}
+ }
+
+ // check deleted consumer instances
+ deletedConsumerInstanceIds := make(map[string]struct{})
+ for consumerInstanceId := range prevConsumerInstanceIds {
+ if _, ok := currConsumerInstanceIds[consumerInstanceId]; !ok {
+ deletedConsumerInstanceIds[consumerInstanceId] = struct{}{}
+ }
+ }
+
+ // convert partition slots from list to a map
+ prevPartitionSlotMap := make(map[string]*PartitionSlotToConsumerInstance)
+ if prevMapping != nil {
+ for _, partitionSlot := range prevMapping.PartitionSlots {
+ key := fmt.Sprintf("%d-%d", partitionSlot.RangeStart, partitionSlot.RangeStop)
+ prevPartitionSlotMap[key] = partitionSlot
+ }
+ }
+
+ // make a copy of old mapping, skipping the deleted consumer instances
+ newPartitionSlots := ToPartitionSlots(partitions)
+ for _, newPartitionSlot := range newPartitionSlots {
+ key := fmt.Sprintf("%d-%d", newPartitionSlot.RangeStart, newPartitionSlot.RangeStop)
+ if prevPartitionSlot, ok := prevPartitionSlotMap[key]; ok {
+ if _, ok := deletedConsumerInstanceIds[prevPartitionSlot.AssignedInstanceId]; !ok {
+ newPartitionSlot.AssignedInstanceId = prevPartitionSlot.AssignedInstanceId
+ }
+ }
+ }
+
+ // for all consumer instances, count the average number of partitions
+ // that are assigned to them
+ consumerInstancePartitionCount := make(map[string]int)
+ for _, newPartitionSlot := range newPartitionSlots {
+ if newPartitionSlot.AssignedInstanceId != "" {
+ consumerInstancePartitionCount[newPartitionSlot.AssignedInstanceId]++
+ }
+ }
+ // average number of partitions that are assigned to each consumer instance
+ averageConsumerInstanceLoad := float32(len(partitions)) / float32(len(consumerInstanceIds))
+
+	// assign unassigned partition slots to consumer instances that are underloaded
+ consumerInstanceIdsIndex := 0
+ for _, newPartitionSlot := range newPartitionSlots {
+ if newPartitionSlot.AssignedInstanceId == "" {
+ for avoidDeadLoop := len(consumerInstanceIds); avoidDeadLoop > 0; avoidDeadLoop-- {
+ consumerInstanceId := consumerInstanceIds[consumerInstanceIdsIndex]
+ if float32(consumerInstancePartitionCount[consumerInstanceId]) < averageConsumerInstanceLoad {
+ newPartitionSlot.AssignedInstanceId = consumerInstanceId
+ consumerInstancePartitionCount[consumerInstanceId]++
+ consumerInstanceIdsIndex++
+ if consumerInstanceIdsIndex >= len(consumerInstanceIds) {
+ consumerInstanceIdsIndex = 0
+ }
+ break
+ } else {
+ consumerInstanceIdsIndex++
+ if consumerInstanceIdsIndex >= len(consumerInstanceIds) {
+ consumerInstanceIdsIndex = 0
+ }
+ }
+ }
+ }
+ }
+
+ return newPartitionSlots
+}
diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping_test.go b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go
new file mode 100644
index 000000000..1d3050ef4
--- /dev/null
+++ b/weed/mq/sub_coordinator/partition_consumer_mapping_test.go
@@ -0,0 +1,312 @@
+package sub_coordinator
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "reflect"
+ "testing"
+)
+
+func Test_doBalanceSticky(t *testing.T) {
+ type args struct {
+ partitions []*topic.Partition
+ consumerInstanceIds []string
+ prevMapping *PartitionSlotToConsumerInstanceList
+ }
+ tests := []struct {
+ name string
+ args args
+ wantPartitionSlots []*PartitionSlotToConsumerInstance
+ }{
+ {
+ name: "1 consumer instance, 1 partition",
+ args: args{
+ partitions: []*topic.Partition{
+ {
+ RangeStart: 0,
+ RangeStop: 100,
+ },
+ },
+ consumerInstanceIds: []string{"consumer-instance-1"},
+ prevMapping: nil,
+ },
+ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
+ {
+ RangeStart: 0,
+ RangeStop: 100,
+ AssignedInstanceId: "consumer-instance-1",
+ },
+ },
+ },
+ {
+ name: "2 consumer instances, 1 partition",
+ args: args{
+ partitions: []*topic.Partition{
+ {
+ RangeStart: 0,
+ RangeStop: 100,
+ },
+ },
+ consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
+ prevMapping: nil,
+ },
+ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
+ {
+ RangeStart: 0,
+ RangeStop: 100,
+ AssignedInstanceId: "consumer-instance-1",
+ },
+ },
+ },
+ {
+ name: "1 consumer instance, 2 partitions",
+ args: args{
+ partitions: []*topic.Partition{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ },
+ },
+ consumerInstanceIds: []string{"consumer-instance-1"},
+ prevMapping: nil,
+ },
+ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ AssignedInstanceId: "consumer-instance-1",
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ AssignedInstanceId: "consumer-instance-1",
+ },
+ },
+ },
+ {
+ name: "2 consumer instances, 2 partitions",
+ args: args{
+ partitions: []*topic.Partition{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ },
+ },
+ consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
+ prevMapping: nil,
+ },
+ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ AssignedInstanceId: "consumer-instance-1",
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ AssignedInstanceId: "consumer-instance-2",
+ },
+ },
+ },
+ {
+ name: "2 consumer instances, 2 partitions, 1 deleted consumer instance",
+ args: args{
+ partitions: []*topic.Partition{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ },
+ },
+ consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
+ prevMapping: &PartitionSlotToConsumerInstanceList{
+ PartitionSlots: []*PartitionSlotToConsumerInstance{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ AssignedInstanceId: "consumer-instance-3",
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ AssignedInstanceId: "consumer-instance-2",
+ },
+ },
+ },
+ },
+ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ AssignedInstanceId: "consumer-instance-1",
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ AssignedInstanceId: "consumer-instance-2",
+ },
+ },
+ },
+ {
+ name: "2 consumer instances, 2 partitions, 1 new consumer instance",
+ args: args{
+ partitions: []*topic.Partition{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ },
+ },
+ consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2", "consumer-instance-3"},
+ prevMapping: &PartitionSlotToConsumerInstanceList{
+ PartitionSlots: []*PartitionSlotToConsumerInstance{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ AssignedInstanceId: "consumer-instance-3",
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ AssignedInstanceId: "consumer-instance-2",
+ },
+ },
+ },
+ },
+ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ AssignedInstanceId: "consumer-instance-3",
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ AssignedInstanceId: "consumer-instance-2",
+ },
+ },
+ },
+ {
+ name: "2 consumer instances, 2 partitions, 1 new partition",
+ args: args{
+ partitions: []*topic.Partition{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ },
+ {
+ RangeStart: 100,
+ RangeStop: 150,
+ },
+ },
+ consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2"},
+ prevMapping: &PartitionSlotToConsumerInstanceList{
+ PartitionSlots: []*PartitionSlotToConsumerInstance{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ AssignedInstanceId: "consumer-instance-1",
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ AssignedInstanceId: "consumer-instance-2",
+ },
+ },
+ },
+ },
+ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ AssignedInstanceId: "consumer-instance-1",
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ AssignedInstanceId: "consumer-instance-2",
+ },
+ {
+ RangeStart: 100,
+ RangeStop: 150,
+ AssignedInstanceId: "consumer-instance-1",
+ },
+ },
+ },
+ {
+ name: "2 consumer instances, 2 partitions, 1 new partition, 1 new consumer instance",
+ args: args{
+ partitions: []*topic.Partition{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ },
+ {
+ RangeStart: 100,
+ RangeStop: 150,
+ },
+ },
+ consumerInstanceIds: []string{"consumer-instance-1", "consumer-instance-2", "consumer-instance-3"},
+ prevMapping: &PartitionSlotToConsumerInstanceList{
+ PartitionSlots: []*PartitionSlotToConsumerInstance{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ AssignedInstanceId: "consumer-instance-1",
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ AssignedInstanceId: "consumer-instance-2",
+ },
+ },
+ },
+ },
+ wantPartitionSlots: []*PartitionSlotToConsumerInstance{
+ {
+ RangeStart: 0,
+ RangeStop: 50,
+ AssignedInstanceId: "consumer-instance-1",
+ },
+ {
+ RangeStart: 50,
+ RangeStop: 100,
+ AssignedInstanceId: "consumer-instance-2",
+ },
+ {
+ RangeStart: 100,
+ RangeStop: 150,
+ AssignedInstanceId: "consumer-instance-3",
+ },
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if gotPartitionSlots := doBalanceSticky(tt.args.partitions, tt.args.consumerInstanceIds, tt.args.prevMapping); !reflect.DeepEqual(gotPartitionSlots, tt.wantPartitionSlots) {
+ t.Errorf("doBalanceSticky() = %v, want %v", gotPartitionSlots, tt.wantPartitionSlots)
+ }
+ })
+ }
+}
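
Note: the cases above pin down the sticky contract for doBalanceSticky (defined in partition_consumer_mapping.go, not shown in this hunk): a slot keeps its previous assignee whenever that consumer instance is still registered, and only new or orphaned slots go to the currently least-loaded instances. A minimal sketch that satisfies these cases, written only to illustrate the expected behavior (balanceStickySketch and its two-pass structure are illustrative, not the shipped implementation):

    package sub_coordinator

    import "github.com/seaweedfs/seaweedfs/weed/mq/topic"

    // balanceStickySketch mirrors what the tests above expect from doBalanceSticky.
    func balanceStickySketch(partitions []*topic.Partition, consumerInstanceIds []string,
        prevMapping *PartitionSlotToConsumerInstanceList) []*PartitionSlotToConsumerInstance {

        // remember who owned each slot range in the previous generation
        prevOwner := make(map[[2]int32]string)
        if prevMapping != nil {
            for _, slot := range prevMapping.PartitionSlots {
                prevOwner[[2]int32{slot.RangeStart, slot.RangeStop}] = slot.AssignedInstanceId
            }
        }
        alive := make(map[string]bool, len(consumerInstanceIds))
        for _, id := range consumerInstanceIds {
            alive[id] = true
        }

        slots := ToPartitionSlots(partitions)
        load := make(map[string]int)

        // pass 1: keep sticky assignments whose consumer instance still exists
        for _, slot := range slots {
            if owner, ok := prevOwner[[2]int32{slot.RangeStart, slot.RangeStop}]; ok && alive[owner] {
                slot.AssignedInstanceId = owner
                load[owner]++
            }
        }
        // pass 2: hand unassigned slots to the least-loaded instance (first wins on ties)
        for _, slot := range slots {
            if slot.AssignedInstanceId != "" {
                continue
            }
            best := ""
            for _, id := range consumerInstanceIds {
                if best == "" || load[id] < load[best] {
                    best = id
                }
            }
            slot.AssignedInstanceId = best
            load[best]++
        }
        return slots
    }

Under these rules, the "new consumer instance" case leaves existing assignments alone, and the "new partition" cases give the extra slot to whichever instance currently holds the fewest slots, matching the expectations above.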
diff --git a/weed/mq/sub_coordinator/partition_list.go b/weed/mq/sub_coordinator/partition_list.go
new file mode 100644
index 000000000..ca097f2b3
--- /dev/null
+++ b/weed/mq/sub_coordinator/partition_list.go
@@ -0,0 +1,32 @@
+package sub_coordinator
+
+import "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+
+type PartitionSlotToConsumerInstance struct {
+ RangeStart int32
+ RangeStop int32
+ AssignedInstanceId string
+}
+
+type PartitionSlotToConsumerInstanceList struct {
+ PartitionSlots []*PartitionSlotToConsumerInstance
+ RingSize int32
+ Version int64
+}
+
+func NewPartitionSlotToConsumerInstanceList(ringSize int32, version int64) *PartitionSlotToConsumerInstanceList {
+ return &PartitionSlotToConsumerInstanceList{
+ RingSize: ringSize,
+ Version: version,
+ }
+}
+
+func ToPartitionSlots(partitions []*topic.Partition) (partitionSlots []*PartitionSlotToConsumerInstance) {
+ for _, partition := range partitions {
+ partitionSlots = append(partitionSlots, &PartitionSlotToConsumerInstance{
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ })
+ }
+ return
+}
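
As a rough usage sketch (a hypothetical driver outside these packages; only SplitPartitions and ToPartitionSlots come from this change), the coordinator side would turn the topic's partitions into assignment slots before balancing:

    package main

    import (
        "fmt"
        "time"

        "github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
        "github.com/seaweedfs/seaweedfs/weed/mq/topic"
    )

    func main() {
        // carve the ring into two partitions, all stamped with the same generation timestamp
        partitions := topic.SplitPartitions(2, time.Now().UnixNano())

        // wrap them as slots; AssignedInstanceId stays empty until the balancer fills it in
        slots := sub_coordinator.ToPartitionSlots(partitions)
        for _, slot := range slots {
            fmt.Printf("slot [%d, %d) -> %q\n", slot.RangeStart, slot.RangeStop, slot.AssignedInstanceId)
        }
    }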
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index 0c54f2bb1..173df090d 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -23,10 +23,7 @@ func NewLocalTopicManager() *LocalTopicManager {
func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) {
localTopic, ok := manager.topics.Get(topic.String())
if !ok {
- localTopic = &LocalTopic{
- Topic: topic,
- Partitions: make([]*LocalPartition, 0),
- }
+ localTopic = NewLocalTopic(topic)
}
if !manager.topics.SetIfAbsent(topic.String(), localTopic) {
localTopic, _ = manager.topics.Get(topic.String())
@@ -59,6 +56,22 @@ func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Pa
return localTopic.removePartition(partition)
}
+func (manager *LocalTopicManager) ClosePublishers(topic Topic, unixTsNs int64) (removed bool) {
+ localTopic, ok := manager.topics.Get(topic.String())
+ if !ok {
+ return false
+ }
+ return localTopic.closePartitionPublishers(unixTsNs)
+}
+
+func (manager *LocalTopicManager) CloseSubscribers(topic Topic, unixTsNs int64) (removed bool) {
+ localTopic, ok := manager.topics.Get(topic.String())
+ if !ok {
+ return false
+ }
+ return localTopic.closePartitionSubscribers(unixTsNs)
+}
+
func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.BrokerStats {
stats := &mq_pb.BrokerStats{
Stats: make(map[string]*mq_pb.TopicPartitionStats),
@@ -101,3 +114,11 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
return stats
}
+
+func (manager *LocalTopicManager) WaitUntilNoPublishers(topic Topic) {
+ localTopic, ok := manager.topics.Get(topic.String())
+ if !ok {
+ return
+ }
+ localTopic.WaitUntilNoPublishers()
+}
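
The three new manager methods suggest how a broker would drain a topic generation before reassigning it; the ordering below is an assumption based on these signatures, not code from this change:

    package broker_sketch

    import "github.com/seaweedfs/seaweedfs/weed/mq/topic"

    // drainTopicGeneration is a hypothetical helper: stop publishers of the given
    // generation, wait for their registries to empty, then release the subscribers.
    func drainTopicGeneration(manager *topic.LocalTopicManager, t topic.Topic, unixTsNs int64) {
        manager.ClosePublishers(t, unixTsNs)  // signal publishers of this generation to stop
        manager.WaitUntilNoPublishers(t)      // poll until every partition's publisher map is empty
        manager.CloseSubscribers(t, unixTsNs) // then signal the subscriber loops to end
    }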
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 49b639dfa..aa1274ff5 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -11,19 +11,23 @@ import (
type LocalPartition struct {
Partition
- isLeader bool
- FollowerBrokers []pb.ServerAddress
- logBuffer *log_buffer.LogBuffer
- ConsumerCount int32
+ isLeader bool
+ FollowerBrokers []pb.ServerAddress
+ logBuffer *log_buffer.LogBuffer
+ ConsumerCount int32
+ StopPublishersCh chan struct{}
+ Publishers *LocalPartitionPublishers
+ StopSubscribersCh chan struct{}
+ Subscribers *LocalPartitionSubscribers
}
-func NewLocalPartition(topic Topic, partition Partition, isLeader bool, followerBrokers []pb.ServerAddress) *LocalPartition {
+func NewLocalPartition(partition Partition, isLeader bool, followerBrokers []pb.ServerAddress) *LocalPartition {
return &LocalPartition{
Partition: partition,
isLeader: isLeader,
FollowerBrokers: followerBrokers,
logBuffer: log_buffer.NewLogBuffer(
- fmt.Sprintf("%s/%s/%4d-%4d", topic.Namespace, topic.Name, partition.RangeStart, partition.RangeStop),
+ fmt.Sprintf("%d/%4d-%4d", partition.UnixTimeNs, partition.RangeStart, partition.RangeStop),
2*time.Minute,
func(startTime, stopTime time.Time, buf []byte) {
@@ -32,34 +36,43 @@ func NewLocalPartition(topic Topic, partition Partition, isLeader bool, follower
},
),
+ Publishers: NewLocalPartitionPublishers(),
+ Subscribers: NewLocalPartitionSubscribers(),
+ // initialize the stop channels up front; closePublishers closes StopPublishersCh and would panic on a nil channel
+ StopPublishersCh: make(chan struct{}),
+ StopSubscribersCh: make(chan struct{}),
}
}
type OnEachMessageFn func(logEntry *filer_pb.LogEntry) error
-func (p LocalPartition) Publish(message *mq_pb.DataMessage) {
+func (p *LocalPartition) Publish(message *mq_pb.DataMessage) {
p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
}
-func (p LocalPartition) Subscribe(clientName string, startReadTime time.Time, eachMessageFn OnEachMessageFn) {
- p.logBuffer.LoopProcessLogData(clientName, startReadTime, 0, func() bool {
- return true
- }, eachMessageFn)
+func (p *LocalPartition) Subscribe(clientName string, startReadTime time.Time, onNoMessageFn func() bool, eachMessageFn OnEachMessageFn) {
+ p.logBuffer.LoopProcessLogData(clientName, startReadTime, 0, onNoMessageFn, eachMessageFn)
}
func FromPbBrokerPartitionAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionAssignment) *LocalPartition {
- isLeaer := assignment.LeaderBroker == string(self)
- localPartition := &LocalPartition{
- Partition: FromPbPartition(assignment.Partition),
- isLeader: isLeaer,
- }
- if !isLeaer {
- return localPartition
- }
+ isLeader := assignment.LeaderBroker == string(self)
followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers))
- for i, follower := range assignment.FollowerBrokers {
- followers[i] = pb.ServerAddress(follower)
+ for i, followerBroker := range assignment.FollowerBrokers {
+ followers[i] = pb.ServerAddress(followerBroker)
+ }
+ return NewLocalPartition(FromPbPartition(assignment.Partition), isLeader, followers)
+}
+
+func (p *LocalPartition) closePublishers() {
+ p.Publishers.SignalShutdown()
+ close(p.StopPublishersCh)
+}
+func (p *LocalPartition) closeSubscribers() {
+ p.Subscribers.SignalShutdown()
+}
+
+func (p *LocalPartition) WaitUntilNoPublishers() {
+ for {
+ if p.Publishers.IsEmpty() {
+ return
+ }
+ time.Sleep(113 * time.Millisecond)
}
- localPartition.FollowerBrokers = followers
- return localPartition
}
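
Subscribe now takes an onNoMessageFn callback in addition to the per-message handler; returning false from it tells LoopProcessLogData to stop waiting for new data, so a caller can end the loop on shutdown. A hedged usage sketch, with tailPartition and stop() being illustrative names:

    package example

    import (
        "fmt"
        "time"

        "github.com/seaweedfs/seaweedfs/weed/mq/topic"
        "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
    )

    // tailPartition keeps reading buffered messages until stop() reports true.
    func tailPartition(p *topic.LocalPartition, stop func() bool) {
        p.Subscribe("example-subscriber", time.Now(),
            func() bool { return !stop() }, // returning false ends the read loop
            func(logEntry *filer_pb.LogEntry) error {
                fmt.Printf("key=%s, %d bytes\n", logEntry.Key, len(logEntry.Data))
                return nil
            })
    }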
diff --git a/weed/mq/topic/local_partition_publishers.go b/weed/mq/topic/local_partition_publishers.go
new file mode 100644
index 000000000..367ccce5f
--- /dev/null
+++ b/weed/mq/topic/local_partition_publishers.go
@@ -0,0 +1,52 @@
+package topic
+
+import "sync"
+
+type LocalPartitionPublishers struct {
+ publishers map[string]*LocalPublisher
+ publishersLock sync.RWMutex
+}
+type LocalPublisher struct {
+}
+
+func NewLocalPublisher() *LocalPublisher {
+ return &LocalPublisher{}
+}
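+// SignalShutdown is a no-op for now; the registry below tracks publishers mainly so IsEmpty can report when a partition has drained.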
+func (p *LocalPublisher) SignalShutdown() {
+}
+
+func NewLocalPartitionPublishers() *LocalPartitionPublishers {
+ return &LocalPartitionPublishers{
+ publishers: make(map[string]*LocalPublisher),
+ }
+}
+
+func (p *LocalPartitionPublishers) AddPublisher(clientName string, publisher *LocalPublisher) {
+ p.publishersLock.Lock()
+ defer p.publishersLock.Unlock()
+
+ p.publishers[clientName] = publisher
+}
+
+func (p *LocalPartitionPublishers) RemovePublisher(clientName string) {
+ p.publishersLock.Lock()
+ defer p.publishersLock.Unlock()
+
+ delete(p.publishers, clientName)
+}
+
+func (p *LocalPartitionPublishers) SignalShutdown() {
+ p.publishersLock.RLock()
+ defer p.publishersLock.RUnlock()
+
+ for _, publisher := range p.publishers {
+ publisher.SignalShutdown()
+ }
+}
+
+func (p *LocalPartitionPublishers) IsEmpty() bool {
+ p.publishersLock.RLock()
+ defer p.publishersLock.RUnlock()
+
+ return len(p.publishers) == 0
+}
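
For WaitUntilNoPublishers to ever return, every publish stream has to remove itself from this registry when it ends; a broker-side handler would pair the two calls roughly like this (handlePublishStream is an illustrative name, not part of this change):

    package broker_sketch

    import (
        "github.com/seaweedfs/seaweedfs/weed/mq/topic"
        "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
    )

    // handlePublishStream registers the client for the lifetime of its stream so the
    // partition can be drained deterministically when the broker shuts it down.
    func handlePublishStream(p *topic.LocalPartition, clientName string, messages <-chan *mq_pb.DataMessage) {
        p.Publishers.AddPublisher(clientName, topic.NewLocalPublisher())
        defer p.Publishers.RemovePublisher(clientName) // lets WaitUntilNoPublishers observe the drain

        for m := range messages {
            p.Publish(m) // append to the partition's log buffer
        }
    }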
diff --git a/weed/mq/topic/local_partition_subscribers.go b/weed/mq/topic/local_partition_subscribers.go
new file mode 100644
index 000000000..e177ec7e8
--- /dev/null
+++ b/weed/mq/topic/local_partition_subscribers.go
@@ -0,0 +1,49 @@
+package topic
+
+import "sync"
+
+type LocalPartitionSubscribers struct {
+ Subscribers map[string]*LocalSubscriber
+ SubscribersLock sync.RWMutex
+}
+type LocalSubscriber struct {
+ stopCh chan struct{}
+}
+
+func NewLocalSubscriber() *LocalSubscriber {
+ return &LocalSubscriber{
+ stopCh: make(chan struct{}, 1),
+ }
+}
+func (p *LocalSubscriber) SignalShutdown() {
+ close(p.stopCh)
+}
+
+func NewLocalPartitionSubscribers() *LocalPartitionSubscribers {
+ return &LocalPartitionSubscribers{
+ Subscribers: make(map[string]*LocalSubscriber),
+ }
+}
+
+func (p *LocalPartitionSubscribers) AddSubscriber(clientName string, subscriber *LocalSubscriber) {
+ p.SubscribersLock.Lock()
+ defer p.SubscribersLock.Unlock()
+
+ p.Subscribers[clientName] = subscriber
+}
+
+func (p *LocalPartitionSubscribers) RemoveSubscriber(clientName string) {
+ p.SubscribersLock.Lock()
+ defer p.SubscribersLock.Unlock()
+
+ delete(p.Subscribers, clientName)
+}
+
+func (p *LocalPartitionSubscribers) SignalShutdown() {
+ p.SubscribersLock.RLock()
+ defer p.SubscribersLock.RUnlock()
+
+ for _, subscriber := range p.Subscribers {
+ subscriber.SignalShutdown()
+ }
+}
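
Unlike LocalPublisher, each LocalSubscriber carries its own stop channel, so SignalShutdown can wake a blocked per-client read loop. stopCh is unexported, though, so only code inside package topic can wait on it; a hypothetical helper (not part of this change) would be enough:

    package topic

    // waitForShutdownSketch is illustrative only: because SignalShutdown closes the
    // channel rather than sending on it, every goroutine blocked here wakes at once.
    func (subscriber *LocalSubscriber) waitForShutdownSketch() {
        <-subscriber.stopCh
    }

A subscribe handler would then mirror the publisher pattern: AddSubscriber on connect, defer RemoveSubscriber, and use such a wait (or a flag checked from onNoMessageFn) to end its loop when CloseSubscribers fans out.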
diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go
index ef3c0e65e..7825d2168 100644
--- a/weed/mq/topic/local_topic.go
+++ b/weed/mq/topic/local_topic.go
@@ -1,10 +1,19 @@
package topic
+import "sync"
+
type LocalTopic struct {
Topic
Partitions []*LocalPartition
}
+func NewLocalTopic(topic Topic) *LocalTopic {
+ return &LocalTopic{
+ Topic: topic,
+ Partitions: make([]*LocalPartition, 0),
+ }
+}
+
func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition {
for _, localPartition := range localTopic.Partitions {
if localPartition.Partition.Equals(partition) {
@@ -27,3 +36,52 @@ func (localTopic *LocalTopic) removePartition(partition Partition) bool {
localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...)
return true
}
+
+func (localTopic *LocalTopic) closePartitionPublishers(unixTsNs int64) bool {
+ var wg sync.WaitGroup
+ for _, localPartition := range localTopic.Partitions {
+ if localPartition.UnixTimeNs != unixTsNs {
+ continue
+ }
+ wg.Add(1)
+ go func(localPartition *LocalPartition) {
+ defer wg.Done()
+ localPartition.closePublishers()
+ }(localPartition)
+ }
+ wg.Wait()
+ return true
+}
+
+func (localTopic *LocalTopic) closePartitionSubscribers(unixTsNs int64) bool {
+ var wg sync.WaitGroup
+ for _, localPartition := range localTopic.Partitions {
+ if localPartition.UnixTimeNs != unixTsNs {
+ continue
+ }
+ wg.Add(1)
+ go func(localPartition *LocalPartition) {
+ defer wg.Done()
+ localPartition.closeSubscribers()
+ }(localPartition)
+ }
+ wg.Wait()
+ return true
+}
+
+func (localTopic *LocalTopic) WaitUntilNoPublishers() {
+ for {
+ var wg sync.WaitGroup
+ for _, localPartition := range localTopic.Partitions {
+ wg.Add(1)
+ go func(localPartition *LocalPartition) {
+ defer wg.Done()
+ localPartition.WaitUntilNoPublishers()
+ }(localPartition)
+ }
+ wg.Wait()
+ if len(localTopic.Partitions) == 0 {
+ return
+ }
+ }
+}
diff --git a/weed/mq/topic/partition.go b/weed/mq/topic/partition.go
index 79c830f13..ca34c2390 100644
--- a/weed/mq/topic/partition.go
+++ b/weed/mq/topic/partition.go
@@ -8,6 +8,7 @@ type Partition struct {
RangeStart int32
RangeStop int32 // exclusive
RingSize int32
+ UnixTimeNs int64 // in nanoseconds
}
func (partition Partition) Equals(other Partition) bool {
@@ -20,6 +21,9 @@ func (partition Partition) Equals(other Partition) bool {
if partition.RingSize != other.RingSize {
return false
}
+ if partition.UnixTimeNs != other.UnixTimeNs {
+ return false
+ }
return true
}
@@ -28,10 +32,11 @@ func FromPbPartition(partition *mq_pb.Partition) Partition {
RangeStart: partition.RangeStart,
RangeStop: partition.RangeStop,
RingSize: partition.RingSize,
+ UnixTimeNs: partition.UnixTimeNs,
}
}
-func SplitPartitions(targetCount int32) []*Partition {
+func SplitPartitions(targetCount int32, ts int64) []*Partition {
partitions := make([]*Partition, 0, targetCount)
partitionSize := PartitionCount / targetCount
for i := int32(0); i < targetCount; i++ {
@@ -43,7 +48,17 @@ func SplitPartitions(targetCount int32) []*Partition {
RangeStart: i * partitionSize,
RangeStop: partitionStop,
RingSize: PartitionCount,
+ UnixTimeNs: ts,
})
}
return partitions
}
+
+func (partition Partition) ToPbPartition() *mq_pb.Partition {
+ return &mq_pb.Partition{
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ RingSize: partition.RingSize,
+ UnixTimeNs: partition.UnixTimeNs,
+ }
+}
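
Since UnixTimeNs now takes part in Equals and in both conversion helpers, a partition should survive a protobuf round trip unchanged; an illustrative check (not part of this change, field values are arbitrary):

    package topic

    import "testing"

    func TestPartitionPbRoundTripSketch(t *testing.T) {
        original := Partition{
            RangeStart: 0,
            RangeStop:  1024,
            RingSize:   4096,
            UnixTimeNs: 1700000000000000000,
        }
        // ToPbPartition and FromPbPartition copy all four fields, so Equals must hold
        if restored := FromPbPartition(original.ToPbPartition()); !restored.Equals(original) {
            t.Errorf("round trip lost information: %+v != %+v", restored, original)
        }
    }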
diff --git a/weed/mq/topic/topic.go b/weed/mq/topic/topic.go
index 3d457e6f1..6932fcb56 100644
--- a/weed/mq/topic/topic.go
+++ b/weed/mq/topic/topic.go
@@ -2,9 +2,7 @@ package topic
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "time"
)
type Topic struct {
@@ -25,47 +23,13 @@ func FromPbTopic(topic *mq_pb.Topic) Topic {
}
}
-func (tp Topic) String() string {
- return fmt.Sprintf("%s.%s", tp.Namespace, tp.Name)
-}
-
-type Segment struct {
- Topic Topic
- Id int32
- Partition Partition
- LastModified time.Time
-}
-
-func FromPbSegment(segment *mq_pb.Segment) *Segment {
- return &Segment{
- Topic: Topic{
- Namespace: segment.Namespace,
- Name: segment.Topic,
- },
- Id: segment.Id,
- Partition: Partition{
- RangeStart: segment.Partition.RangeStart,
- RangeStop: segment.Partition.RangeStop,
- RingSize: segment.Partition.RingSize,
- },
+func (tp Topic) ToPbTopic() *mq_pb.Topic {
+ return &mq_pb.Topic{
+ Namespace: tp.Namespace,
+ Name: tp.Name,
}
}
-func (segment *Segment) ToPbSegment() *mq_pb.Segment {
- return &mq_pb.Segment{
- Namespace: string(segment.Topic.Namespace),
- Topic: segment.Topic.Name,
- Id: segment.Id,
- Partition: &mq_pb.Partition{
- RingSize: segment.Partition.RingSize,
- RangeStart: segment.Partition.RangeStart,
- RangeStop: segment.Partition.RangeStop,
- },
- }
-}
-
-func (segment *Segment) DirAndName() (dir string, name string) {
- dir = fmt.Sprintf("%s/%s/%s", filer.TopicsDir, segment.Topic.Namespace, segment.Topic.Name)
- name = fmt.Sprintf("%4d.segment", segment.Id)
- return
+func (tp Topic) String() string {
+ return fmt.Sprintf("%s.%s", tp.Namespace, tp.Name)
}
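
ToPbTopic is the inverse of the existing FromPbTopic, so topics can round-trip through the wire format as well; a small illustrative snippet (the driver itself is hypothetical):

    package main

    import (
        "fmt"

        "github.com/seaweedfs/seaweedfs/weed/mq/topic"
        "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
    )

    func main() {
        t := topic.FromPbTopic(&mq_pb.Topic{Namespace: "test", Name: "events"})
        fmt.Println(t.String())              // prints "test.events"
        _ = topic.FromPbTopic(t.ToPbTopic()) // lossless: both fields are copied each way
    }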