aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-01-16 08:43:07 -0800
committerchrislu <chris.lu@gmail.com>2024-01-16 08:43:07 -0800
commit3795d8dca8b1548263376057173aaac310e9fdd3 (patch)
treed7d784e87a9a28031f5b5bba77294a6f815794fb
parentf782165638c0787803e1d395bb2f6a9285ef41be (diff)
downloadseaweedfs-3795d8dca8b1548263376057173aaac310e9fdd3.tar.xz
seaweedfs-3795d8dca8b1548263376057173aaac310e9fdd3.zip
release local topic partition if no publisher and subscribers
-rw-r--r--weed/mq/broker/broker_grpc_pub.go7
-rw-r--r--weed/mq/broker/broker_grpc_sub.go3
-rw-r--r--weed/mq/topic/local_partition.go8
-rw-r--r--weed/mq/topic/local_partition_subscribers.go7
4 files changed, 24 insertions, 1 deletions
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 45a573633..e0e138ef2 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -48,8 +48,10 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
// TODO check whether current broker should be the leader for the topic partition
ackInterval := 1
initMessage := req.GetInit()
+ var t topic.Topic
+ var p topic.Partition
if initMessage != nil {
- t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
+ t, p = topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.Partition)
localTopicPartition = b.localTopicManager.GetTopicPartition(t, p)
if localTopicPartition == nil {
response.Error = fmt.Sprintf("topic %v partition %v not setup", initMessage.Topic, initMessage.Partition)
@@ -75,6 +77,9 @@ func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_Publis
atomic.StoreInt32(&isStopping, 1)
close(respChan)
localTopicPartition.Publishers.RemovePublisher(clientName)
+ if localTopicPartition.MaybeShutdownLocalPartition() {
+ b.localTopicManager.RemoveTopicPartition(t, p)
+ }
}()
go func() {
ticker := time.NewTicker(1 * time.Second)
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 5ab47b61f..d6114ad23 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -58,6 +58,9 @@ func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest
isConnected = false
localTopicPartition.Subscribers.RemoveSubscriber(clientName)
glog.V(0).Infof("Subscriber %s on %v %v disconnected, sent %d", clientName, t, partition, counter)
+ if localTopicPartition.MaybeShutdownLocalPartition() {
+ b.localTopicManager.RemoveTopicPartition(t, partition)
+ }
}()
var startPosition log_buffer.MessagePosition
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index f4a080f38..9b7281b65 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -109,3 +109,11 @@ func (p *LocalPartition) WaitUntilNoPublishers() {
time.Sleep(113 * time.Millisecond)
}
}
+
+func (p *LocalPartition) MaybeShutdownLocalPartition() (hasShutdown bool) {
+ if p.Publishers.IsEmpty() && p.Subscribers.IsEmpty() {
+ p.logBuffer.ShutdownLogBuffer()
+ hasShutdown = true
+ }
+ return
+}
diff --git a/weed/mq/topic/local_partition_subscribers.go b/weed/mq/topic/local_partition_subscribers.go
index e177ec7e8..caadff278 100644
--- a/weed/mq/topic/local_partition_subscribers.go
+++ b/weed/mq/topic/local_partition_subscribers.go
@@ -47,3 +47,10 @@ func (p *LocalPartitionSubscribers) SignalShutdown() {
Subscriber.SignalShutdown()
}
}
+
+func (p *LocalPartitionSubscribers) IsEmpty() bool {
+ p.SubscribersLock.RLock()
+ defer p.SubscribersLock.RUnlock()
+
+ return len(p.Subscribers) == 0
+}