aboutsummaryrefslogtreecommitdiff
path: root/weed/mq
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-08-20 22:53:05 -0700
committerchrislu <chris.lu@gmail.com>2023-08-20 22:53:05 -0700
commit01d70c21f30988bffa37ffdcb6b80f1646293390 (patch)
treeace1aa23be04dc833092e1a33fcf36237b11798f /weed/mq
parent3650e5adda604dc7507ba3f0f63799c4cbfa4dfe (diff)
downloadseaweedfs-01d70c21f30988bffa37ffdcb6b80f1646293390.tar.xz
seaweedfs-01d70c21f30988bffa37ffdcb6b80f1646293390.zip
Squashed commit of the following:
commit 32f4b1a13057d56b6de487cdb80ff7c205af01a6 Author: chrislu <chris.lu@gmail.com> Date: Sun Aug 20 22:52:19 2023 -0700 fix compilation commit e77ad33b7ca0423138fbae26a4433b60923a9588 Author: chrislu <chris.lu@gmail.com> Date: Sun Aug 20 22:46:44 2023 -0700 pub commit f431f30cc7ca277ca299e3cd118c05537fb9f5c3 Author: chrislu <chris.lu@gmail.com> Date: Sun Aug 20 13:27:39 2023 -0700 fix generic type commit 4e9dcb18293fd1e3e306e2dceb995dfd67a35e1d Merge: 30f942580 16e3f2d52 Author: chrislu <chris.lu@gmail.com> Date: Sun Aug 20 12:47:14 2023 -0700 Merge branch 'master' into pubsub commit 30f942580ad1bb32ae94aade2e3a21ec3ab63e21 Author: chrislu <chris.lu@gmail.com> Date: Sun Aug 20 11:10:58 2023 -0700 wip commit f8b00980bc2f3879bb43decffd9a08d842f196f2 Author: chrislu <chris.lu@gmail.com> Date: Tue Jul 25 09:14:35 2023 -0700 add design document commit 08d2bebe42a26ebc39f1542f54d99e73620727dd Author: chrislu <chris.lu@gmail.com> Date: Tue Jul 25 09:14:06 2023 -0700 minor commit bcfa7982b262a40fcdce6fc6613fad2ce07c13da Author: chrislu <chris.lu@gmail.com> Date: Tue Jul 25 09:13:49 2023 -0700 rename
Diffstat (limited to 'weed/mq')
-rw-r--r--weed/mq/broker/brokder_grpc_admin.go107
-rw-r--r--weed/mq/broker/brokder_grpc_pub.go122
-rw-r--r--weed/mq/broker/broker_segment_serde.go14
-rw-r--r--weed/mq/broker/broker_server.go21
-rw-r--r--weed/mq/segment/message_serde_test.go5
-rw-r--r--weed/mq/topic/local_manager.go54
-rw-r--r--weed/mq/topic/local_partition.go40
-rw-r--r--weed/mq/topic/local_topic.go29
-rw-r--r--weed/mq/topic/partition.go32
-rw-r--r--weed/mq/topic/topic.go (renamed from weed/mq/topic.go)21
-rw-r--r--weed/mq/topic_allocation/allocation.go81
11 files changed, 487 insertions, 39 deletions
diff --git a/weed/mq/broker/brokder_grpc_admin.go b/weed/mq/broker/brokder_grpc_admin.go
index 7e669ef9b..5aac780fb 100644
--- a/weed/mq/broker/brokder_grpc_admin.go
+++ b/weed/mq/broker/brokder_grpc_admin.go
@@ -4,7 +4,7 @@ import (
"context"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/mq"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -12,6 +12,10 @@ import (
"sync"
)
+const (
+ MaxPartitionCount = 1024
+)
+
func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error) {
ret := &mq_pb.FindBrokerLeaderResponse{}
err := broker.withMasterClient(false, broker.MasterClient.GetMaster(), func(client master_pb.SeaweedClient) error {
@@ -31,21 +35,9 @@ func (broker *MessageQueueBroker) FindBrokerLeader(c context.Context, request *m
return ret, err
}
-func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
- ret := &mq_pb.CheckSegmentStatusResponse{}
- // TODO add in memory active segment
- return ret, nil
-}
-
-func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
- ret := &mq_pb.CheckBrokerLoadResponse{}
- // TODO read broker's load
- return ret, nil
-}
-
func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, request *mq_pb.AssignSegmentBrokersRequest) (*mq_pb.AssignSegmentBrokersResponse, error) {
ret := &mq_pb.AssignSegmentBrokersResponse{}
- segment := mq.FromPbSegment(request.Segment)
+ segment := topic.FromPbSegment(request.Segment)
// check existing segment locations on filer
existingBrokers, err := broker.checkSegmentOnFiler(segment)
@@ -84,7 +76,92 @@ func (broker *MessageQueueBroker) AssignSegmentBrokers(c context.Context, reques
return ret, nil
}
-func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *mq.Segment, brokers []pb.ServerAddress) (active bool, err error) {
+func (broker *MessageQueueBroker) CheckSegmentStatus(c context.Context, request *mq_pb.CheckSegmentStatusRequest) (*mq_pb.CheckSegmentStatusResponse, error) {
+ ret := &mq_pb.CheckSegmentStatusResponse{}
+ // TODO add in memory active segment
+ return ret, nil
+}
+
+func (broker *MessageQueueBroker) CheckBrokerLoad(c context.Context, request *mq_pb.CheckBrokerLoadRequest) (*mq_pb.CheckBrokerLoadResponse, error) {
+ ret := &mq_pb.CheckBrokerLoadResponse{}
+ // TODO read broker's load
+ return ret, nil
+}
+
+// FindTopicBrokers returns the brokers that are serving the topic
+//
+// 1. lock the topic
+//
+// 2. find the topic partitions on the filer
+// 2.1 if the topic is not found, return error
+// 2.2 if the request is_for_publish, create the topic
+// 2.2.1 if the request is_for_subscribe, return error not found
+// 2.2.2 if the request is_for_publish, create the topic
+// 2.2 if the topic is found, return the brokers
+//
+// 3. unlock the topic
+func (broker *MessageQueueBroker) FindTopicBrokers(c context.Context, request *mq_pb.FindTopicBrokersRequest) (*mq_pb.FindTopicBrokersResponse, error) {
+ ret := &mq_pb.FindTopicBrokersResponse{}
+ // lock the topic
+
+ // find the topic partitions on the filer
+ // if the topic is not found
+ // if the request is_for_publish
+ // create the topic
+ // if the request is_for_subscribe
+ // return error not found
+ return ret, nil
+}
+
+// CheckTopicPartitionsStatus check the topic partitions on the broker
+func (broker *MessageQueueBroker) CheckTopicPartitionsStatus(c context.Context, request *mq_pb.CheckTopicPartitionsStatusRequest) (*mq_pb.CheckTopicPartitionsStatusResponse, error) {
+ ret := &mq_pb.CheckTopicPartitionsStatusResponse{}
+ return ret, nil
+}
+
+// createOrUpdateTopicPartitions creates the topic partitions on the broker
+// 1. check
+func (broker *MessageQueueBroker) createOrUpdateTopicPartitions(topic *topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment) (err error) {
+ // create or update each partition
+ if prevAssignment == nil {
+ broker.createOrUpdateTopicPartition(topic, nil)
+ } else {
+ for _, partitionAssignment := range prevAssignment.BrokerPartitions {
+ broker.createOrUpdateTopicPartition(topic, partitionAssignment)
+ }
+ }
+ return nil
+}
+
+func (broker *MessageQueueBroker) createOrUpdateTopicPartition(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (newAssignment *mq_pb.BrokerPartitionsAssignment) {
+ shouldCreate := broker.confirmBrokerPartitionAssignment(topic, oldAssignment)
+ if !shouldCreate {
+
+ }
+ return
+}
+func (broker *MessageQueueBroker) confirmBrokerPartitionAssignment(topic *topic.Topic, oldAssignment *mq_pb.BrokerPartitionsAssignment) (shouldCreate bool) {
+ if oldAssignment == nil {
+ return true
+ }
+ for _, b := range oldAssignment.FollowerBrokers {
+ pb.WithBrokerClient(false, pb.ServerAddress(b), broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.CheckTopicPartitionsStatus(context.Background(), &mq_pb.CheckTopicPartitionsStatusRequest{
+ Namespace: string(topic.Namespace),
+ Topic: topic.Name,
+ BrokerPartitionsAssignment: oldAssignment,
+ ShouldCancelIfNotMatch: true,
+ })
+ if err != nil {
+ shouldCreate = true
+ }
+ return nil
+ })
+ }
+ return
+}
+
+func (broker *MessageQueueBroker) checkSegmentsOnBrokers(segment *topic.Segment, brokers []pb.ServerAddress) (active bool, err error) {
var wg sync.WaitGroup
for _, candidate := range brokers {
diff --git a/weed/mq/broker/brokder_grpc_pub.go b/weed/mq/broker/brokder_grpc_pub.go
index a26be5171..58ab6e5d2 100644
--- a/weed/mq/broker/brokder_grpc_pub.go
+++ b/weed/mq/broker/brokder_grpc_pub.go
@@ -1,16 +1,136 @@
package broker
import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
)
+// For a new or re-configured topic, or one of the broker went offline,
+// the pub clients ask one broker what are the brokers for all the topic partitions.
+// The broker will lock the topic on write.
+// 1. if the topic is not found, create the topic, and allocate the topic partitions to the brokers
+// 2. if the topic is found, return the brokers for the topic partitions
+// For a topic to read from, the sub clients ask one broker what are the brokers for all the topic partitions.
+// The broker will lock the topic on read.
+// 1. if the topic is not found, return error
+// 2. if the topic is found, return the brokers for the topic partitions
+//
+// If the topic needs to be re-balanced, the admin client will lock the topic,
+// 1. collect throughput information for all the brokers
+// 2. adjust the topic partitions to the brokers
+// 3. notify the brokers to add/remove partitions to host
+// 3.1 When locking the topic, the partitions and brokers should be remembered in the lock.
+// 4. the brokers will stop process incoming messages if not the right partition
+// 4.1 the pub clients will need to re-partition the messages and publish to the right brokers for the partition3
+// 4.2 the sub clients will need to change the brokers to read from
+//
+// The following is from each individual component's perspective:
+// For a pub client
+// For current topic/partition, ask one broker for the brokers for the topic partitions
+// 1. connect to the brokers and keep sending, until the broker returns error, or the broker leader is moved.
+// For a sub client
+// For current topic/partition, ask one broker for the brokers for the topic partitions
+// 1. connect to the brokers and keep reading, until the broker returns error, or the broker leader is moved.
+// For a broker
+// Upon a pub client lookup:
+// 1. lock the topic
+// 2. if already has topic partition assignment, check all brokers are healthy
+// 3. if not, create topic partition assignment
+// 2. return the brokers for the topic partitions
+// 3. unlock the topic
+// Upon a sub client lookup:
+// 1. lock the topic
+// 2. if already has topic partition assignment, check all brokers are healthy
+// 3. if not, return error
+// 2. return the brokers for the topic partitions
+// 3. unlock the topic
+// For an admin tool
+// 0. collect stats from all the brokers, and find the topic worth moving
+// 1. lock the topic
+// 2. collect throughput information for all the brokers
+// 3. adjust the topic partitions to the brokers
+// 4. notify the brokers to add/remove partitions to host
+// 5. the brokers will stop process incoming messages if not the right partition
+// 6. unlock the topic
+
/*
-The messages is buffered in memory, and saved to filer under
+The messages are buffered in memory, and saved to filer under
/topics/<topic>/<date>/<hour>/<segment>/*.msg
/topics/<topic>/<date>/<hour>/segment
/topics/<topic>/info/segment_<id>.meta
+
+
+
*/
func (broker *MessageQueueBroker) Publish(stream mq_pb.SeaweedMessaging_PublishServer) error {
+ // 1. write to the volume server
+ // 2. find the topic metadata owning filer
+ // 3. write to the filer
+
+ var localTopicPartition *topic.LocalPartition
+ for {
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+
+ // Process the received message
+ sequence := req.GetSequence()
+ response := &mq_pb.PublishResponse{
+ AckSequence: sequence,
+ }
+ if dataMessage := req.GetData(); dataMessage != nil {
+ if localTopicPartition == nil {
+ response.Error = "topic partition not initialized"
+ glog.Errorf("topic partition not found")
+ } else {
+ localTopicPartition.Publish(dataMessage)
+ }
+ } else if initMessage := req.GetInit(); initMessage != nil {
+ localTopicPartition = broker.localTopicManager.GetTopicPartition(
+ topic.NewTopic(topic.Namespace(initMessage.Segment.Namespace), initMessage.Segment.Topic),
+ topic.FromPbPartition(initMessage.Segment.Partition),
+ )
+ if localTopicPartition == nil {
+ response.Error = fmt.Sprintf("topic partition %v not found", initMessage.Segment)
+ glog.Errorf("topic partition %v not found", initMessage.Segment)
+ }
+ }
+ if err := stream.Send(response); err != nil {
+ glog.Errorf("Error sending setup response: %v", err)
+ }
+ }
+
return nil
}
+
+// AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
+func (broker *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error) {
+ ret := &mq_pb.AssignTopicPartitionsResponse{}
+ self := pb.ServerAddress(fmt.Sprintf("%s:%d", broker.option.Ip, broker.option.Port))
+
+ for _, partition := range request.TopicPartitionsAssignment.BrokerPartitions {
+ localPartiton := topic.FromPbBrokerPartitionsAssignment(self, partition)
+ broker.localTopicManager.AddTopicPartition(
+ topic.FromPbTopic(request.Topic),
+ localPartiton)
+ if request.IsLeader {
+ for _, follower := range localPartiton.FollowerBrokers {
+ err := pb.WithBrokerClient(false, follower, broker.grpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
+ _, err := client.AssignTopicPartitions(context.Background(), request)
+ return err
+ })
+ if err != nil {
+ return ret, err
+ }
+ }
+ }
+ }
+
+ return ret, nil
+}
diff --git a/weed/mq/broker/broker_segment_serde.go b/weed/mq/broker/broker_segment_serde.go
index e36867da0..bb9aecc0b 100644
--- a/weed/mq/broker/broker_segment_serde.go
+++ b/weed/mq/broker/broker_segment_serde.go
@@ -4,7 +4,7 @@ import (
"bytes"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/mq"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -12,8 +12,8 @@ import (
"time"
)
-func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brokers []pb.ServerAddress, err error) {
- info, found, err := broker.readSegmentOnFiler(segment)
+func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *topic.Segment) (brokers []pb.ServerAddress, err error) {
+ info, found, err := broker.readSegmentInfoOnFiler(segment)
if err != nil {
return
}
@@ -27,12 +27,12 @@ func (broker *MessageQueueBroker) checkSegmentOnFiler(segment *mq.Segment) (brok
return
}
-func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment, brokers []pb.ServerAddress) (err error) {
+func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *topic.Segment, brokers []pb.ServerAddress) (err error) {
var nodes []string
for _, b := range brokers {
nodes = append(nodes, string(b))
}
- broker.saveSegmentToFiler(segment, &mq_pb.SegmentInfo{
+ broker.saveSegmentInfoToFiler(segment, &mq_pb.SegmentInfo{
Segment: segment.ToPbSegment(),
StartTsNs: time.Now().UnixNano(),
Brokers: nodes,
@@ -43,7 +43,7 @@ func (broker *MessageQueueBroker) saveSegmentBrokersOnFiler(segment *mq.Segment,
return
}
-func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info *mq_pb.SegmentInfo, found bool, err error) {
+func (broker *MessageQueueBroker) readSegmentInfoOnFiler(segment *topic.Segment) (info *mq_pb.SegmentInfo, found bool, err error) {
dir, name := segment.DirAndName()
found, err = filer_pb.Exists(broker, dir, name, false)
@@ -70,7 +70,7 @@ func (broker *MessageQueueBroker) readSegmentOnFiler(segment *mq.Segment) (info
return
}
-func (broker *MessageQueueBroker) saveSegmentToFiler(segment *mq.Segment, info *mq_pb.SegmentInfo) (err error) {
+func (broker *MessageQueueBroker) saveSegmentInfoToFiler(segment *topic.Segment, info *mq_pb.SegmentInfo) (err error) {
dir, name := segment.DirAndName()
var buf bytes.Buffer
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 7ec7fb431..4f5b3c28d 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -1,6 +1,7 @@
package broker
import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
@@ -27,20 +28,22 @@ type MessageQueueBrokerOption struct {
type MessageQueueBroker struct {
mq_pb.UnimplementedSeaweedMessagingServer
- option *MessageQueueBrokerOption
- grpcDialOption grpc.DialOption
- MasterClient *wdclient.MasterClient
- filers map[pb.ServerAddress]struct{}
- currentFiler pb.ServerAddress
+ option *MessageQueueBrokerOption
+ grpcDialOption grpc.DialOption
+ MasterClient *wdclient.MasterClient
+ filers map[pb.ServerAddress]struct{}
+ currentFiler pb.ServerAddress
+ localTopicManager *topic.LocalTopicManager
}
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
mqBroker = &MessageQueueBroker{
- option: option,
- grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
- filers: make(map[pb.ServerAddress]struct{}),
+ option: option,
+ grpcDialOption: grpcDialOption,
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
+ filers: make(map[pb.ServerAddress]struct{}),
+ localTopicManager: topic.NewLocalTopicManager(),
}
mqBroker.MasterClient.SetOnPeerUpdateFn(mqBroker.OnBrokerUpdate)
diff --git a/weed/mq/segment/message_serde_test.go b/weed/mq/segment/message_serde_test.go
index c65bffb84..a54ce5708 100644
--- a/weed/mq/segment/message_serde_test.go
+++ b/weed/mq/segment/message_serde_test.go
@@ -17,6 +17,7 @@ func TestMessageSerde(t *testing.T) {
bb := NewMessageBatchBuilder(b, 1, 2, 3, 4)
bb.AddMessage(5, 6, prop, []byte("the primary key"), []byte("body is here"))
+ bb.AddMessage(5, 7, prop, []byte("the primary 2"), []byte("body is 2"))
bb.BuildMessageBatch()
@@ -33,9 +34,9 @@ func TestMessageSerde(t *testing.T) {
assert.Equal(t, int64(5), mb.SegmentSeqBase())
assert.Equal(t, int32(0), mb.SegmentSeqMaxDelta())
assert.Equal(t, int64(6), mb.TsMsBase())
- assert.Equal(t, int32(0), mb.TsMsMaxDelta())
+ assert.Equal(t, int32(1), mb.TsMsMaxDelta())
- assert.Equal(t, 1, mb.MessagesLength())
+ assert.Equal(t, 2, mb.MessagesLength())
m := &message_fbs.Message{}
mb.Messages(m, 0)
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
new file mode 100644
index 000000000..168e3d561
--- /dev/null
+++ b/weed/mq/topic/local_manager.go
@@ -0,0 +1,54 @@
+package topic
+
+import (
+ cmap "github.com/orcaman/concurrent-map/v2"
+)
+
+// LocalTopicManager manages topics on local broker
+type LocalTopicManager struct {
+ topics cmap.ConcurrentMap[string, *LocalTopic]
+}
+
+// NewLocalTopicManager creates a new LocalTopicManager
+func NewLocalTopicManager() *LocalTopicManager {
+ return &LocalTopicManager{
+ topics: cmap.New[*LocalTopic](),
+ }
+}
+
+// AddTopic adds a topic to the local topic manager
+func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) {
+ localTopic, ok := manager.topics.Get(topic.String())
+ if !ok {
+ localTopic = &LocalTopic{
+ Topic: topic,
+ Partitions: make([]*LocalPartition, 0),
+ }
+ }
+ if localTopic.findPartition(localPartition.Partition) != nil {
+ return
+ }
+ localTopic.Partitions = append(localTopic.Partitions, localPartition)
+}
+
+// GetTopic gets a topic from the local topic manager
+func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition {
+ localTopic, ok := manager.topics.Get(topic.String())
+ if !ok {
+ return nil
+ }
+ return localTopic.findPartition(partition)
+}
+
+// RemoveTopic removes a topic from the local topic manager
+func (manager *LocalTopicManager) RemoveTopic(topic Topic) {
+ manager.topics.Remove(topic.String())
+}
+
+func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) {
+ localTopic, ok := manager.topics.Get(topic.String())
+ if !ok {
+ return false
+ }
+ return localTopic.removePartition(partition)
+}
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
new file mode 100644
index 000000000..e26b7afd1
--- /dev/null
+++ b/weed/mq/topic/local_partition.go
@@ -0,0 +1,40 @@
+package topic
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "time"
+)
+
+type LocalPartition struct {
+ Partition
+ isLeader bool
+ FollowerBrokers []pb.ServerAddress
+ logBuffer *log_buffer.LogBuffer
+}
+
+func (p LocalPartition) Publish(message *mq_pb.PublishRequest_DataMessage) {
+ p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
+}
+
+func FromPbBrokerPartitionsAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionsAssignment) *LocalPartition {
+ isLeaer := assignment.LeaderBroker == string(self)
+ localPartition := &LocalPartition{
+ Partition: Partition{
+ RangeStart: assignment.PartitionStart,
+ RangeStop: assignment.PartitionStop,
+ RingSize: PartitionCount,
+ },
+ isLeader: isLeaer,
+ }
+ if !isLeaer {
+ return localPartition
+ }
+ followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers))
+ for i, follower := range assignment.FollowerBrokers {
+ followers[i] = pb.ServerAddress(follower)
+ }
+ localPartition.FollowerBrokers = followers
+ return localPartition
+}
diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go
new file mode 100644
index 000000000..ef3c0e65e
--- /dev/null
+++ b/weed/mq/topic/local_topic.go
@@ -0,0 +1,29 @@
+package topic
+
+type LocalTopic struct {
+ Topic
+ Partitions []*LocalPartition
+}
+
+func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition {
+ for _, localPartition := range localTopic.Partitions {
+ if localPartition.Partition.Equals(partition) {
+ return localPartition
+ }
+ }
+ return nil
+}
+func (localTopic *LocalTopic) removePartition(partition Partition) bool {
+ foundPartitionIndex := -1
+ for i, localPartition := range localTopic.Partitions {
+ if localPartition.Partition.Equals(partition) {
+ foundPartitionIndex = i
+ break
+ }
+ }
+ if foundPartitionIndex == -1 {
+ return false
+ }
+ localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...)
+ return true
+}
diff --git a/weed/mq/topic/partition.go b/weed/mq/topic/partition.go
new file mode 100644
index 000000000..285bdcb36
--- /dev/null
+++ b/weed/mq/topic/partition.go
@@ -0,0 +1,32 @@
+package topic
+
+import "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+
+const PartitionCount = 4096
+
+type Partition struct {
+ RangeStart int32
+ RangeStop int32 // exclusive
+ RingSize int32
+}
+
+func (partition Partition) Equals(other Partition) bool {
+ if partition.RangeStart != other.RangeStart {
+ return false
+ }
+ if partition.RangeStop != other.RangeStop {
+ return false
+ }
+ if partition.RingSize != other.RingSize {
+ return false
+ }
+ return true
+}
+
+func FromPbPartition(partition *mq_pb.Partition) Partition {
+ return Partition{
+ RangeStart: partition.RangeStart,
+ RangeStop: partition.RangeStop,
+ RingSize: partition.RingSize,
+ }
+}
diff --git a/weed/mq/topic.go b/weed/mq/topic/topic.go
index 96544bac9..430999179 100644
--- a/weed/mq/topic.go
+++ b/weed/mq/topic/topic.go
@@ -1,4 +1,4 @@
-package mq
+package topic
import (
"fmt"
@@ -14,10 +14,21 @@ type Topic struct {
Name string
}
-type Partition struct {
- RangeStart int32
- RangeStop int32 // exclusive
- RingSize int32
+func NewTopic(namespace Namespace, name string) Topic {
+ return Topic{
+ Namespace: namespace,
+ Name: name,
+ }
+}
+func FromPbTopic(topic *mq_pb.Topic) Topic {
+ return Topic{
+ Namespace: Namespace(topic.Namespace),
+ Name: topic.Name,
+ }
+}
+
+func (tp Topic) String() string {
+ return fmt.Sprintf("%s.%s", tp.Namespace, tp.Name)
}
type Segment struct {
diff --git a/weed/mq/topic_allocation/allocation.go b/weed/mq/topic_allocation/allocation.go
new file mode 100644
index 000000000..a07ce4884
--- /dev/null
+++ b/weed/mq/topic_allocation/allocation.go
@@ -0,0 +1,81 @@
+package topic_allocation
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "modernc.org/mathutil"
+)
+
+const (
+ DefaultBrokerCount = 4
+)
+
+// AllocateBrokersForTopicPartitions allocate brokers for a topic's all partitions
+func AllocateBrokersForTopicPartitions(t topic.Topic, prevAssignment *mq_pb.TopicPartitionsAssignment, candidateBrokers []pb.ServerAddress) (assignment *mq_pb.TopicPartitionsAssignment, err error) {
+ // create a previous assignment if not exists
+ if prevAssignment == nil || len(prevAssignment.BrokerPartitions) == 0 {
+ prevAssignment = &mq_pb.TopicPartitionsAssignment{
+ PartitionCount: topic.PartitionCount,
+ }
+ partitionCountForEachBroker := topic.PartitionCount / DefaultBrokerCount
+ for i := 0; i < DefaultBrokerCount; i++ {
+ prevAssignment.BrokerPartitions = append(prevAssignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
+ PartitionStart: int32(i * partitionCountForEachBroker),
+ PartitionStop: mathutil.MaxInt32(int32((i+1)*partitionCountForEachBroker), topic.PartitionCount),
+ })
+ }
+ }
+
+ // create a new assignment
+ assignment = &mq_pb.TopicPartitionsAssignment{
+ PartitionCount: prevAssignment.PartitionCount,
+ }
+
+ // allocate partitions for each partition range
+ for _, brokerPartition := range prevAssignment.BrokerPartitions {
+ // allocate partitions for each partition range
+ leader, followers, err := allocateBrokersForOneTopicPartition(t, brokerPartition, candidateBrokers)
+ if err != nil {
+ return nil, err
+ }
+
+ followerBrokers := make([]string, len(followers))
+ for i, follower := range followers {
+ followerBrokers[i] = string(follower)
+ }
+
+ assignment.BrokerPartitions = append(assignment.BrokerPartitions, &mq_pb.BrokerPartitionsAssignment{
+ PartitionStart: brokerPartition.PartitionStart,
+ PartitionStop: brokerPartition.PartitionStop,
+ LeaderBroker: string(leader),
+ FollowerBrokers: followerBrokers,
+ })
+ }
+
+ return
+}
+
+func allocateBrokersForOneTopicPartition(t topic.Topic, brokerPartition *mq_pb.BrokerPartitionsAssignment, candidateBrokers []pb.ServerAddress) (leader pb.ServerAddress, followers []pb.ServerAddress, err error) {
+ // allocate leader
+ leader, err = allocateLeaderForOneTopicPartition(t, brokerPartition, candidateBrokers)
+ if err != nil {
+ return
+ }
+
+ // allocate followers
+ followers, err = allocateFollowersForOneTopicPartition(t, brokerPartition, candidateBrokers)
+ if err != nil {
+ return
+ }
+
+ return
+}
+
+func allocateFollowersForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (followers []pb.ServerAddress, err error) {
+ return
+}
+
+func allocateLeaderForOneTopicPartition(t topic.Topic, partition *mq_pb.BrokerPartitionsAssignment, brokers []pb.ServerAddress) (leader pb.ServerAddress, err error) {
+ return
+}