diff options
| author | chrislu <chris.lu@gmail.com> | 2025-10-13 21:19:38 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-10-13 21:19:38 -0700 |
| commit | ffc45a538d535c2cc9f374918c7db68c09809299 (patch) | |
| tree | 672f88a01d32d876b58760acc291621269bb3059 | |
| parent | f15eaaf8b9e9bfe92c392d6ba17f41140ea283f3 (diff) | |
| download | seaweedfs-ffc45a538d535c2cc9f374918c7db68c09809299.tar.xz seaweedfs-ffc45a538d535c2cc9f374918c7db68c09809299.zip | |
Added bounds checking after calculating startIdx.
Problem: Race condition in cache lookup logic:
Thread A reads cache metadata (17+ records, endOffset = 32)
Thread B modifies/truncates the cache to 17 records
Thread A calculates startIdx = 19 based on old metadata
Slice operation consumedRecords[19:17] panics
| -rw-r--r-- | weed/mq/kafka/integration/broker_client_subscribe.go | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/weed/mq/kafka/integration/broker_client_subscribe.go b/weed/mq/kafka/integration/broker_client_subscribe.go index e3ee6954b..09015ec60 100644 --- a/weed/mq/kafka/integration/broker_client_subscribe.go +++ b/weed/mq/kafka/integration/broker_client_subscribe.go @@ -271,13 +271,20 @@ func (bc *BrokerClient) ReadRecordsFromOffset(ctx context.Context, session *Brok if requestedOffset >= cacheStartOffset && requestedOffset <= cacheEndOffset { // Found in cache startIdx := int(requestedOffset - cacheStartOffset) - endIdx := startIdx + maxRecords - if endIdx > len(session.consumedRecords) { - endIdx = len(session.consumedRecords) + // CRITICAL: Bounds check to prevent race condition where cache is modified between checks + if startIdx < 0 || startIdx >= len(session.consumedRecords) { + glog.V(2).Infof("[FETCH] Cache index out of bounds (race condition): startIdx=%d, cache size=%d, falling through to normal read", + startIdx, len(session.consumedRecords)) + // Cache was modified, fall through to normal read path + } else { + endIdx := startIdx + maxRecords + if endIdx > len(session.consumedRecords) { + endIdx = len(session.consumedRecords) + } + glog.V(2).Infof("[FETCH] Returning %d cached records for offset %d", endIdx-startIdx, requestedOffset) + session.mu.Unlock() + return session.consumedRecords[startIdx:endIdx], nil } - glog.V(2).Infof("[FETCH] Returning %d cached records for offset %d", endIdx-startIdx, requestedOffset) - session.mu.Unlock() - return session.consumedRecords[startIdx:endIdx], nil } } |
