aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-20 09:19:39 -0700
committerchrislu <chris.lu@gmail.com>2024-05-20 09:19:39 -0700
commit2b07a40da5893cba380ac2d9a51e550af6b73e5a (patch)
treef86852ff0e4dd66434abf0ac9734b5d6af4f5f5f
parenta2885512e184a6ab8e00fed5b6e4b0679d219230 (diff)
downloadseaweedfs-2b07a40da5893cba380ac2d9a51e550af6b73e5a.tar.xz
seaweedfs-2b07a40da5893cba380ac2d9a51e550af6b73e5a.zip
add InflightMessageTracker
-rw-r--r--weed/mq/sub_coordinator/inflight_message_tracker.go120
-rw-r--r--weed/mq/sub_coordinator/inflight_message_tracker_test.go85
2 files changed, 205 insertions, 0 deletions
diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go
new file mode 100644
index 000000000..aa1f3f0c6
--- /dev/null
+++ b/weed/mq/sub_coordinator/inflight_message_tracker.go
@@ -0,0 +1,120 @@
+package sub_coordinator
+
+import (
+ "sort"
+ "sync"
+)
+
+type InflightMessageTracker struct {
+ messages map[string]int64
+ mu sync.Mutex
+ timestamps *RingBuffer
+}
+
+func NewInflightMessageTracker(capacity int) *InflightMessageTracker {
+ return &InflightMessageTracker{
+ messages: make(map[string]int64),
+ timestamps: NewRingBuffer(capacity),
+ }
+}
+
+// InflightMessage tracks the message with the key and timestamp.
+// These messages are sent to the consumer group instances and waiting for ack.
+func (imt *InflightMessageTracker) InflightMessage(key []byte, tsNs int64) {
+ imt.mu.Lock()
+ defer imt.mu.Unlock()
+ imt.messages[string(key)] = tsNs
+ imt.timestamps.Add(tsNs)
+}
+// IsMessageAcknowledged returns true if the message has been acknowledged.
+// If the message is older than the oldest inflight messages, returns false.
+// returns false if the message is inflight.
+// Otherwise, returns false if the message is old and can be ignored.
+func (imt *InflightMessageTracker) IsMessageAcknowledged(key []byte, tsNs int64) bool {
+ imt.mu.Lock()
+ defer imt.mu.Unlock()
+
+ if tsNs < imt.timestamps.Oldest() {
+ return true
+ }
+ if tsNs > imt.timestamps.Latest() {
+ return false
+ }
+
+ if _, found := imt.messages[string(key)]; found {
+ return false
+ }
+
+ return true
+}
+// AcknowledgeMessage acknowledges the message with the key and timestamp.
+func (imt *InflightMessageTracker) AcknowledgeMessage(key []byte, tsNs int64) bool {
+ imt.mu.Lock()
+ defer imt.mu.Unlock()
+ timestamp, exists := imt.messages[string(key)]
+ if !exists || timestamp != tsNs {
+ return false
+ }
+ delete(imt.messages, string(key))
+ // Remove the specific timestamp from the ring buffer.
+ imt.timestamps.Remove(tsNs)
+ return true
+}
+
+// RingBuffer represents a circular buffer to hold timestamps.
+type RingBuffer struct {
+ buffer []int64
+ head int
+ size int
+}
+// NewRingBuffer creates a new RingBuffer of the given capacity.
+func NewRingBuffer(capacity int) *RingBuffer {
+ return &RingBuffer{
+ buffer: make([]int64, capacity),
+ }
+}
+// Add adds a new timestamp to the ring buffer.
+func (rb *RingBuffer) Add(timestamp int64) {
+ rb.buffer[rb.head] = timestamp
+ rb.head = (rb.head + 1) % len(rb.buffer)
+ if rb.size < len(rb.buffer) {
+ rb.size++
+ }
+}
+// Remove removes the specified timestamp from the ring buffer.
+func (rb *RingBuffer) Remove(timestamp int64) {
+ // Perform binary search
+ index := sort.Search(rb.size, func(i int) bool {
+ return rb.buffer[(rb.head+len(rb.buffer)-rb.size+i)%len(rb.buffer)] >= timestamp
+ })
+ actualIndex := (rb.head + len(rb.buffer) - rb.size + index) % len(rb.buffer)
+
+ if index < rb.size && rb.buffer[actualIndex] == timestamp {
+ // Shift elements to maintain the buffer order
+ for i := index; i < rb.size-1; i++ {
+ fromIndex := (rb.head + len(rb.buffer) - rb.size + i + 1) % len(rb.buffer)
+ toIndex := (rb.head + len(rb.buffer) - rb.size + i) % len(rb.buffer)
+ rb.buffer[toIndex] = rb.buffer[fromIndex]
+ }
+ rb.size--
+ rb.buffer[(rb.head+len(rb.buffer)-1)%len(rb.buffer)] = 0 // Clear the last element
+ }
+}
+
+// Oldest returns the oldest timestamp in the ring buffer.
+func (rb *RingBuffer) Oldest() int64 {
+ if rb.size == 0 {
+ return 0
+ }
+ oldestIndex := (rb.head + len(rb.buffer) - rb.size) % len(rb.buffer)
+ return rb.buffer[oldestIndex]
+}
+
+// Latest returns the most recently added timestamp in the ring buffer.
+func (rb *RingBuffer) Latest() int64 {
+ if rb.size == 0 {
+ return 0
+ }
+ latestIndex := (rb.head + len(rb.buffer) - 1) % len(rb.buffer)
+ return rb.buffer[latestIndex]
+}
diff --git a/weed/mq/sub_coordinator/inflight_message_tracker_test.go b/weed/mq/sub_coordinator/inflight_message_tracker_test.go
new file mode 100644
index 000000000..4b35e32bf
--- /dev/null
+++ b/weed/mq/sub_coordinator/inflight_message_tracker_test.go
@@ -0,0 +1,85 @@
+package sub_coordinator
+
+import (
+ "sort"
+ "testing"
+ "time"
+)
+
+func TestRingBuffer(t *testing.T) {
+ // Initialize a RingBuffer with capacity 5
+ rb := NewRingBuffer(5)
+
+ // Add timestamps to the buffer
+ timestamps := []int64{100, 200, 300, 400, 500}
+ for _, ts := range timestamps {
+ rb.Add(ts)
+ }
+
+ // Test Add method and buffer size
+ expectedSize := 5
+ if rb.size != expectedSize {
+ t.Errorf("Expected buffer size %d, got %d", expectedSize, rb.size)
+ }
+
+ // Test Oldest and Latest methods
+ expectedOldest := int64(100)
+ if oldest := rb.Oldest(); oldest != expectedOldest {
+ t.Errorf("Expected oldest timestamp %d, got %d", expectedOldest, oldest)
+ }
+ expectedLatest := int64(500)
+ if latest := rb.Latest(); latest != expectedLatest {
+ t.Errorf("Expected latest timestamp %d, got %d", expectedLatest, latest)
+ }
+
+ // Test Remove method
+ rb.Remove(200)
+ expectedSize--
+ if rb.size != expectedSize {
+ t.Errorf("Expected buffer size %d after removal, got %d", expectedSize, rb.size)
+ }
+
+ // Test removal of non-existent element
+ rb.Remove(600)
+ if rb.size != expectedSize {
+ t.Errorf("Expected buffer size %d after attempting removal of non-existent element, got %d", expectedSize, rb.size)
+ }
+
+ // Test binary search correctness
+ target := int64(300)
+ index := sort.Search(rb.size, func(i int) bool {
+ return rb.buffer[(rb.head+len(rb.buffer)-rb.size+i)%len(rb.buffer)] >= target
+ })
+ actualIndex := (rb.head + len(rb.buffer) - rb.size + index) % len(rb.buffer)
+ if rb.buffer[actualIndex] != target {
+ t.Errorf("Binary search failed to find the correct index for timestamp %d", target)
+ }
+}
+
+func TestInflightMessageTracker(t *testing.T) {
+ // Initialize an InflightMessageTracker with capacity 5
+ tracker := NewInflightMessageTracker(5)
+
+ // Add inflight messages
+ key := []byte("exampleKey")
+ timestamp := time.Now().UnixNano()
+ tracker.InflightMessage(key, timestamp)
+
+ // Test IsMessageAcknowledged method
+ isOld := tracker.IsMessageAcknowledged(key, timestamp-10)
+ if !isOld {
+ t.Error("Expected message to be old")
+ }
+
+ // Test AcknowledgeMessage method
+ acked := tracker.AcknowledgeMessage(key, timestamp)
+ if !acked {
+ t.Error("Expected message to be acked")
+ }
+ if _, exists := tracker.messages[string(key)]; exists {
+ t.Error("Expected message to be deleted after ack")
+ }
+ if tracker.timestamps.size != 0 {
+ t.Error("Expected buffer size to be 0 after ack")
+ }
+}