aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-05-20 13:29:37 -0700
committerchrislu <chris.lu@gmail.com>2024-05-20 13:29:37 -0700
commit3148dec1f6314123d63f0b8e8060e13070ae9556 (patch)
treec15cfeb5413921fbacd0ce26a133e11586b5d5b6
parenta7d30d0705fa37966583c95f3f7ddfa150f909b8 (diff)
downloadseaweedfs-3148dec1f6314123d63f0b8e8060e13070ae9556.tar.xz
seaweedfs-3148dec1f6314123d63f0b8e8060e13070ae9556.zip
avoid concurrent processing for the same key
-rw-r--r--weed/mq/broker/broker_grpc_sub.go4
-rw-r--r--weed/mq/sub_coordinator/inflight_message_tracker.go8
2 files changed, 12 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go
index 7cbde21f8..b4475c525 100644
--- a/weed/mq/broker/broker_grpc_sub.go
+++ b/weed/mq/broker/broker_grpc_sub.go
@@ -162,6 +162,10 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs
// reset the sleep interval count
sleepIntervalCount = 0
+ for imt.IsInflight(logEntry.Key) {
+ time.Sleep(137 * time.Millisecond)
+ }
+
imt.InflightMessage(logEntry.Key, logEntry.TsNs)
if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{
diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go
index f48335435..1b1caca00 100644
--- a/weed/mq/sub_coordinator/inflight_message_tracker.go
+++ b/weed/mq/sub_coordinator/inflight_message_tracker.go
@@ -67,6 +67,14 @@ func (imt *InflightMessageTracker) GetOldest() int64 {
return imt.timestamps.Oldest()
}
+// IsInflight returns true if the message with the key is inflight.
+func (imt *InflightMessageTracker) IsInflight(key []byte) bool {
+ imt.mu.Lock()
+ defer imt.mu.Unlock()
+ _, found := imt.messages[string(key)]
+ return found
+}
+
// RingBuffer represents a circular buffer to hold timestamps.
type RingBuffer struct {
buffer []int64