aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-08-11 13:06:01 -0700
committerchrislu <chris.lu@gmail.com>2024-08-11 13:06:01 -0700
commit270e91b0beb0fce59946cbbf10d80e58612ebec7 (patch)
tree5b8d9731c9b304d131780ac02deca5d304997cc2
parentb6fd1ff4ce3697d89e862a2bda8e2fcaba316fb5 (diff)
downloadseaweedfs-270e91b0beb0fce59946cbbf10d80e58612ebec7.tar.xz
seaweedfs-270e91b0beb0fce59946cbbf10d80e58612ebec7.zip
adds locking
-rw-r--r--weed/mq/broker/broker_grpc_pub.go3
-rw-r--r--weed/mq/topic/local_manager.go2
-rw-r--r--weed/mq/topic/local_topic.go17
3 files changed, 15 insertions, 7 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index d633a3efa..f31dc7eff 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -36,9 +36,6 @@ import (
// Each subscription may not get data. It can act as a backup.
func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error {
- // 1. write to the volume server
- // 2. find the topic metadata owning filer
- // 3. write to the filer
req, err := stream.Recv()
if err != nil {
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index 91081c602..d87eff911 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -28,7 +28,7 @@ func (manager *LocalTopicManager) AddLocalPartition(topic Topic, localPartition
if !manager.topics.SetIfAbsent(topic.String(), localTopic) {
localTopic, _ = manager.topics.Get(topic.String())
}
- localTopic.AddPartition(localPartition)
+ localTopic.addPartition(localPartition)
}
// GetLocalPartition gets a topic from the local topic manager
diff --git a/weed/mq/topic/local_topic.go b/weed/mq/topic/local_topic.go
index 35e313742..25ae03df6 100644
--- a/weed/mq/topic/local_topic.go
+++ b/weed/mq/topic/local_topic.go
@@ -5,6 +5,7 @@ import "sync"
type LocalTopic struct {
Topic
Partitions []*LocalPartition
+ partitionLock sync.RWMutex
}
func NewLocalTopic(topic Topic) *LocalTopic {
@@ -15,6 +16,9 @@ func NewLocalTopic(topic Topic) *LocalTopic {
}
func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition {
+ localTopic.partitionLock.RLock()
+ defer localTopic.partitionLock.RUnlock()
+
for _, localPartition := range localTopic.Partitions {
if localPartition.Partition.Equals(partition) {
return localPartition
@@ -23,6 +27,9 @@ func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition
return nil
}
func (localTopic *LocalTopic) removePartition(partition Partition) bool {
+ localTopic.partitionLock.Lock()
+ defer localTopic.partitionLock.Unlock()
+
foundPartitionIndex := -1
for i, localPartition := range localTopic.Partitions {
if localPartition.Partition.Equals(partition) {
@@ -37,9 +44,13 @@ func (localTopic *LocalTopic) removePartition(partition Partition) bool {
localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...)
return true
}
-func (localTopic *LocalTopic) AddPartition(localPartition *LocalPartition) {
- if localTopic.findPartition(localPartition.Partition) != nil {
- return
+func (localTopic *LocalTopic) addPartition(localPartition *LocalPartition) {
+ localTopic.partitionLock.Lock()
+ defer localTopic.partitionLock.Unlock()
+ for _, partition := range localTopic.Partitions {
+ if localPartition.Partition.Equals(partition.Partition) {
+ return
+ }
}
localTopic.Partitions = append(localTopic.Partitions, localPartition)
}