aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/mq/sub_coordinator/inflight_message_tracker.go2
-rw-r--r--weed/mq/sub_coordinator/inflight_message_tracker_test.go14
2 files changed, 15 insertions, 1 deletions
diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go
index 7a72d2eef..f8effef95 100644
--- a/weed/mq/sub_coordinator/inflight_message_tracker.go
+++ b/weed/mq/sub_coordinator/inflight_message_tracker.go
@@ -142,9 +142,9 @@ func (rb *RingBuffer) AckTimestamp(timestamp int64) {
// Remove all the continuously acknowledged timestamps from the buffer
startPos := (rb.head + len(rb.buffer) - rb.size) % len(rb.buffer)
for i := 0; i < len(rb.buffer) && rb.buffer[(startPos+i)%len(rb.buffer)].Acked; i++ {
- rb.size--
t := rb.buffer[(startPos+i)%len(rb.buffer)]
if rb.maxAllAckedTs < t.Timestamp {
+ rb.size--
rb.maxAllAckedTs = t.Timestamp
}
}
diff --git a/weed/mq/sub_coordinator/inflight_message_tracker_test.go b/weed/mq/sub_coordinator/inflight_message_tracker_test.go
index 9f62f80ad..5b7a1bdd8 100644
--- a/weed/mq/sub_coordinator/inflight_message_tracker_test.go
+++ b/weed/mq/sub_coordinator/inflight_message_tracker_test.go
@@ -117,3 +117,17 @@ func TestInflightMessageTracker3(t *testing.T) {
assert.Equal(t, int64(7), tracker.GetOldestAckedTimestamp())
}
+
+func TestInflightMessageTracker4(t *testing.T) {
+ // Initialize an InflightMessageTracker with initial capacity 1
+ tracker := NewInflightMessageTracker(1)
+
+ tracker.EnflightMessage([]byte("1"), int64(1))
+ tracker.EnflightMessage([]byte("2"), int64(2))
+ assert.True(t, tracker.AcknowledgeMessage([]byte("1"), int64(1)))
+ assert.True(t, tracker.AcknowledgeMessage([]byte("2"), int64(2)))
+ tracker.EnflightMessage([]byte("3"), int64(3))
+ assert.True(t, tracker.AcknowledgeMessage([]byte("3"), int64(3)))
+ assert.Equal(t, int64(3), tracker.GetOldestAckedTimestamp())
+
+}