// Source path: root/weed/mq/topic/local_partition_subscribers.go
// Blob: 9c5d44adfc4aef7a6dd1ef334013bcbb76e77637

package topic

import (
	"sync"
	"sync/atomic"
	"time"
)

// LocalPartitionSubscribers is a registry of subscribers keyed by client name.
//
// Subscribers holds the registered entries; SubscribersLock guards that map.
// The methods on this type take the lock themselves (write lock for
// add/remove, read lock for everything else), so code that touches
// Subscribers directly must acquire SubscribersLock on its own.
type LocalPartitionSubscribers struct {
	Subscribers     map[string]*LocalSubscriber
	SubscribersLock sync.RWMutex
}
// LocalSubscriber tracks the connection times and offset progress of a single
// subscriber, together with the channel used to tell it to stop.
//
// The int64 fields are read and written only through sync/atomic. They are
// kept at the very start of the struct so they stay 64-bit aligned on 32-bit
// platforms, as sync/atomic requires; add new fields after them.
type LocalSubscriber struct {
	connectTimeNs      int64 // accessed atomically
	lastSeenTimeNs     int64 // accessed atomically
	lastReceivedOffset int64 // accessed atomically - offset of last message received
	lastAckedOffset    int64 // accessed atomically - offset of last message acknowledged
	stopCh             chan struct{}
	stopOnce           sync.Once // ensures stopCh is closed at most once
}

// NewLocalSubscriber returns a subscriber whose connect time and last-seen
// time are both set to the current time (in Unix nanoseconds) and whose
// received/acknowledged offsets start at zero.
func NewLocalSubscriber() *LocalSubscriber {
	now := time.Now().UnixNano()
	subscriber := &LocalSubscriber{
		stopCh: make(chan struct{}, 1),
	}
	atomic.StoreInt64(&subscriber.connectTimeNs, now)
	atomic.StoreInt64(&subscriber.lastSeenTimeNs, now)
	// lastReceivedOffset and lastAckedOffset keep their zero value; no
	// explicit store is needed for a freshly allocated struct.
	return subscriber
}

// SignalShutdown closes the stop channel, which releases every receiver
// waiting on it.
//
// It is safe to call more than once and from multiple goroutines: only the
// first call closes the channel and later calls are no-ops. Closing an
// already-closed channel would panic, so the close is guarded by a sync.Once.
func (p *LocalSubscriber) SignalShutdown() {
	p.stopOnce.Do(func() {
		close(p.stopCh)
	})
}

// UpdateLastSeen records the current time (Unix nanoseconds) as this
// subscriber's most recent activity, using an atomic store.
func (p *LocalSubscriber) UpdateLastSeen() {
	nowNs := time.Now().UnixNano()
	atomic.StoreInt64(&p.lastSeenTimeNs, nowNs)
}

// UpdateReceivedOffset atomically stores offset as the last received offset
// and then refreshes the last-seen time to the current time.
func (p *LocalSubscriber) UpdateReceivedOffset(offset int64) {
	atomic.StoreInt64(&p.lastReceivedOffset, offset)

	// Receiving a message counts as activity.
	seenAtNs := time.Now().UnixNano()
	atomic.StoreInt64(&p.lastSeenTimeNs, seenAtNs)
}

// UpdateAckedOffset atomically stores offset as the last acknowledged offset
// and then refreshes the last-seen time to the current time.
func (p *LocalSubscriber) UpdateAckedOffset(offset int64) {
	atomic.StoreInt64(&p.lastAckedOffset, offset)

	// An acknowledgement counts as activity.
	seenAtNs := time.Now().UnixNano()
	atomic.StoreInt64(&p.lastSeenTimeNs, seenAtNs)
}

// GetTimestamps atomically loads and returns the connect time and the
// last-seen time. The two loads are independent, so the pair is not a single
// consistent snapshot.
func (p *LocalSubscriber) GetTimestamps() (connectTimeNs, lastSeenTimeNs int64) {
	connectTimeNs = atomic.LoadInt64(&p.connectTimeNs)
	lastSeenTimeNs = atomic.LoadInt64(&p.lastSeenTimeNs)
	return connectTimeNs, lastSeenTimeNs
}

// GetOffsets atomically loads and returns the last received offset and the
// last acknowledged offset. The two loads are independent, so the pair is not
// a single consistent snapshot.
func (p *LocalSubscriber) GetOffsets() (lastReceivedOffset, lastAckedOffset int64) {
	lastReceivedOffset = atomic.LoadInt64(&p.lastReceivedOffset)
	lastAckedOffset = atomic.LoadInt64(&p.lastAckedOffset)
	return lastReceivedOffset, lastAckedOffset
}

// GetCurrentOffset atomically loads and returns the last acknowledged offset.
// It is a single-value accessor for that offset (for compatibility).
func (p *LocalSubscriber) GetCurrentOffset() int64 {
	acked := atomic.LoadInt64(&p.lastAckedOffset)
	return acked
}

// NewLocalPartitionSubscribers returns an empty registry whose Subscribers
// map is already allocated, so entries can be added right away.
func NewLocalPartitionSubscribers() *LocalPartitionSubscribers {
	registry := new(LocalPartitionSubscribers)
	registry.Subscribers = make(map[string]*LocalSubscriber)
	return registry
}

// AddSubscriber stores Subscriber under clientName while holding the write
// lock. An existing entry with the same name is replaced.
func (p *LocalPartitionSubscribers) AddSubscriber(clientName string, Subscriber *LocalSubscriber) {
	mu := &p.SubscribersLock
	mu.Lock()
	defer mu.Unlock()

	p.Subscribers[clientName] = Subscriber
}

// RemoveSubscriber deletes the entry stored under clientName while holding
// the write lock. Removing a name that is not present is a no-op.
func (p *LocalPartitionSubscribers) RemoveSubscriber(clientName string) {
	mu := &p.SubscribersLock
	mu.Lock()
	defer mu.Unlock()

	delete(p.Subscribers, clientName)
}

// SignalShutdown calls SignalShutdown on every registered subscriber. The
// read lock is held for the whole iteration; entries are not removed.
func (p *LocalPartitionSubscribers) SignalShutdown() {
	mu := &p.SubscribersLock
	mu.RLock()
	defer mu.RUnlock()

	for _, sub := range p.Subscribers {
		sub.SignalShutdown()
	}
}

// Size returns the number of registered subscribers, read under the read lock.
func (p *LocalPartitionSubscribers) Size() int {
	mu := &p.SubscribersLock
	mu.RLock()
	defer mu.RUnlock()

	count := len(p.Subscribers)
	return count
}

// GetSubscriberNames returns a newly allocated slice holding the client name
// of every registered subscriber, collected under the read lock. The order
// follows map iteration and is therefore unspecified; the result is non-nil
// even when there are no subscribers.
func (p *LocalPartitionSubscribers) GetSubscriberNames() []string {
	mu := &p.SubscribersLock
	mu.RLock()
	defer mu.RUnlock()

	// The map cannot change while the lock is held, so the slice can be
	// sized exactly and filled by index.
	names := make([]string, len(p.Subscribers))
	next := 0
	for clientName := range p.Subscribers {
		names[next] = clientName
		next++
	}
	return names
}

// ForEachSubscriber invokes fn once for every registered subscriber, passing
// its client name and its *LocalSubscriber. Iteration order is unspecified.
//
// fn runs while the read lock is held, so it must not call methods that take
// the write lock on the same registry (AddSubscriber, RemoveSubscriber).
func (p *LocalPartitionSubscribers) ForEachSubscriber(fn func(name string, subscriber *LocalSubscriber)) {
	mu := &p.SubscribersLock
	mu.RLock()
	defer mu.RUnlock()

	for clientName, sub := range p.Subscribers {
		fn(clientName, sub)
	}
}