aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-08-10 12:39:16 -0700
committerchrislu <chris.lu@gmail.com>2024-08-10 12:39:16 -0700
commitb6fd1ff4ce3697d89e862a2bda8e2fcaba316fb5 (patch)
tree19c130bc80de836cb4d026ad9b0e2fd7ec7596f4
parent7438648d1cfacd5ca570dd029d1bdb5fd271bd70 (diff)
downloadseaweedfs-b6fd1ff4ce3697d89e862a2bda8e2fcaba316fb5.tar.xz
seaweedfs-b6fd1ff4ce3697d89e862a2bda8e2fcaba316fb5.zip
refactor
-rw-r--r--weed/mq/topic/local_manager.go5
-rw-r--r--weed/mq/topic/local_topic.go6
2 files changed, 7 insertions, 4 deletions
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index 44e629fc1..91081c602 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -28,10 +28,7 @@ func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition
if !manager.topics.SetIfAbsent(topic.String(), localTopic) {
localTopic, _ = manager.topics.Get(topic.String())
}
- if localTopic.findPartition(localPartition.Partition) != nil {
- return
- }
- localTopic.Partitions = append(localTopic.Partitions, localPartition)
+ localTopic.AddPartition(localPartition)
}
// GetLocalPartition gets a topic from the local topic manager
diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go
index 8ab2a0db5..35e313742 100644
--- a/weed/mq/topic/local_topic.go
+++ b/weed/mq/topic/local_topic.go
@@ -37,6 +37,12 @@ func (localTopic *LocalTopic) removePartition(partition Partition) bool {
localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...)
return true
}
+func (localTopic *LocalTopic) AddPartition(localPartition *LocalPartition) {
+ if localTopic.findPartition(localPartition.Partition) != nil {
+ return
+ }
+ localTopic.Partitions = append(localTopic.Partitions, localPartition)
+}
func (localTopic *LocalTopic) closePartitionPublishers(unixTsNs int64) bool {
var wg sync.WaitGroup