diff options
| author | chrislu <chris.lu@gmail.com> | 2024-05-30 00:27:44 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-05-30 00:27:44 -0700 |
| commit | 23a72db1df85a275a5f63a8b76d1990667434994 (patch) | |
| tree | a3ea3c87e351c658a49de7e0f8eecfd9d9fb2054 | |
| parent | af19256dbf95e91476bf182203ca5df34fd25ab3 (diff) | |
| download | seaweedfs-23a72db1df85a275a5f63a8b76d1990667434994.tar.xz seaweedfs-23a72db1df85a275a5f63a8b76d1990667434994.zip | |
stop partitionOffsetChan if closed
| -rw-r--r-- | weed/mq/client/sub_client/on_each_partition.go | 5 |
1 files changed, 5 insertions, 0 deletions
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go index 14e427713..d0222c370 100644 --- a/weed/mq/client/sub_client/on_each_partition.go +++ b/weed/mq/client/sub_client/on_each_partition.go @@ -67,6 +67,11 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig subscribeClient.CloseSend() return case ack := <-partitionOffsetChan: + case ack, ok := <-partitionOffsetChan: + if !ok { + subscribeClient.CloseSend() + return + } subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{ Message: &mq_pb.SubscribeMessageRequest_Ack{ Ack: &mq_pb.SubscribeMessageRequest_AckMessage{ |
