1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
package sub_client
import (
"github.com/seaweedfs/seaweedfs/weed/util/log"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"sync"
"time"
)
type ProcessorState struct {
stopCh chan struct{}
}
// Subscribe subscribes to a topic's specified partitions.
// If a partition is moved to another broker, the subscriber will automatically reconnect to the new broker.
func (sub *TopicSubscriber) Subscribe() error {
go sub.startProcessors()
// loop forever
// TODO shutdown the subscriber when not needed anymore
sub.doKeepConnectedToSubCoordinator()
return nil
}
func (sub *TopicSubscriber) startProcessors() {
// listen to the messages from the sub coordinator
// start one processor per partition
var wg sync.WaitGroup
semaphore := make(chan struct{}, sub.SubscriberConfig.MaxPartitionCount)
for message := range sub.brokerPartitionAssignmentChan {
if assigned := message.GetAssignment(); assigned != nil {
wg.Add(1)
semaphore <- struct{}{}
topicPartition := topic.FromPbPartition(assigned.PartitionAssignment.Partition)
// wait until no covering partition is still in progress
sub.waitUntilNoOverlappingPartitionInFlight(topicPartition)
// start a processors
stopChan := make(chan struct{})
sub.activeProcessorsLock.Lock()
sub.activeProcessors[topicPartition] = &ProcessorState{
stopCh: stopChan,
}
sub.activeProcessorsLock.Unlock()
go func(assigned *mq_pb.BrokerPartitionAssignment, topicPartition topic.Partition) {
defer func() {
sub.activeProcessorsLock.Lock()
delete(sub.activeProcessors, topicPartition)
sub.activeProcessorsLock.Unlock()
<-semaphore
wg.Done()
}()
log.V(3).Infof("subscriber %s/%s assigned partition %+v at %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignment{
AckAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckAssignmentMessage{
Partition: assigned.Partition,
},
},
}
executors := util.NewLimitedConcurrentExecutor(int(sub.SubscriberConfig.SlidingWindowSize))
onDataMessageFn := func(m *mq_pb.SubscribeMessageResponse_Data) {
executors.Execute(func() {
if sub.OnDataMessageFunc != nil {
sub.OnDataMessageFunc(m)
}
sub.PartitionOffsetChan <- KeyedOffset{
Key: m.Data.Key,
Offset: m.Data.TsNs,
}
})
}
err := sub.onEachPartition(assigned, stopChan, onDataMessageFn)
if err != nil {
log.V(3).Infof("subscriber %s/%s partition %+v at %v: %v", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker, err)
} else {
log.V(3).Infof("subscriber %s/%s partition %+v at %v completed", sub.ContentConfig.Topic, sub.SubscriberConfig.ConsumerGroup, assigned.Partition, assigned.LeaderBroker)
}
sub.brokerPartitionAssignmentAckChan <- &mq_pb.SubscriberToSubCoordinatorRequest{
Message: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignment{
AckUnAssignment: &mq_pb.SubscriberToSubCoordinatorRequest_AckUnAssignmentMessage{
Partition: assigned.Partition,
},
},
}
}(assigned.PartitionAssignment, topicPartition)
}
if unAssignment := message.GetUnAssignment(); unAssignment != nil {
topicPartition := topic.FromPbPartition(unAssignment.Partition)
sub.activeProcessorsLock.Lock()
if processor, found := sub.activeProcessors[topicPartition]; found {
close(processor.stopCh)
delete(sub.activeProcessors, topicPartition)
}
sub.activeProcessorsLock.Unlock()
}
}
wg.Wait()
}
func (sub *TopicSubscriber) waitUntilNoOverlappingPartitionInFlight(topicPartition topic.Partition) {
foundOverlapping := true
for foundOverlapping {
sub.activeProcessorsLock.Lock()
foundOverlapping = false
var overlappedPartition topic.Partition
for partition, _ := range sub.activeProcessors {
if partition.Overlaps(topicPartition) {
if partition.Equals(topicPartition) {
continue
}
foundOverlapping = true
overlappedPartition = partition
break
}
}
sub.activeProcessorsLock.Unlock()
if foundOverlapping {
log.V(3).Infof("subscriber %s new partition %v waiting for partition %+v to complete", sub.ContentConfig.Topic, topicPartition, overlappedPartition)
time.Sleep(1 * time.Second)
}
}
}
|