aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-21 08:42:04 -0700
committerchrislu <chris.lu@gmail.com>2024-05-21 08:42:04 -0700
commitfa98ecf71edcdf1a8b63a92f46a473ad1073b889 (patch)
tree8e848071739f4fddbe94c6edcc4d3075822dc0bd /weed
parent6634b429812310b38c0efc7f05a9c8fa3911aa68 (diff)
downloadseaweedfs-fa98ecf71edcdf1a8b63a92f46a473ad1073b889.tar.xz
seaweedfs-fa98ecf71edcdf1a8b63a92f46a473ad1073b889.zip
client side stop partition subscribing if unassigned
Diffstat (limited to 'weed')
-rw-r--r--weed/mq/client/sub_client/on_each_partition.go23
-rw-r--r--weed/mq/client/sub_client/subscribe.go16
2 files changed, 27 insertions, 12 deletions
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
index 792376a69..8d5bf2044 100644
--- a/weed/mq/client/sub_client/on_each_partition.go
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -10,7 +10,7 @@ import (
"io"
)
-func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment) error {
+func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}) error {
// connect to the partition broker
return pb.WithBrokerGrpcClient(true, assigned.LeaderBroker, sub.SubscriberConfig.GrpcDialOption, func(client mq_pb.SeaweedMessagingClient) error {
@@ -61,15 +61,20 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
executors := util.NewLimitedConcurrentExecutor(int(perPartitionConcurrency))
go func() {
- for ack := range partitionOffsetChan {
- subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
- Message: &mq_pb.SubscribeMessageRequest_Ack{
- Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
- Key: ack.Key,
- Sequence: ack.Offset,
+ for {
+ select {
+ case <-stopCh:
+ break
+ case ack := <- partitionOffsetChan:
+ subscribeClient.SendMsg(&mq_pb.SubscribeMessageRequest{
+ Message: &mq_pb.SubscribeMessageRequest_Ack{
+ Ack: &mq_pb.SubscribeMessageRequest_AckMessage{
+ Key: ack.Key,
+ Sequence: ack.Offset,
+ },
},
- },
- })
+ })
+ }
}
subscribeClient.CloseSend()
}()
diff --git a/weed/mq/client/sub_client/subscribe.go b/weed/mq/client/sub_client/subscribe.go
index e539d3da0..7208ce2ac 100644
--- a/weed/mq/client/sub_client/subscribe.go
+++ b/weed/mq/client/sub_client/subscribe.go
@@ -9,6 +9,7 @@ import (
)
type ProcessorState struct {
+ stopCh chan struct{}
}
// Subscribe subscribes to a topic's specified partitions.
@@ -41,8 +42,11 @@ func (sub *TopicSubscriber) startProcessors() {
sub.waitUntilNoOverlappingPartitionInFlight(topicPartition)
// start a processors
+ stopChan := make(chan struct{})
sub.activeProcessorsLock.Lock()
- sub.activeProcessors[topicPartition] = &ProcessorState{}
+ sub.activeProcessors[topicPartition] = &ProcessorState{
+ stopCh: stopChan,
+ }
sub.activeProcessorsLock.Unlock()
go func(assigned *mq_pb.BrokerPartitionAssignment, topicPartition topic.Partition) {
@@ -55,7 +59,7 @@ func (sub *TopicSubscriber) startProcessors() {
wg.Done()
}()
glog.V(0).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
- err := sub.onEachPartition(assigned)
+ err := sub.onEachPartition(assigned, stopChan)
if err != nil {
glog.V(0).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
} else {
@@ -64,7 +68,13 @@ func (sub *TopicSubscriber) startProcessors() {
}(assigned.PartitionAssignment, topicPartition)
}
if unAssignment := message.GetUnAssignment(); unAssignment != nil {
-
+ topicPartition := topic.FromPbPartition(unAssignment.Partition)
+ sub.activeProcessorsLock.Lock()
+ if processor, found := sub.activeProcessors[topicPartition]; found {
+ close(processor.stopCh)
+ delete(sub.activeProcessors, topicPartition)
+ }
+ sub.activeProcessorsLock.Unlock()
}
}