path: root/weed/mq/client/sub_client/subscribe.go
package sub_client

import (
	"sync"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/log"
)

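// ProcessorState tracks one per-partition processor; closing stopCh signals that processor to stop.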
type ProcessorState struct {
	stopCh chan struct{}
}

// Subscribe subscribes to a topic's specified partitions.
// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
func (sub *TopicSubscriber) Subscribe() error {

	go sub.startProcessors()

	// loop forever
	// TODO shutdown the subscriber when not needed anymore
	sub.doKeepConnectedToSubCoordinator()

	return nil
}

// startProcessors listens for partition assignment messages from the sub coordinator
// and starts one processor per assigned partition.
func (sub *TopicSubscriber) startProcessors() {
	var wg sync.WaitGroup
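	// the semaphore caps the number of partition processors running concurrently at MaxPartitionCount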
	semaphore := make(chan struct{}, sub.SubscriberConfig.MaxPartitionCount)

	for message := range sub.brokerPartitionAssignmentChan {
		if assigned := message.GetAssignment(); assigned != nil {
			wg.Add(1)
			semaphore <- struct{}{}

			topicPartition := topic.FromPbPartition(assigned.PartitionAssignment.Partition)

			// wait until no overlapping partition is still being processed
			sub.waitUntilNoOverlappingPartitionInFlight(topicPartition)

			// start a processor for this partition; closing stopChan stops it
			stopChan := make(chan struct{})
			sub.activeProcessorsLock.Lock()
			sub.activeProcessors[topicPartition] = &ProcessorState{
				stopCh: stopChan,
			}
			sub.activeProcessorsLock.Unlock()

			go func(assigned *mq_pb.BrokerPartitionAssignment, topicPartition topic.Partition) {
				defer func() {
					sub.activeProcessorsLock.Lock()
					delete(sub.activeProcessors, topicPartition)
					sub.activeProcessorsLock.Unlock()

					<-semaphore
					wg.Done()
				}()
				log.V(3).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
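				// acknowledge the partition assignment back to the sub coordinator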
				sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
					Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignment{
						AckAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage{
							Partition: assigned.Partition,
						},
					},
				}

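				// process incoming data messages concurrently, bounded by the configured sliding window size,
				// and report each processed message's key and timestamp offset on PartitionOffsetChan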
				executors := util.NewLimitedConcurrentExecutor(int(sub.SubscriberConfig.SlidingWindowSize))
				onDataMessageFn := func(m *mq_pb.SubscribeMessageResponse_Data) {
					executors.Execute(func() {
						if sub.OnDataMessageFunc != nil {
							sub.OnDataMessageFunc(m)
						}
						sub.PartitionOffsetChan <- KeyedOffset{
							Key:    m.Data.Key,
							Offset: m.Data.TsNs,
						}
					})
				}

				err := sub.onEachPartition(assigned, stopChan, onDataMessageFn)
				if err != nil {
					log.V(3).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
				} else {
					log.V(3).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
				}
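				// acknowledge to the sub coordinator that this partition is no longer being processed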
				sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
					Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignment{
						AckUnAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage{
							Partition: assigned.Partition,
						},
					},
				}
			}(assigned.PartitionAssignment, topicPartition)
		}
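		// on un-assignment, stop the partition's processor and remove it from the active set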
		if unAssignment := message.GetUnAssignment(); unAssignment != nil {
			topicPartition := topic.FromPbPartition(unAssignment.Partition)
			sub.activeProcessorsLock.Lock()
			if processor, found := sub.activeProcessors[topicPartition]; found {
				close(processor.stopCh)
				delete(sub.activeProcessors, topicPartition)
			}
			sub.activeProcessorsLock.Unlock()
		}
	}

	wg.Wait()

}

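// waitUntilNoOverlappingPartitionInFlight blocks until no active processor covers a partition range
// that overlaps (but is not equal to) the given partition, polling once per second.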
func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartition topic.Partition) {
	foundOverlapping := true
	for foundOverlapping {
		sub.activeProcessorsLock.Lock()
		foundOverlapping = false
		var overlappedPartition topic.Partition
		for partition := range sub.activeProcessors {
			if partition.Overlaps(topicPartition) {
				if partition.Equals(topicPartition) {
					continue
				}
				foundOverlapping = true
				overlappedPartition = partition
				break
			}
		}
		sub.activeProcessorsLock.Unlock()
		if foundOverlapping {
			log.V(3).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition)
			time.Sleep(1 * time.Second)
		}
	}
}