aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2023-10-02 01:01:45 -0700
committerchrislu <chris.lu@gmail.com>2023-10-02 01:01:45 -0700
commit734178093e1e5d86dd6bd81ac3abe9b9f814ea36 (patch)
treeae88c489924fb3ff0ef41660b2ac95a5e2facab3
parent2a578b9033b2de12aeb49ae88a694d2510085665 (diff)
downloadseaweedfs-734178093e1e5d86dd6bd81ac3abe9b9f814ea36.tar.xz
seaweedfs-734178093e1e5d86dd6bd81ac3abe9b9f814ea36.zip
refactor TopicPartition struct
-rw-r--r--weed/mq/balancer/balancer.go12
-rw-r--r--weed/mq/balancer/lookup.go2
-rw-r--r--weed/mq/broker/broker_grpc_lookup.go2
-rw-r--r--weed/mq/topic/local_manager.go10
-rw-r--r--weed/mq/topic/topic.go10
-rw-r--r--weed/mq/topic/topic_partition.go6
6 files changed, 18 insertions, 24 deletions
diff --git a/weed/mq/balancer/balancer.go b/weed/mq/balancer/balancer.go
index 258b29f2f..d93cc8de8 100644
--- a/weed/mq/balancer/balancer.go
+++ b/weed/mq/balancer/balancer.go
@@ -49,10 +49,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
for _, topicPartitionStats := range stats.Stats {
tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{
- Namespace: topicPartitionStats.Topic.Namespace,
- Topic: topicPartitionStats.Topic.Name,
- RangeStart: topicPartitionStats.Partition.RangeStart,
- RangeStop: topicPartitionStats.Partition.RangeStop,
+ Topic: topic.Topic{Namespace: topicPartitionStats.Topic.Namespace, Name: topicPartitionStats.Topic.Name},
+ Partition: topic.Partition{RangeStart: topicPartitionStats.Partition.RangeStart, RangeStop: topicPartitionStats.Partition.RangeStop, RingSize: topicPartitionStats.Partition.RingSize},
},
ConsumerCount: topicPartitionStats.ConsumerCount,
IsLeader: topicPartitionStats.IsLeader,
@@ -73,10 +71,8 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Partition) {
tps := &TopicPartitionStats{
TopicPartition: topic.TopicPartition{
- Namespace: t.Namespace,
- Topic: t.Name,
- RangeStart: partition.RangeStart,
- RangeStop: partition.RangeStop,
+ Topic: topic.Topic{Namespace: t.Namespace, Name: t.Name},
+ Partition: topic.Partition{RangeStart: partition.RangeStart, RangeStop: partition.RangeStop},
},
ConsumerCount: 0,
IsLeader: true,
diff --git a/weed/mq/balancer/lookup.go b/weed/mq/balancer/lookup.go
index 7362fbab7..d5b78fc45 100644
--- a/weed/mq/balancer/lookup.go
+++ b/weed/mq/balancer/lookup.go
@@ -16,7 +16,7 @@ func (b *Balancer) LookupOrAllocateTopicPartitions(topic *mq_pb.Topic, publish b
for topicPartitionStatsItem := range brokerStats.Stats.IterBuffered() {
topicPartitionStat := topicPartitionStatsItem.Val
if topicPartitionStat.TopicPartition.Namespace == topic.Namespace &&
- topicPartitionStat.TopicPartition.Topic == topic.Name {
+ topicPartitionStat.TopicPartition.Name == topic.Name {
assignment := &mq_pb.BrokerPartitionAssignment{
Partition: &mq_pb.Partition{
RingSize: MaxPartitionCount,
diff --git a/weed/mq/broker/broker_grpc_lookup.go b/weed/mq/broker/broker_grpc_lookup.go
index 93586e22d..74a3a9822 100644
--- a/weed/mq/broker/broker_grpc_lookup.go
+++ b/weed/mq/broker/broker_grpc_lookup.go
@@ -70,7 +70,7 @@ func (broker *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb
topicPartitionStat := topicPartitionStatsItem.Val
topic := &mq_pb.Topic{
Namespace: topicPartitionStat.TopicPartition.Namespace,
- Name: topicPartitionStat.TopicPartition.Topic,
+ Name: topicPartitionStat.TopicPartition.Name,
}
topicKey := fmt.Sprintf("%s/%s", topic.Namespace, topic.Name)
if _, found := knownTopics[topicKey]; found {
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index e3ee46a1e..0c54f2bb1 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -75,10 +75,12 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
manager.topics.IterCb(func(topic string, localTopic *LocalTopic) {
for _, localPartition := range localTopic.Partitions {
topicPartition := &TopicPartition{
- Namespace: string(localTopic.Namespace),
- Topic: localTopic.Name,
- RangeStart: localPartition.RangeStart,
- RangeStop: localPartition.RangeStop,
+ Topic: Topic{Namespace: localTopic.Namespace, Name: localTopic.Name},
+ Partition: Partition{
+ RingSize: localPartition.RingSize,
+ RangeStart: localPartition.RangeStart,
+ RangeStop: localPartition.RangeStop,
+ },
}
stats.Stats[topicPartition.String()] = &mq_pb.TopicPartitionStats{
Topic: &mq_pb.Topic{
diff --git a/weed/mq/topic/topic.go b/weed/mq/topic/topic.go
index 430999179..3d457e6f1 100644
--- a/weed/mq/topic/topic.go
+++ b/weed/mq/topic/topic.go
@@ -7,14 +7,12 @@ import (
"time"
)
-type Namespace string
-
type Topic struct {
- Namespace Namespace
+ Namespace string
Name string
}
-func NewTopic(namespace Namespace, name string) Topic {
+func NewTopic(namespace string, name string) Topic {
return Topic{
Namespace: namespace,
Name: name,
@@ -22,7 +20,7 @@ func NewTopic(namespace Namespace, name string) Topic {
}
func FromPbTopic(topic *mq_pb.Topic) Topic {
return Topic{
- Namespace: Namespace(topic.Namespace),
+ Namespace: topic.Namespace,
Name: topic.Name,
}
}
@@ -41,7 +39,7 @@ type Segment struct {
func FromPbSegment(segment *mq_pb.Segment) *Segment {
return &Segment{
Topic: Topic{
- Namespace: Namespace(segment.Namespace),
+ Namespace: segment.Namespace,
Name: segment.Topic,
},
Id: segment.Id,
diff --git a/weed/mq/topic/topic_partition.go b/weed/mq/topic/topic_partition.go
index 3d927b1d8..20b33a7e4 100644
--- a/weed/mq/topic/topic_partition.go
+++ b/weed/mq/topic/topic_partition.go
@@ -3,10 +3,8 @@ package topic
import "fmt"
type TopicPartition struct {
- Namespace string
- Topic string
- RangeStart int32
- RangeStop int32
+ Topic
+ Partition
}
func (tp *TopicPartition) String() string {