aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/local_topic.go
blob: a35bb32b392c36fff47fb913c6725067d802a216 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package topic

import "sync"

type LocalTopic struct {
	Topic
	Partitions    []*LocalPartition
	partitionLock sync.RWMutex
}

func NewLocalTopic(topic Topic) *LocalTopic {
	return &LocalTopic{
		Topic:      topic,
		Partitions: make([]*LocalPartition, 0),
	}
}

func (localTopic *LocalTopic) findPartition(partition Partition) *LocalPartition {
	localTopic.partitionLock.RLock()
	defer localTopic.partitionLock.RUnlock()

	for _, localPartition := range localTopic.Partitions {
		if localPartition.Partition.Equals(partition) {
			return localPartition
		}
	}
	return nil
}
func (localTopic *LocalTopic) removePartition(partition Partition) bool {
	localTopic.partitionLock.Lock()
	defer localTopic.partitionLock.Unlock()

	foundPartitionIndex := -1
	for i, localPartition := range localTopic.Partitions {
		if localPartition.Partition.Equals(partition) {
			foundPartitionIndex = i
			localPartition.Shutdown()
			break
		}
	}
	if foundPartitionIndex == -1 {
		return false
	}
	localTopic.Partitions = append(localTopic.Partitions[:foundPartitionIndex], localTopic.Partitions[foundPartitionIndex+1:]...)
	return true
}
func (localTopic *LocalTopic) addPartition(localPartition *LocalPartition) {
	localTopic.partitionLock.Lock()
	defer localTopic.partitionLock.Unlock()
	for _, partition := range localTopic.Partitions {
		if localPartition.Partition.Equals(partition.Partition) {
			return
		}
	}
	localTopic.Partitions = append(localTopic.Partitions, localPartition)
}

func (localTopic *LocalTopic) closePartitionPublishers(unixTsNs int64) bool {
	var wg sync.WaitGroup
	for _, localPartition := range localTopic.Partitions {
		if localPartition.UnixTimeNs != unixTsNs {
			continue
		}
		wg.Add(1)
		go func(localPartition *LocalPartition) {
			defer wg.Done()
			localPartition.closePublishers()
		}(localPartition)
	}
	wg.Wait()
	return true
}

func (localTopic *LocalTopic) closePartitionSubscribers(unixTsNs int64) bool {
	var wg sync.WaitGroup
	for _, localPartition := range localTopic.Partitions {
		if localPartition.UnixTimeNs != unixTsNs {
			continue
		}
		wg.Add(1)
		go func(localPartition *LocalPartition) {
			defer wg.Done()
			localPartition.closeSubscribers()
		}(localPartition)
	}
	wg.Wait()
	return true
}

func (localTopic *LocalTopic) WaitUntilNoPublishers() {
	for {
		var wg sync.WaitGroup
		for _, localPartition := range localTopic.Partitions {
			wg.Add(1)
			go func(localPartition *LocalPartition) {
				defer wg.Done()
				localPartition.WaitUntilNoPublishers()
			}(localPartition)
		}
		wg.Wait()
		if len(localTopic.Partitions) == 0 {
			return
		}
	}
}