author    chrislu <chris.lu@gmail.com>  2025-10-13 21:19:38 -0700
committer chrislu <chris.lu@gmail.com>  2025-10-13 21:19:38 -0700
commit    ffc45a538d535c2cc9f374918c7db68c09809299 (patch)
tree      672f88a01d32d876b58760acc291621269bb3059
parent    f15eaaf8b9e9bfe92c392d6ba17f41140ea283f3 (diff)
Added bounds checking after calculating startIdx.
Problem: Race condition in the cache lookup logic:

- Thread A reads cache metadata (17+ records, endOffset = 32)
- Thread B modifies/truncates the cache to 17 records
- Thread A calculates startIdx = 19 based on the old metadata
- The slice operation consumedRecords[19:17] panics
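For illustration, the following is a minimal, standalone Go sketch of the same defensive pattern; it is not the SeaweedFS code, and the record type and readFromCache helper are invented for this example. It uses the numbers from the scenario above: metadata that claims offset 32 is cached produces startIdx = 19 against a cache truncated to 17 records, and the bounds check routes that case to the fallback path instead of panicking.

// Illustrative sketch only; names below are not from the SeaweedFS code.
// A cached slice may be truncated by another goroutine after its start/end
// offsets were read, so the computed start index must be re-validated
// against the current slice length before slicing.
package main

import "fmt"

type record struct{ offset int64 }

// readFromCache validates startIdx against the live cache length and reports
// ok=false when the metadata used to compute it has gone stale, letting the
// caller fall through to the normal (non-cached) read path.
func readFromCache(consumedRecords []record, cacheStartOffset, requestedOffset int64, maxRecords int) ([]record, bool) {
	startIdx := int(requestedOffset - cacheStartOffset)
	if startIdx < 0 || startIdx >= len(consumedRecords) {
		return nil, false // cache changed underneath us
	}
	endIdx := startIdx + maxRecords
	if endIdx > len(consumedRecords) {
		endIdx = len(consumedRecords)
	}
	return consumedRecords[startIdx:endIdx], true
}

func main() {
	cache := make([]record, 17) // cache truncated to 17 records by another goroutine
	// Stale metadata (cacheStartOffset = 13) says offset 32 is cached: startIdx = 19.
	if recs, ok := readFromCache(cache, 13, 32, 10); !ok {
		fmt.Println("stale cache metadata, falling back to normal read")
	} else {
		fmt.Println("served", len(recs), "records from cache")
	}
}

The guard mirrors the patched logic below: rather than trusting cacheStartOffset/cacheEndOffset read earlier, the index is checked against len(consumedRecords) at the moment of slicing.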
 weed/mq/kafka/integration/broker_client_subscribe.go | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go
index e3ee6954b..09015ec60 100644
--- a/weed/mq/kafka/integration/broker_client_subscribe.go
+++ b/weed/mq/kafka/integration/broker_client_subscribe.go
@@ -271,13 +271,20 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok
 		if requestedOffset >= cacheStartOffset && requestedOffset <= cacheEndOffset {
 			// Found in cache
 			startIdx := int(requestedOffset - cacheStartOffset)
-			endIdx := startIdx + maxRecords
-			if endIdx > len(session.consumedRecords) {
-				endIdx = len(session.consumedRecords)
+			// CRITICAL: Bounds check to prevent race condition where cache is modified between checks
+			if startIdx < 0 || startIdx >= len(session.consumedRecords) {
+				glog.V(2).Infof("[FETCH] Cache index out of bounds (race condition): startIdx=%d, cache size=%d, falling through to normal read",
+					startIdx, len(session.consumedRecords))
+				// Cache was modified, fall through to normal read path
+			} else {
+				endIdx := startIdx + maxRecords
+				if endIdx > len(session.consumedRecords) {
+					endIdx = len(session.consumedRecords)
+				}
+				glog.V(2).Infof("[FETCH] Returning %d cached records for offset %d", endIdx-startIdx, requestedOffset)
+				session.mu.Unlock()
+				return session.consumedRecords[startIdx:endIdx], nil
 			}
-			glog.V(2).Infof("[FETCH] Returning %d cached records for offset %d", endIdx-startIdx, requestedOffset)
-			session.mu.Unlock()
-			return session.consumedRecords[startIdx:endIdx], nil
 		}
 	}