diff options
| author | chrislu <chris.lu@gmail.com> | 2024-05-20 13:29:37 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-05-20 13:29:37 -0700 |
| commit | 3148dec1f6314123d63f0b8e8060e13070ae9556 (patch) | |
| tree | c15cfeb5413921fbacd0ce26a133e11586b5d5b6 | |
| parent | a7d30d0705fa37966583c95f3f7ddfa150f909b8 (diff) | |
| download | seaweedfs-3148dec1f6314123d63f0b8e8060e13070ae9556.tar.xz seaweedfs-3148dec1f6314123d63f0b8e8060e13070ae9556.zip | |
avoid concurrent processing for the same key
| -rw-r--r-- | weed/mq/broker/broker_grpc_sub.go | 4 | ||||
| -rw-r--r-- | weed/mq/sub_coordinator/inflight_message_tracker.go | 8 |
2 files changed, 12 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_grpc_sub.go b/weed/mq/broker/broker_grpc_sub.go index 7cbde21f8..b4475c525 100644 --- a/weed/mq/broker/broker_grpc_sub.go +++ b/weed/mq/broker/broker_grpc_sub.go @@ -162,6 +162,10 @@ func (b *MessageQueueBroker) SubscribeMessage(stream mq_pb.SeaweedMessaging_Subs // reset the sleep interval count sleepIntervalCount = 0 + for imt.IsInflight(logEntry.Key) { + time.Sleep(137 * time.Millisecond) + } + imt.InflightMessage(logEntry.Key, logEntry.TsNs) if err := stream.Send(&mq_pb.SubscribeMessageResponse{Message: &mq_pb.SubscribeMessageResponse_Data{ diff --git a/weed/mq/sub_coordinator/inflight_message_tracker.go b/weed/mq/sub_coordinator/inflight_message_tracker.go index f48335435..1b1caca00 100644 --- a/weed/mq/sub_coordinator/inflight_message_tracker.go +++ b/weed/mq/sub_coordinator/inflight_message_tracker.go @@ -67,6 +67,14 @@ func (imt *InflightMessageTracker) GetOldest() int64 { return imt.timestamps.Oldest() } +// IsInflight returns true if the message with the key is inflight. +func (imt *InflightMessageTracker) IsInflight(key []byte) bool { + imt.mu.Lock() + defer imt.mu.Unlock() + _, found := imt.messages[string(key)] + return found +} + // RingBuffer represents a circular buffer to hold timestamps. type RingBuffer struct { buffer []int64 |
