aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/client/sub_client/on_each_partition.go5
1 files changed, 5 insertions, 0 deletions
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
index 14e427713..d0222c370 100644
--- a/weed/mq/client/sub_client/on_each_partition.go
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -67,6 +67,11 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
subscribeClient.CloseSend()
return
case ack := <-partitionOffsetChan:
+ case ack, ok := <-partitionOffsetChan:
+ if !ok {
+ subscribeClient.CloseSend()
+ return
+ }
subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Ack{
Ack: &mq_pb.SubscribeMessageRequest_AckMessage{