path: root/weed/util/log_buffer/log_buffer_test.go
package log_buffer

import (
	"crypto/rand"
	"fmt"
	"io"
	"sync"
	"testing"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

func TestNewLogBufferFirstBuffer(t *testing.T) {
	flushInterval := time.Second
	lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
		fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf))
	}, nil, func() {
	})

	startTime := MessagePosition{Time: time.Now()}

	messageSize := 1024
	messageCount := 5000

	receivedMessageCount := 0
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool {
			// keep waiting while not all messages have been received yet
			return receivedMessageCount < messageCount
		}, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
			receivedMessageCount++
			if receivedMessageCount >= messageCount {
				println("processed all messages")
				return true, io.EOF
			}
			return false, nil
		})

		fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount)
		fmt.Printf("lastProcessedTime %v isDone %v err: %v\n", lastProcessedTime, isDone, err)
		if err != nil && err != io.EOF {
			t.Errorf("unexpected error %v", err)
		}
	}()

	buf := make([]byte, messageSize)
	for i := 0; i < messageCount; i++ {
		_, _ = rand.Read(buf) // fill the payload with random bytes; crypto/rand.Read is not expected to fail
		lb.AddToBuffer(&mq_pb.DataMessage{
			Key:   nil,
			Value: buf,
			TsNs:  0,
		})
	}
	wg.Wait()

	if receivedMessageCount != messageCount {
		t.Errorf("expect %d messages, but got %d", messageCount, receivedMessageCount)
	}
}

// TestReadFromBuffer_OldOffsetReturnsResumeFromDiskError tests that requesting an old offset
// whose data has already been flushed to disk returns ResumeFromDiskError instead of hanging forever.
// This reproduces the bug where Schema Registry couldn't read the _schemas topic.
func TestReadFromBuffer_OldOffsetReturnsResumeFromDiskError(t *testing.T) {
	tests := []struct {
		name              string
		bufferStartOffset int64
		currentOffset     int64
		requestedOffset   int64
		hasData           bool
		expectError       error
		description       string
	}{
		{
			name:              "Request offset 0 when buffer starts at 4 (Schema Registry bug scenario)",
			bufferStartOffset: 4,
			currentOffset:     10,
			requestedOffset:   0,
			hasData:           true,
			expectError:       ResumeFromDiskError,
			description:       "When Schema Registry tries to read from offset 0, but data has been flushed to disk",
		},
		{
			name:              "Request offset before buffer start with empty buffer",
			bufferStartOffset: 10,
			currentOffset:     10,
			requestedOffset:   5,
			hasData:           false,
			expectError:       ResumeFromDiskError,
			description:       "Old offset with no data in memory should trigger disk read",
		},
		{
			name:              "Request offset before buffer start with data",
			bufferStartOffset: 100,
			currentOffset:     150,
			requestedOffset:   50,
			hasData:           true,
			expectError:       ResumeFromDiskError,
			description:       "Old offset with current data in memory should still trigger disk read",
		},
		{
			name:              "Request current offset (no disk read needed)",
			bufferStartOffset: 4,
			currentOffset:     10,
			requestedOffset:   10,
			hasData:           true,
			expectError:       nil,
			description:       "Current offset should return data from memory without error",
		},
		{
			name:              "Request offset within buffer range",
			bufferStartOffset: 4,
			currentOffset:     10,
			requestedOffset:   7,
			hasData:           true,
			expectError:       nil,
			description:       "Offset within buffer range should return data without error",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Create a LogBuffer with minimal configuration
			lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})

			// Simulate data that has been flushed to disk by setting bufferStartOffset
			lb.bufferStartOffset = tt.bufferStartOffset
			lb.offset = tt.currentOffset

			// CRITICAL: Mark this as an offset-based buffer
			lb.hasOffsets = true

			// Add some data to the buffer if needed (at current offset position)
			if tt.hasData {
				testData := []byte("test message")
				// Use AddLogEntryToBuffer to preserve offset information
				lb.AddLogEntryToBuffer(&filer_pb.LogEntry{
					TsNs:   time.Now().UnixNano(),
					Key:    []byte("key"),
					Data:   testData,
					Offset: tt.currentOffset, // Add data at current offset
				})
			}

			// Create an offset-based position for the requested offset
			requestPosition := NewMessagePositionFromOffset(tt.requestedOffset)

			// Try to read from the buffer
			buf, batchIdx, err := lb.ReadFromBuffer(requestPosition)

			// Verify the error matches expectations
			if tt.expectError != nil {
				if err != tt.expectError {
					t.Errorf("%s\nExpected error: %v\nGot error: %v\nbuf=%v, batchIdx=%d",
						tt.description, tt.expectError, err, buf != nil, batchIdx)
				} else {
					t.Logf("✓ %s: correctly returned %v", tt.description, err)
				}
			} else {
				if err != nil {
					t.Errorf("%s\nExpected no error but got: %v\nbuf=%v, batchIdx=%d",
						tt.description, err, buf != nil, batchIdx)
				} else {
					t.Logf("✓ %s: correctly returned data without error", tt.description)
				}
			}
		})
	}
}

// TestReadFromBuffer_OldOffsetWithNoPrevBuffers specifically tests the bug fix
// where requesting an old offset would return nil instead of ResumeFromDiskError
func TestReadFromBuffer_OldOffsetWithNoPrevBuffers(t *testing.T) {
	// This is the exact scenario that caused the Schema Registry to hang:
	// 1. Data was published to _schemas topic (offsets 0, 1, 2, 3)
	// 2. Data was flushed to disk
	// 3. LogBuffer's bufferStartOffset was updated to 4
	// 4. Schema Registry tried to read from offset 0
	// 5. ReadFromBuffer would return (nil, offset, nil) instead of ResumeFromDiskError
	// 6. The subscriber would wait forever for data that would never come from memory

	lb := NewLogBuffer("_schemas", time.Hour, nil, nil, func() {})

	// Simulate the state after data has been flushed to disk:
	// - bufferStartOffset = 10 (data 0-9 has been flushed)
	// - offset = 15 (next offset to assign, current buffer has 10-14)
	// - pos = 100 (some data in current buffer)
	// Set prevBuffers to have non-overlapping ranges to avoid the safety check at line 420-428
	lb.bufferStartOffset = 10
	lb.offset = 15
	lb.pos = 100

	// Modify prevBuffers to have non-zero offset ranges that DON'T include the requested offset
	// This bypasses the safety check and exposes the real bug
	for i := range lb.prevBuffers.buffers {
		lb.prevBuffers.buffers[i].startOffset = 20 + int64(i)*10 // 20, 30, 40, etc.
		lb.prevBuffers.buffers[i].offset = 25 + int64(i)*10      // 25, 35, 45, etc.
		lb.prevBuffers.buffers[i].size = 0                       // Empty (flushed)
	}

	// Schema Registry requests offset 5 (which is before bufferStartOffset=10)
	requestPosition := NewMessagePositionFromOffset(5)

	// Before the fix, this would return (nil, offset, nil) causing an infinite wait
	// After the fix, this should return ResumeFromDiskError
	buf, batchIdx, err := lb.ReadFromBuffer(requestPosition)

	t.Logf("DEBUG: ReadFromBuffer returned: buf=%v, batchIdx=%d, err=%v", buf != nil, batchIdx, err)
	t.Logf("DEBUG: Buffer state: bufferStartOffset=%d, offset=%d, pos=%d",
		lb.bufferStartOffset, lb.offset, lb.pos)
	t.Logf("DEBUG: Requested offset 5, prevBuffers[0] range: [%d-%d]",
		lb.prevBuffers.buffers[0].startOffset, lb.prevBuffers.buffers[0].offset)

	if err != ResumeFromDiskError {
		t.Errorf("CRITICAL BUG REPRODUCED: Expected ResumeFromDiskError but got err=%v, buf=%v, batchIdx=%d\n"+
			"This causes Schema Registry to hang indefinitely waiting for data that's on disk!",
			err, buf != nil, batchIdx)
		t.Errorf("The buggy code falls through without returning ResumeFromDiskError!")
	} else {
		t.Logf("✓ BUG FIX VERIFIED: Correctly returns ResumeFromDiskError when requesting old offset 5")
		t.Logf("  This allows the subscriber to read from disk instead of waiting forever")
	}
}
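
// readWithDiskFallback is a minimal sketch, not part of the production code path,
// illustrating how a caller is expected to react to ResumeFromDiskError after the
// fix verified above: fall back to a disk read instead of waiting for in-memory
// data that will never arrive. The readFromDisk callback is hypothetical and only
// stands in for whatever disk-reading mechanism the caller has.
func readWithDiskFallback(lb *LogBuffer, offset int64, readFromDisk func(int64) ([]byte, error)) ([]byte, error) {
	buf, _, err := lb.ReadFromBuffer(NewMessagePositionFromOffset(offset))
	if err == ResumeFromDiskError {
		// The requested offset is older than bufferStartOffset, so the data lives on disk.
		return readFromDisk(offset)
	}
	if err != nil {
		return nil, err
	}
	if buf == nil {
		// Future offset: nothing in memory yet, the caller should wait or poll again.
		return nil, nil
	}
	return buf.Bytes(), nil
}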

// TestReadFromBuffer_EmptyBufferAtCurrentOffset tests Bug #2
// where an empty buffer at the current offset would return empty data instead of ResumeFromDiskError
func TestReadFromBuffer_EmptyBufferAtCurrentOffset(t *testing.T) {
	lb := NewLogBuffer("_schemas", time.Hour, nil, nil, func() {})

	// Simulate buffer state where data 0-3 was published and flushed, but buffer NOT advanced yet:
	// - bufferStartOffset = 0 (buffer hasn't been advanced after flush)
	// - offset = 4 (next offset to assign - data 0-3 exists)
	// - pos = 0 (buffer is empty after flush)
	// This happens in the window between flush and buffer advancement
	lb.bufferStartOffset = 0
	lb.offset = 4
	lb.pos = 0

	// Schema Registry requests offset 0 (which appears to be in range [0, 4])
	requestPosition := NewMessagePositionFromOffset(0)

	// BUG: Without fix, this returns empty buffer instead of checking disk
	// FIX: Should return ResumeFromDiskError because buffer is empty (pos=0) despite valid range
	buf, batchIdx, err := lb.ReadFromBuffer(requestPosition)

	t.Logf("DEBUG: ReadFromBuffer returned: buf=%v, batchIdx=%d, err=%v", buf != nil, batchIdx, err)
	t.Logf("DEBUG: Buffer state: bufferStartOffset=%d, offset=%d, pos=%d",
		lb.bufferStartOffset, lb.offset, lb.pos)

	if err != ResumeFromDiskError {
		if buf == nil || len(buf.Bytes()) == 0 {
			t.Errorf("CRITICAL BUG #2 REPRODUCED: Empty buffer should return ResumeFromDiskError, got err=%v, buf=%v\n"+
				"Without the fix, Schema Registry gets empty data instead of reading from disk!",
				err, buf != nil)
		}
	} else {
		t.Logf("✓ BUG #2 FIX VERIFIED: Empty buffer correctly returns ResumeFromDiskError to check disk")
	}
}

// TestReadFromBuffer_OffsetRanges tests various offset range scenarios
func TestReadFromBuffer_OffsetRanges(t *testing.T) {
	lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})

	// Setup: buffer contains offsets 10-20
	lb.bufferStartOffset = 10
	lb.offset = 20
	lb.pos = 100 // some data in buffer

	testCases := []struct {
		name            string
		requestedOffset int64
		expectedError   error
		description     string
	}{
		{
			name:            "Before buffer start",
			requestedOffset: 5,
			expectedError:   ResumeFromDiskError,
			description:     "Offset 5 < bufferStartOffset 10 → read from disk",
		},
		{
			name:            "At buffer start",
			requestedOffset: 10,
			expectedError:   nil,
			description:     "Offset 10 == bufferStartOffset 10 → read from buffer",
		},
		{
			name:            "Within buffer range",
			requestedOffset: 15,
			expectedError:   nil,
			description:     "Offset 15 is within [10, 20] → read from buffer",
		},
		{
			name:            "At buffer end",
			requestedOffset: 20,
			expectedError:   nil,
			description:     "Offset 20 == offset 20 → read from buffer",
		},
		{
			name:            "After buffer end",
			requestedOffset: 25,
			expectedError:   nil,
			description:     "Offset 25 > offset 20 → future data, return nil without error",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			requestPosition := NewMessagePositionFromOffset(tc.requestedOffset)
			_, _, err := lb.ReadFromBuffer(requestPosition)

			if tc.expectedError != nil {
				if err != tc.expectedError {
					t.Errorf("%s\nExpected error: %v, got: %v", tc.description, tc.expectedError, err)
				} else {
					t.Logf("✓ %s", tc.description)
				}
			} else {
				// For a nil expectedError, only an unexpected error fails the test:
				// a nil error (data served from memory) or ResumeFromDiskError is accepted,
				// and future offsets return nil without error.
				if err != nil && err != ResumeFromDiskError {
					t.Errorf("%s\nExpected nil or ResumeFromDiskError, got: %v", tc.description, err)
				} else {
					t.Logf("✓ %s", tc.description)
				}
			}
		})
	}
}
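
// offsetReadSource is a small test-local sketch, written here for documentation
// rather than taken from the package, that mirrors the decision rule the cases
// above assert: offsets below bufferStartOffset must come from disk, offsets in
// [bufferStartOffset, offset] are served from memory, and offsets beyond the
// current offset are future data the subscriber has to wait for.
func offsetReadSource(bufferStartOffset, currentOffset, requestedOffset int64) string {
	switch {
	case requestedOffset < bufferStartOffset:
		return "disk" // ReadFromBuffer signals this case with ResumeFromDiskError
	case requestedOffset <= currentOffset:
		return "memory" // served from the current (or previous) in-memory buffers
	default:
		return "future" // nil buffer and nil error: wait for new data to arrive
	}
}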

// TestReadFromBuffer_InitializedFromDisk tests Bug #3
// where bufferStartOffset was incorrectly set to 0 after InitializeOffsetFromExistingData,
// causing reads for old offsets to return new data instead of triggering a disk read.
func TestReadFromBuffer_InitializedFromDisk(t *testing.T) {
	// This reproduces the real Schema Registry bug scenario:
	// 1. Broker restarts, finds 4 messages on disk (offsets 0-3)
	// 2. InitializeOffsetFromExistingData sets offset=4
	//    - BUG: bufferStartOffset=0 (wrong!)
	//    - FIX: bufferStartOffset=4 (correct!)
	// 3. First new message is written (offset 4)
	// 4. Schema Registry reads offset 0
	// 5. With FIX: requestedOffset=0 < bufferStartOffset=4 → ResumeFromDiskError (correct!)
	// 6. Without FIX: requestedOffset=0 in range [0, 5] → returns wrong data (bug!)

	lb := NewLogBuffer("_schemas", time.Hour, nil, nil, func() {})

	// Use the actual InitializeOffsetFromExistingData to test the fix
	err := lb.InitializeOffsetFromExistingData(func() (int64, error) {
		return 3, nil // Simulate 4 messages on disk (offsets 0-3, highest=3)
	})
	if err != nil {
		t.Fatalf("InitializeOffsetFromExistingData failed: %v", err)
	}

	t.Logf("After InitializeOffsetFromExistingData(highestOffset=3):")
	t.Logf("  offset=%d (should be 4), bufferStartOffset=%d (FIX: should be 4, not 0)",
		lb.offset, lb.bufferStartOffset)

	// Now write a new message at offset 4
	lb.AddToBuffer(&mq_pb.DataMessage{
		Key:   []byte("new-key"),
		Value: []byte("new-message-at-offset-4"),
		TsNs:  time.Now().UnixNano(),
	})
	// After AddToBuffer: offset=5, pos>0

	// Schema Registry tries to read offset 0 (should be on disk)
	requestPosition := NewMessagePositionFromOffset(0)

	buf, batchIdx, err := lb.ReadFromBuffer(requestPosition)

	t.Logf("After writing new message:")
	t.Logf("  bufferStartOffset=%d, offset=%d, pos=%d", lb.bufferStartOffset, lb.offset, lb.pos)
	t.Logf("  Requested offset 0, got: buf=%v, batchIdx=%d, err=%v", buf != nil, batchIdx, err)

	// EXPECTED BEHAVIOR (with fix):
	// bufferStartOffset=4 after initialization, so requestedOffset=0 < bufferStartOffset=4
	// → returns ResumeFromDiskError

	// BUGGY BEHAVIOR (without fix):
	// bufferStartOffset=0 after initialization, so requestedOffset=0 is in range [0, 5]
	// → returns the NEW message (offset 4) instead of reading from disk!

	if err != ResumeFromDiskError {
		t.Errorf("CRITICAL BUG #3 REPRODUCED: Reading offset 0 after initialization from disk should return ResumeFromDiskError\n"+
			"Instead got: err=%v, buf=%v, batchIdx=%d\n"+
			"This means Schema Registry would receive WRONG data (offset 4) when requesting offset 0!",
			err, buf != nil, batchIdx)
		t.Errorf("Root cause: bufferStartOffset=%d should be 4 after InitializeOffsetFromExistingData(highestOffset=3)",
			lb.bufferStartOffset)
	} else {
		t.Logf("✓ BUG #3 FIX VERIFIED: Reading old offset 0 correctly returns ResumeFromDiskError")
		t.Logf("  This ensures Schema Registry reads correct data from disk instead of getting new messages")
	}
}

// TestLoopProcessLogDataWithOffset_DiskReadRetry tests that when a subscriber
// reads from disk before flush completes, it continues to retry disk reads
// and eventually finds the data after flush completes.
// This reproduces the Schema Registry timeout issue on first start.
func TestLoopProcessLogDataWithOffset_DiskReadRetry(t *testing.T) {
	diskReadCallCount := 0
	var diskReadMu sync.Mutex
	dataFlushedToDisk := false
	var flushedData []*filer_pb.LogEntry

	// Create a readFromDiskFn that simulates the race condition
	readFromDiskFn := func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) {
		diskReadMu.Lock()
		diskReadCallCount++
		callNum := diskReadCallCount
		hasData := dataFlushedToDisk
		diskReadMu.Unlock()

		t.Logf("DISK READ #%d: startOffset=%d, dataFlushedToDisk=%v", callNum, startPosition.Offset, hasData)

		if !hasData {
			// Simulate: data not yet on disk (flush hasn't completed)
			t.Logf("  → No data found (flush not completed yet)")
			return startPosition, false, nil
		}

		// Data is now on disk, process it
		t.Logf("  → Found %d entries on disk", len(flushedData))
		for _, entry := range flushedData {
			if entry.Offset >= startPosition.Offset {
				isDone, err := eachLogEntryFn(entry)
				if err != nil || isDone {
					return NewMessagePositionFromOffset(entry.Offset + 1), isDone, err
				}
			}
		}
		return NewMessagePositionFromOffset(int64(len(flushedData))), false, nil
	}

	flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
		t.Logf("FLUSH: minOffset=%d maxOffset=%d size=%d bytes", minOffset, maxOffset, len(buf))
		// Simulate writing to disk
		diskReadMu.Lock()
		dataFlushedToDisk = true
		// Parse the buffer and add entries to flushedData
		// For this test, we'll just create mock entries
		flushedData = append(flushedData, &filer_pb.LogEntry{
			Key:    []byte("key-0"),
			Data:   []byte("message-0"),
			TsNs:   time.Now().UnixNano(),
			Offset: 0,
		})
		diskReadMu.Unlock()
	}

	logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, readFromDiskFn, nil)
	defer logBuffer.ShutdownLogBuffer()

	// Simulate the race condition:
	// 1. Subscriber starts reading from offset 0
	// 2. Data is not yet flushed
	// 3. Loop calls readFromDiskFn → no data found
	// 4. A bit later, data gets flushed
	// 5. Loop should continue and call readFromDiskFn again

	receivedMessages := 0
	var mu sync.Mutex
	maxIterations := 50 // Allow up to 50 iterations (500ms with 10ms sleep each)
	iterationCount := 0

	waitForDataFn := func() bool {
		mu.Lock()
		defer mu.Unlock()
		iterationCount++
		// keep waiting until a message has been received or the iteration budget is exhausted
		return receivedMessages == 0 && iterationCount < maxIterations
	}

	eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
		mu.Lock()
		receivedMessages++
		mu.Unlock()
		t.Logf("✉️  RECEIVED: offset=%d key=%s", offset, string(logEntry.Key))
		return true, nil // Stop after first message
	}

	// Start the reader in a goroutine
	var readerWg sync.WaitGroup
	readerWg.Add(1)
	go func() {
		defer readerWg.Done()
		startPosition := NewMessagePositionFromOffset(0)
		_, isDone, err := logBuffer.LoopProcessLogDataWithOffset("test-subscriber", startPosition, 0, waitForDataFn, eachLogEntryFn)
		t.Logf("📋 Reader finished: isDone=%v, err=%v", isDone, err)
	}()

	// Wait a bit to let the first disk read happen (returns no data)
	time.Sleep(50 * time.Millisecond)

	// Now add data and flush it
	t.Logf("➕ Adding message to buffer...")
	logBuffer.AddToBuffer(&mq_pb.DataMessage{
		Key:   []byte("key-0"),
		Value: []byte("message-0"),
		TsNs:  time.Now().UnixNano(),
	})

	// Force flush
	t.Logf("Force flushing...")
	logBuffer.ForceFlush()

	// Wait for reader to finish
	readerWg.Wait()

	// Check results
	diskReadMu.Lock()
	finalDiskReadCount := diskReadCallCount
	diskReadMu.Unlock()

	mu.Lock()
	finalReceivedMessages := receivedMessages
	finalIterations := iterationCount
	mu.Unlock()

	t.Logf("\nRESULTS:")
	t.Logf("  Disk reads: %d", finalDiskReadCount)
	t.Logf("  Received messages: %d", finalReceivedMessages)
	t.Logf("  Loop iterations: %d", finalIterations)

	if finalDiskReadCount < 2 {
		t.Errorf("CRITICAL BUG REPRODUCED: Disk read was only called %d time(s)", finalDiskReadCount)
		t.Errorf("Expected: Multiple disk reads as the loop continues after flush completes")
		t.Errorf("This is why Schema Registry times out - it reads once before flush, never re-reads after flush")
	}

	if finalReceivedMessages == 0 {
		t.Errorf("SCHEMA REGISTRY TIMEOUT REPRODUCED: No messages received even after flush")
		t.Errorf("The subscriber is stuck because disk reads are not retried")
	} else {
		t.Logf("✓ SUCCESS: Message received after %d disk read attempts", finalDiskReadCount)
	}
}
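
// collectFromOffset is a minimal usage sketch that reuses the callback shapes the
// test above exercises: a subscriber drains entries at or after startOffset into a
// slice via LoopProcessLogDataWithOffset. The function name, the iteration budget,
// and the stopping policy are illustrative assumptions, not part of the package API.
func collectFromOffset(lb *LogBuffer, startOffset int64, maxIterations int) ([]*filer_pb.LogEntry, error) {
	var collected []*filer_pb.LogEntry
	iterations := 0
	waitFn := func() bool {
		iterations++
		// keep polling until the iteration budget is exhausted
		return iterations < maxIterations
	}
	eachFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) {
		collected = append(collected, entry)
		return false, nil // keep reading further entries
	}
	_, _, err := lb.LoopProcessLogDataWithOffset("collector", NewMessagePositionFromOffset(startOffset), 0, waitFn, eachFn)
	return collected, err
}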