aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-30 00:27:44 -0700
committerchrislu <chris.lu@gmail.com>2024-05-30 00:27:44 -0700
commit23a72db1df85a275a5f63a8b76d1990667434994 (patch)
treea3ea3c87e351c658a49de7e0f8eecfd9d9fb2054
parentaf19256dbf95e91476bf182203ca5df34fd25ab3 (diff)
downloadseaweedfs-23a72db1df85a275a5f63a8b76d1990667434994.tar.xz
seaweedfs-23a72db1df85a275a5f63a8b76d1990667434994.zip
stop partitionOffsetChan if closed
-rw-r--r--weed/mq/client/sub_client/on_each_partition.go5
1 files changed, 5 insertions, 0 deletions
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
index 14e427713..d0222c370 100644
--- a/weed/mq/client/sub_client/on_each_partition.go
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -67,6 +67,11 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
subscribeClient.CloseSend()
return
case ack := <-partitionOffsetChan:
+ case ack, ok := <-partitionOffsetChan:
+ if !ok {
+ subscribeClient.CloseSend()
+ return
+ }
subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Ack{
Ack: &mq_pb.SubscribeMessageRequest_AckMessage{