diff options
Diffstat (limited to 'weed/mq/topic')
| -rw-r--r-- | weed/mq/topic/local_manager.go | 54 | ||||
| -rw-r--r-- | weed/mq/topic/local_partition.go | 40 | ||||
| -rw-r--r-- | weed/mq/topic/local_topic.go | 29 | ||||
| -rw-r--r-- | weed/mq/topic/partition.go | 32 | ||||
| -rw-r--r-- | weed/mq/topic/topic.go | 73 |
5 files changed, 228 insertions, 0 deletions
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go new file mode 100644 index 000000000..168e3d561 --- /dev/null +++ b/weed/mq/topic/local_manager.go @@ -0,0 +1,54 @@ +package topic + +import ( + cmap "github.com/orcaman/concurrent-map/v2" +) + +// LocalTopicManager manages topics on local broker +type LocalTopicManager struct { + topics cmap.ConcurrentMap[string, *LocalTopic] +} + +// NewLocalTopicManager creates a new LocalTopicManager +func NewLocalTopicManager() *LocalTopicManager { + return &LocalTopicManager{ + topics: cmap.New[*LocalTopic](), + } +} + +// AddTopic adds a topic to the local topic manager +func (manager *LocalTopicManager) AddTopicPartition(topic Topic, localPartition *LocalPartition) { + localTopic, ok := manager.topics.Get(topic.String()) + if !ok { + localTopic = &LocalTopic{ + Topic: topic, + Partitions: make([]*LocalPartition, 0), + } + } + if localTopic.findPartition(localPartition.Partition) != nil { + return + } + localTopic.Partitions = append(localTopic.Partitions, localPartition) +} + +// GetTopic gets a topic from the local topic manager +func (manager *LocalTopicManager) GetTopicPartition(topic Topic, partition Partition) *LocalPartition { + localTopic, ok := manager.topics.Get(topic.String()) + if !ok { + return nil + } + return localTopic.findPartition(partition) +} + +// RemoveTopic removes a topic from the local topic manager +func (manager *LocalTopicManager) RemoveTopic(topic Topic) { + manager.topics.Remove(topic.String()) +} + +func (manager *LocalTopicManager) RemoveTopicPartition(topic Topic, partition Partition) (removed bool) { + localTopic, ok := manager.topics.Get(topic.String()) + if !ok { + return false + } + return localTopic.removePartition(partition) +} diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go new file mode 100644 index 000000000..e26b7afd1 --- /dev/null +++ b/weed/mq/topic/local_partition.go @@ -0,0 +1,40 @@ +package topic + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "time" +) + +type LocalPartition struct { + Partition + isLeader bool + FollowerBrokers []pb.ServerAddress + logBuffer *log_buffer.LogBuffer +} + +func (p LocalPartition) Publish(message *mq_pb.PublishRequest_DataMessage) { + p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano()) +} + +func FromPbBrokerPartitionsAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionsAssignment) *LocalPartition { + isLeaer := assignment.LeaderBroker == string(self) + localPartition := &LocalPartition{ + Partition: Partition{ + RangeStart: assignment.PartitionStart, + RangeStop: assignment.PartitionStop, + RingSize: PartitionCount, + }, + isLeader: isLeaer, + } + if !isLeaer { + return localPartition + } + followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers)) + for i, follower := range assignment.FollowerBrokers { + followers[i] = pb.ServerAddress(follower) + } + localPartition.FollowerBrokers = followers + return localPartition +} diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go new file mode 100644 index 000000000..ef3c0e65e --- /dev/null +++ b/weed/mq/topic/local_topic.go @@ -0,0 +1,29 @@ +package topic + +type LocalTopic struct { + Topic + Partitions []*LocalPartition +} + +func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition { + for _, localPartition := range localTopic.Partitions { + if localPartition.Partition.Equals(partition) { + return localPartition + } + } + return nil +} +func (localTopic *LocalTopic) removePartition(partition Partition) bool { + foundPartitionIndex := -1 + for i, localPartition := range localTopic.Partitions { + if localPartition.Partition.Equals(partition) { + foundPartitionIndex = i + break + } + } + if foundPartitionIndex == -1 { + return false + } + localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...) + return true +} diff --git a/weed/mq/topic/partition.go b/weed/mq/topic/partition.go new file mode 100644 index 000000000..285bdcb36 --- /dev/null +++ b/weed/mq/topic/partition.go @@ -0,0 +1,32 @@ +package topic + +import "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + +const PartitionCount = 4096 + +type Partition struct { + RangeStart int32 + RangeStop int32 // exclusive + RingSize int32 +} + +func (partition Partition) Equals(other Partition) bool { + if partition.RangeStart != other.RangeStart { + return false + } + if partition.RangeStop != other.RangeStop { + return false + } + if partition.RingSize != other.RingSize { + return false + } + return true +} + +func FromPbPartition(partition *mq_pb.Partition) Partition { + return Partition{ + RangeStart: partition.RangeStart, + RangeStop: partition.RangeStop, + RingSize: partition.RingSize, + } +} diff --git a/weed/mq/topic/topic.go b/weed/mq/topic/topic.go new file mode 100644 index 000000000..430999179 --- /dev/null +++ b/weed/mq/topic/topic.go @@ -0,0 +1,73 @@ +package topic + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "time" +) + +type Namespace string + +type Topic struct { + Namespace Namespace + Name string +} + +func NewTopic(namespace Namespace, name string) Topic { + return Topic{ + Namespace: namespace, + Name: name, + } +} +func FromPbTopic(topic *mq_pb.Topic) Topic { + return Topic{ + Namespace: Namespace(topic.Namespace), + Name: topic.Name, + } +} + +func (tp Topic) String() string { + return fmt.Sprintf("%s.%s", tp.Namespace, tp.Name) +} + +type Segment struct { + Topic Topic + Id int32 + Partition Partition + LastModified time.Time +} + +func FromPbSegment(segment *mq_pb.Segment) *Segment { + return &Segment{ + Topic: Topic{ + Namespace: Namespace(segment.Namespace), + Name: segment.Topic, + }, + Id: segment.Id, + Partition: Partition{ + RangeStart: segment.Partition.RangeStart, + RangeStop: segment.Partition.RangeStop, + RingSize: segment.Partition.RingSize, + }, + } +} + +func (segment *Segment) ToPbSegment() *mq_pb.Segment { + return &mq_pb.Segment{ + Namespace: string(segment.Topic.Namespace), + Topic: segment.Topic.Name, + Id: segment.Id, + Partition: &mq_pb.Partition{ + RingSize: segment.Partition.RingSize, + RangeStart: segment.Partition.RangeStart, + RangeStop: segment.Partition.RangeStop, + }, + } +} + +func (segment *Segment) DirAndName() (dir string, name string) { + dir = fmt.Sprintf("%s/%s/%s", filer.TopicsDir, segment.Topic.Namespace, segment.Topic.Name) + name = fmt.Sprintf("%4d.segment", segment.Id) + return +} |
