aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_partition.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/topic/local_partition.go')
-rw-r--r--weed/mq/topic/local_partition.go40
1 files changed, 40 insertions, 0 deletions
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
new file mode 100644
index 000000000..e26b7afd1
--- /dev/null
+++ b/weed/mq/topic/local_partition.go
@@ -0,0 +1,40 @@
+package topic
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "time"
+)
+
+type LocalPartition struct {
+ Partition
+ isLeader bool
+ FollowerBrokers []pb.ServerAddress
+ logBuffer *log_buffer.LogBuffer
+}
+
+func (p LocalPartition) Publish(message *mq_pb.PublishRequest_DataMessage) {
+ p.logBuffer.AddToBuffer(message.Key, message.Value, time.Now().UnixNano())
+}
+
+func FromPbBrokerPartitionsAssignment(self pb.ServerAddress, assignment *mq_pb.BrokerPartitionsAssignment) *LocalPartition {
+ isLeaer := assignment.LeaderBroker == string(self)
+ localPartition := &LocalPartition{
+ Partition: Partition{
+ RangeStart: assignment.PartitionStart,
+ RangeStop: assignment.PartitionStop,
+ RingSize: PartitionCount,
+ },
+ isLeader: isLeaer,
+ }
+ if !isLeaer {
+ return localPartition
+ }
+ followers := make([]pb.ServerAddress, len(assignment.FollowerBrokers))
+ for i, follower := range assignment.FollowerBrokers {
+ followers[i] = pb.ServerAddress(follower)
+ }
+ localPartition.FollowerBrokers = followers
+ return localPartition
+}