diff options
| author | chrislu <chris.lu@gmail.com> | 2023-08-20 22:53:05 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2023-08-20 22:53:05 -0700 |
| commit | 01d70c21f30988bffa37ffdcb6b80f1646293390 (patch) | |
| tree | ace1aa23be04dc833092e1a33fcf36237b11798f /weed/mq | |
| parent | 3650e5adda604dc7507ba3f0f63799c4cbfa4dfe (diff) | |
| download | seaweedfs-01d70c21f30988bffa37ffdcb6b80f1646293390.tar.xz seaweedfs-01d70c21f30988bffa37ffdcb6b80f1646293390.zip | |
Squashed commit of the following:
commit 32f4b1a13057d56b6de487cdb80ff7c205af01a6
Author: chrislu <chris.lu@gmail.com>
Date: Sun Aug 20 22:52:19 2023 -0700
fix compilation
commit e77ad33b7ca0423138fbae26a4433b60923a9588
Author: chrislu <chris.lu@gmail.com>
Date: Sun Aug 20 22:46:44 2023 -0700
pub
commit f431f30cc7ca277ca299e3cd118c05537fb9f5c3
Author: chrislu <chris.lu@gmail.com>
Date: Sun Aug 20 13:27:39 2023 -0700
fix generic type
commit 4e9dcb18293fd1e3e306e2dceb995dfd67a35e1d
Merge: 30f942580 16e3f2d52
Author: chrislu <chris.lu@gmail.com>
Date: Sun Aug 20 12:47:14 2023 -0700
Merge branch 'master' into pubsub
commit 30f942580ad1bb32ae94aade2e3a21ec3ab63e21
Author: chrislu <chris.lu@gmail.com>
Date: Sun Aug 20 11:10:58 2023 -0700
wip
commit f8b00980bc2f3879bb43decffd9a08d842f196f2
Author: chrislu <chris.lu@gmail.com>
Date: Tue Jul 25 09:14:35 2023 -0700
add design document
commit 08d2bebe42a26ebc39f1542f54d99e73620727dd
Author: chrislu <chris.lu@gmail.com>
Date: Tue Jul 25 09:14:06 2023 -0700
minor
commit bcfa7982b262a40fcdce6fc6613fad2ce07c13da
Author: chrislu <chris.lu@gmail.com>
Date: Tue Jul 25 09:13:49 2023 -0700
rename
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker/brokder_grpc_admin.go | 107 | ||||
| -rw-r--r-- | weed/mq/broker/brokder_grpc_pub.go | 122 | ||||
| -rw-r--r-- | weed/mq/broker/broker_segment_serde.go | 14 | ||||
| -rw-r--r-- | weed/mq/broker/broker_server.go | 21 | ||||
| -rw-r--r-- | weed/mq/segment/message_serde_test.go | 5 | ||||
| -rw-r--r-- | weed/mq/topic/local_manager.go | 54 | ||||
| -rw-r--r-- | weed/mq/topic/local_partition.go | 40 | ||||
| -rw-r--r-- | weed/mq/topic/local_topic.go | 29 | ||||
| -rw-r--r-- | weed/mq/topic/partition.go | 32 | ||||
| -rw-r--r-- | weed/mq/topic/topic.go (renamed from weed/mq/topic.go) | 21 | ||||
| -rw-r--r-- | weed/mq/topic_allocation/allocation.go | 81 |
11 files changed, 487 insertions, 39 deletions
diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go index 7e669ef9b..5aac780fb 100644 --- a/weed/mq/broker/brokder_grpc_admin.go +++ b/weed/mq/broker/brokder_grpc_admin.go @@ -4,7 +4,7 @@ import ( "context" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/mq" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -12,6 +12,10 @@ import ( "sync" ) +const ( + MaxPartitionCount = 1024 +) + func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) { ret := &mq_pb.FindBrokerLeaderResponse{} err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error { @@ -31,21 +35,9 @@ func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *m return ret, err } -func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) { - ret := &mq_pb.CheckSegmentStatusResponse{} - // TODO add in memory active segment - return ret, nil -} - -func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) { - ret := &mq_pb.CheckBrokerLoadResponse{} - // TODO read broker's load - return ret, nil -} - func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) { ret := &mq_pb.AssignSegmentBrokersResponse{} - segment := mq.FromPbSegment(request.Segment) + segment := topic.FromPbSegment(request.Segment) // check existing segment locations on filer existingBrokers, err := broker.checkSegmentOnFiler(segment) @@ -84,7 +76,92 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques return ret, nil } -func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) { +func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) { + ret := &mq_pb.CheckSegmentStatusResponse{} + // TODO add in memory active segment + return ret, nil +} + +func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) { + ret := &mq_pb.CheckBrokerLoadResponse{} + // TODO read broker's load + return ret, nil +} + +// FindTopicBrokers returns the brokers that are serving the topic +// +// 1. lock the topic +// +// 2. find the topic partitions on the filer +// 2.1 if the topic is not found, return error +// 2.2 if the request is_for_publish, create the topic +// 2.2.1 if the request is_for_subscribe, return error not found +// 2.2.2 if the request is_for_publish, create the topic +// 2.2 if the topic is found, return the brokers +// +// 3. unlock the topic +func (broker *MessageQueueBroker) FindTopicBrokers(c context.Context, request *mq_pb.FindTopicBrokersRequest) (*mq_pb.FindTopicBrokersResponse, error) { + ret := &mq_pb.FindTopicBrokersResponse{} + // lock the topic + + // find the topic partitions on the filer + // if the topic is not found + // if the request is_for_publish + // create the topic + // if the request is_for_subscribe + // return error not found + return ret, nil +} + +// CheckTopicPartitionsStatus check the topic partitions on the broker +func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) { + ret := &mq_pb.CheckTopicPartitionsStatusResponse{} + return ret, nil +} + +// createOrUpdateTopicPartitions creates the topic partitions on the broker +// 1. check +func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment) (err error) { + // create or update each partition + if prevAssignment == nil { + broker.createOrUpdateTopicPartition(topic, nil) + } else { + for _, partitionAssignment := range prevAssignment.BrokerPartitions { + broker.createOrUpdateTopicPartition(topic, partitionAssignment) + } + } + return nil +} + +func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (newAssignment *mq_pb.BrokerPartitionsAssignment) { + shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment) + if !shouldCreate { + + } + return +} +func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (shouldCreate bool) { + if oldAssignment == nil { + return true + } + for _, b := range oldAssignment.FollowerBrokers { + pb.WithBrokerClient(false, pb.ServerAddress(b), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + _, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{ + Namespace: string(topic.Namespace), + Topic: topic.Name, + BrokerPartitionsAssignment: oldAssignment, + ShouldCancelIfNotMatch: true, + }) + if err != nil { + shouldCreate = true + } + return nil + }) + } + return +} + +func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *topic.Segment, brokers []pb.ServerAddress) (active bool, err error) { var wg sync.WaitGroup for _, candidate := range brokers { diff --git a/weed/mq/broker/brokder_grpc_pub.go b/weed/mq/broker/brokder_grpc_pub.go index a26be5171..58ab6e5d2 100644 --- a/weed/mq/broker/brokder_grpc_pub.go +++ b/weed/mq/broker/brokder_grpc_pub.go @@ -1,16 +1,136 @@ package broker import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" ) +// For a new or re-configured topic, or one of the broker went offline, +// the pub clients ask one broker what are the brokers for all the topic partitions. +// The broker will lock the topic on write. +// 1. if the topic is not found, create the topic, and allocate the topic partitions to the brokers +// 2. if the topic is found, return the brokers for the topic partitions +// For a topic to read from, the sub clients ask one broker what are the brokers for all the topic partitions. +// The broker will lock the topic on read. +// 1. if the topic is not found, return error +// 2. if the topic is found, return the brokers for the topic partitions +// +// If the topic needs to be re-balanced, the admin client will lock the topic, +// 1. collect throughput information for all the brokers +// 2. adjust the topic partitions to the brokers +// 3. notify the brokers to add/remove partitions to host +// 3.1 When locking the topic, the partitions and brokers should be remembered in the lock. +// 4. the brokers will stop process incoming messages if not the right partition +// 4.1 the pub clients will need to re-partition the messages and publish to the right brokers for the partition3 +// 4.2 the sub clients will need to change the brokers to read from +// +// The following is from each individual component's perspective: +// For a pub client +// For current topic/partition, ask one broker for the brokers for the topic partitions +// 1. connect to the brokers and keep sending, until the broker returns error, or the broker leader is moved. +// For a sub client +// For current topic/partition, ask one broker for the brokers for the topic partitions +// 1. connect to the brokers and keep reading, until the broker returns error, or the broker leader is moved. +// For a broker +// Upon a pub client lookup: +// 1. lock the topic +// 2. if already has topic partition assignment, check all brokers are healthy +// 3. if not, create topic partition assignment +// 2. return the brokers for the topic partitions +// 3. unlock the topic +// Upon a sub client lookup: +// 1. lock the topic +// 2. if already has topic partition assignment, check all brokers are healthy +// 3. if not, return error +// 2. return the brokers for the topic partitions +// 3. unlock the topic +// For an admin tool +// 0. collect stats from all the brokers, and find the topic worth moving +// 1. lock the topic +// 2. collect throughput information for all the brokers +// 3. adjust the topic partitions to the brokers +// 4. notify the brokers to add/remove partitions to host +// 5. the brokers will stop process incoming messages if not the right partition +// 6. unlock the topic + /* -The messages is buffered in memory, and saved to filer under +The messages are buffered in memory, and saved to filer under /topics/<topic>/<date>/<hour>/<segment>/*.msg /topics/<topic>/<date>/<hour>/segment /topics/<topic>/info/segment_<id>.meta + + + */ func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error { + // 1. write to the volume server + // 2. find the topic metadata owning filer + // 3. write to the filer + + var localTopicPartition *topic.LocalPartition + for { + req, err := stream.Recv() + if err != nil { + return err + } + + // Process the received message + sequence := req.GetSequence() + response := &mq_pb.PublishResponse{ + AckSequence: sequence, + } + if dataMessage := req.GetData(); dataMessage != nil { + if localTopicPartition == nil { + response.Error = "topic partition not initialized" + glog.Errorf("topic partition not found") + } else { + localTopicPartition.Publish(dataMessage) + } + } else if initMessage := req.GetInit(); initMessage != nil { + localTopicPartition = broker.localTopicManager.GetTopicPartition( + topic.NewTopic(topic.Namespace(initMessage.Segment.Namespace), initMessage.Segment.Topic), + topic.FromPbPartition(initMessage.Segment.Partition), + ) + if localTopicPartition == nil { + response.Error = fmt.Sprintf("topic partition %v not found", initMessage.Segment) + glog.Errorf("topic partition %v not found", initMessage.Segment) + } + } + if err := stream.Send(response); err != nil { + glog.Errorf("Error sending setup response: %v", err) + } + } + return nil } + +// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment +func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) { + ret := &mq_pb.AssignTopicPartitionsResponse{} + self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port)) + + for _, partition := range request.TopicPartitionsAssignment.BrokerPartitions { + localPartiton := topic.FromPbBrokerPartitionsAssignment(self, partition) + broker.localTopicManager.AddTopicPartition( + topic.FromPbTopic(request.Topic), + localPartiton) + if request.IsLeader { + for _, follower := range localPartiton.FollowerBrokers { + err := pb.WithBrokerClient(false, follower, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error { + _, err := client.AssignTopicPartitions(context.Background(), request) + return err + }) + if err != nil { + return ret, err + } + } + } + } + + return ret, nil +} diff --git a/weed/mq/broker/broker_segment_serde.go b/weed/mq/broker/broker_segment_serde.go index e36867da0..bb9aecc0b 100644 --- a/weed/mq/broker/broker_segment_serde.go +++ b/weed/mq/broker/broker_segment_serde.go @@ -4,7 +4,7 @@ import ( "bytes" "fmt" "github.com/seaweedfs/seaweedfs/weed/filer" - "github.com/seaweedfs/seaweedfs/weed/mq" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -12,8 +12,8 @@ import ( "time" ) -func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) { - info, found, err := broker.readSegmentOnFiler(segment) +func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *topic.Segment) (brokers []pb.ServerAddress, err error) { + info, found, err := broker.readSegmentInfoOnFiler(segment) if err != nil { return } @@ -27,12 +27,12 @@ func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brok return } -func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) { +func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *topic.Segment, brokers []pb.ServerAddress) (err error) { var nodes []string for _, b := range brokers { nodes = append(nodes, string(b)) } - broker.saveSegmentToFiler(segment, &mq_pb.SegmentInfo{ + broker.saveSegmentInfoToFiler(segment, &mq_pb.SegmentInfo{ Segment: segment.ToPbSegment(), StartTsNs: time.Now().UnixNano(), Brokers: nodes, @@ -43,7 +43,7 @@ func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment, return } -func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info *mq_pb.SegmentInfo, found bool, err error) { +func (broker *MessageQueueBroker) readSegmentInfoOnFiler(segment *topic.Segment) (info *mq_pb.SegmentInfo, found bool, err error) { dir, name := segment.DirAndName() found, err = filer_pb.Exists(broker, dir, name, false) @@ -70,7 +70,7 @@ func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info return } -func (broker *MessageQueueBroker) saveSegmentToFiler(segment *mq.Segment, info *mq_pb.SegmentInfo) (err error) { +func (broker *MessageQueueBroker) saveSegmentInfoToFiler(segment *topic.Segment, info *mq_pb.SegmentInfo) (err error) { dir, name := segment.DirAndName() var buf bytes.Buffer diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 7ec7fb431..4f5b3c28d 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -1,6 +1,7 @@ package broker import ( + "github.com/seaweedfs/seaweedfs/weed/mq/topic" "time" "github.com/seaweedfs/seaweedfs/weed/cluster" @@ -27,20 +28,22 @@ type MessageQueueBrokerOption struct { type MessageQueueBroker struct { mq_pb.UnimplementedSeaweedMessagingServer - option *MessageQueueBrokerOption - grpcDialOption grpc.DialOption - MasterClient *wdclient.MasterClient - filers map[pb.ServerAddress]struct{} - currentFiler pb.ServerAddress + option *MessageQueueBrokerOption + grpcDialOption grpc.DialOption + MasterClient *wdclient.MasterClient + filers map[pb.ServerAddress]struct{} + currentFiler pb.ServerAddress + localTopicManager *topic.LocalTopicManager } func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { mqBroker = &MessageQueueBroker{ - option: option, - grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters), - filers: make(map[pb.ServerAddress]struct{}), + option: option, + grpcDialOption: grpcDialOption, + MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters), + filers: make(map[pb.ServerAddress]struct{}), + localTopicManager: topic.NewLocalTopicManager(), } mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate) diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go index c65bffb84..a54ce5708 100644 --- a/weed/mq/segment/message_serde_test.go +++ b/weed/mq/segment/message_serde_test.go @@ -17,6 +17,7 @@ func TestMessageSerde(t *testing.T) { bb := NewMessageBatchBuilder(b, 1, 2, 3, 4) bb.AddMessage(5, 6, prop, []byte("the primary key"), []byte("body is here")) + bb.AddMessage(5, 7, prop, []byte("the primary 2"), []byte("body is 2")) bb.BuildMessageBatch() @@ -33,9 +34,9 @@ func TestMessageSerde(t *testing.T) { assert.Equal(t, int64(5), mb.SegmentSeqBase()) assert.Equal(t, int32(0), mb.SegmentSeqMaxDelta()) assert.Equal(t, int64(6), mb.TsMsBase()) - assert.Equal(t, int32(0), mb.TsMsMaxDelta()) + assert.Equal(t, int32(1), mb.TsMsMaxDelta()) - assert.Equal(t, 1, mb.MessagesLength()) + assert.Equal(t, 2, mb.MessagesLength()) m := &message_fbs.Message{} mb.Messages(m, 0) diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go new file mode 100644 index 000000000..168e3d561 --- /dev/null +++ b/weed/mq/topic/local_manager.go @@ -0,0 +1,54 @@ +package topic + +import ( + cmap "github.com/orcaman/concurrent-map/v2" +) + +// LocalTopicManager manages topics on local broker +type LocalTopicManager struct { + topics cmap.ConcurrentMap[string, *LocalTopic] +} + +// NewLocalTopicManager creates a new LocalTopicManager +func NewLocalTopicManager() *LocalTopicManager { + return &LocalTopicManager{ + topics: cmap.New[*LocalTopic](), + } +} + +// AddTopic adds a topic to the local topic manager +func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) { + localTopic, ok := manager.topics.Get(topic.String()) + if !ok { + localTopic = &LocalTopic{ + Topic: topic, + Partitions: make([]*LocalPartition, 0), + } + } + if localTopic.findPartition(localPartition.Partition) != nil { + return + } + localTopic.Partitions = append(localTopic.Partitions, localPartition) +} + +// GetTopic gets a topic from the local topic manager +func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition { + localTopic, ok := manager.topics.Get(topic.String()) + if !ok { + return nil + } + return localTopic.findPartition(partition) +} + +// RemoveTopic removes a topic from the local topic manager +func (manager *LocalTopicManager) RemoveTopic(topic Topic) { + manager.topics.Remove(topic.String()) +} + +func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) { + localTopic, ok := manager.topics.Get(topic.String()) + if !ok { + return false + } + return localTopic.removePartition(partition) +} diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go new file mode 100644 index 000000000..e26b7afd1 --- /dev/null +++ b/weed/mq/topic/local_partition.go @@ -0,0 +1,40 @@ +package topic + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "time" +) + +type LocalPartition struct { + Partition + isLeader bool + FollowerBrokers []pb.ServerAddress + logBuffer *log_buffer.LogBuffer +} + +func (p LocalPartition) Publish(message *mq_pb.PublishRequest_DataMessage) { + p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano()) +} + +func FromPbBrokerPartitionsAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionsAssignment) *LocalPartition { + isLeaer := assignment.LeaderBroker == string(self) + localPartition := &LocalPartition{ + Partition: Partition{ + RangeStart: assignment.PartitionStart, + RangeStop: assignment.PartitionStop, + RingSize: PartitionCount, + }, + isLeader: isLeaer, + } + if !isLeaer { + return localPartition + } + followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers)) + for i, follower := range assignment.FollowerBrokers { + followers[i] = pb.ServerAddress(follower) + } + localPartition.FollowerBrokers = followers + return localPartition +} diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go new file mode 100644 index 000000000..ef3c0e65e --- /dev/null +++ b/weed/mq/topic/local_topic.go @@ -0,0 +1,29 @@ +package topic + +type LocalTopic struct { + Topic + Partitions []*LocalPartition +} + +func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition { + for _, localPartition := range localTopic.Partitions { + if localPartition.Partition.Equals(partition) { + return localPartition + } + } + return nil +} +func (localTopic *LocalTopic) removePartition(partition Partition) bool { + foundPartitionIndex := -1 + for i, localPartition := range localTopic.Partitions { + if localPartition.Partition.Equals(partition) { + foundPartitionIndex = i + break + } + } + if foundPartitionIndex == -1 { + return false + } + localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...) + return true +} diff --git a/weed/mq/topic/partition.go b/weed/mq/topic/partition.go new file mode 100644 index 000000000..285bdcb36 --- /dev/null +++ b/weed/mq/topic/partition.go @@ -0,0 +1,32 @@ +package topic + +import "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + +const PartitionCount = 4096 + +type Partition struct { + RangeStart int32 + RangeStop int32 // exclusive + RingSize int32 +} + +func (partition Partition) Equals(other Partition) bool { + if partition.RangeStart != other.RangeStart { + return false + } + if partition.RangeStop != other.RangeStop { + return false + } + if partition.RingSize != other.RingSize { + return false + } + return true +} + +func FromPbPartition(partition *mq_pb.Partition) Partition { + return Partition{ + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + RingSize: partition.RingSize, + } +} diff --git a/weed/mq/topic.go b/weed/mq/topic/topic.go index 96544bac9..430999179 100644 --- a/weed/mq/topic.go +++ b/weed/mq/topic/topic.go @@ -1,4 +1,4 @@ -package mq +package topic import ( "fmt" @@ -14,10 +14,21 @@ type Topic struct { Name string } -type Partition struct { - RangeStart int32 - RangeStop int32 // exclusive - RingSize int32 +func NewTopic(namespace Namespace, name string) Topic { + return Topic{ + Namespace: namespace, + Name: name, + } +} +func FromPbTopic(topic *mq_pb.Topic) Topic { + return Topic{ + Namespace: Namespace(topic.Namespace), + Name: topic.Name, + } +} + +func (tp Topic) String() string { + return fmt.Sprintf("%s.%s", tp.Namespace, tp.Name) } type Segment struct { diff --git a/weed/mq/topic_allocation/allocation.go b/weed/mq/topic_allocation/allocation.go new file mode 100644 index 000000000..a07ce4884 --- /dev/null +++ b/weed/mq/topic_allocation/allocation.go @@ -0,0 +1,81 @@ +package topic_allocation + +import ( + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "modernc.org/mathutil" +) + +const ( + DefaultBrokerCount = 4 +) + +// AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions +func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) { + // create a previous assignment if not exists + if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 { + prevAssignment = &mq_pb.TopicPartitionsAssignment{ + PartitionCount: topic.PartitionCount, + } + partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount + for i := 0; i < DefaultBrokerCount; i++ { + prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{ + PartitionStart: int32(i * partitionCountForEachBroker), + PartitionStop: mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount), + }) + } + } + + // create a new assignment + assignment = &mq_pb.TopicPartitionsAssignment{ + PartitionCount: prevAssignment.PartitionCount, + } + + // allocate partitions for each partition range + for _, brokerPartition := range prevAssignment.BrokerPartitions { + // allocate partitions for each partition range + leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers) + if err != nil { + return nil, err + } + + followerBrokers := make([]string, len(followers)) + for i, follower := range followers { + followerBrokers[i] = string(follower) + } + + assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{ + PartitionStart: brokerPartition.PartitionStart, + PartitionStop: brokerPartition.PartitionStop, + LeaderBroker: string(leader), + FollowerBrokers: followerBrokers, + }) + } + + return +} + +func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) { + // allocate leader + leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers) + if err != nil { + return + } + + // allocate followers + followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers) + if err != nil { + return + } + + return +} + +func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) { + return +} + +func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) { + return +} |
