aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_read_stateless.go
blob: abc7d9ac0a13c72e27f4af39856643e01be53706 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
package log_buffer

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"google.golang.org/protobuf/proto"
)

// ReadMessagesAtOffset provides Kafka-style stateless reads from LogBuffer
// Each call is completely independent - no state maintained between calls
// Thread-safe for concurrent reads at different offsets
//
// This is the recommended API for stateless clients like Kafka gateway
// Unlike Subscribe loops, this:
// 1. Returns immediately with available data (or empty if none)
// 2. Does not maintain any session state
// 3. Safe for concurrent calls
// 4. No cancellation/restart complexity
//
// Lookup order: current in-memory buffer, then previous (sealed) buffers,
// then disk via ReadFromDiskFn. Offsets at or past the buffer end return
// empty with endOfPartition=true (no data yet).
//
// Locking note: the RLock taken at the top is released on EVERY path before
// parsing or disk I/O; be careful to preserve that balance when editing.
//
// Returns:
// - messages: Array of messages starting at startOffset
// - nextOffset: Offset to use for next fetch
// - highWaterMark: Highest offset available in partition
// - endOfPartition: True if no more data available
// - err: Any error encountered
func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages int, maxBytes int) (
	messages []*filer_pb.LogEntry,
	nextOffset int64,
	highWaterMark int64,
	endOfPartition bool,
	err error,
) {
	// Quick validation - clamp non-positive limits to sane defaults
	if maxMessages <= 0 {
		maxMessages = 100 // Default reasonable batch size
	}
	if maxBytes <= 0 {
		maxBytes = 4 * 1024 * 1024 // 4MB default
	}

	messages = make([]*filer_pb.LogEntry, 0, maxMessages)
	nextOffset = startOffset

	// Try to read from in-memory buffers first (hot path)
	logBuffer.RLock()
	// Snapshot buffer bounds under the lock; used after release as a
	// consistent (if slightly stale) view of the partition.
	currentBufferEnd := logBuffer.offset
	bufferStartOffset := logBuffer.bufferStartOffset
	highWaterMark = currentBufferEnd

	// Special case: empty buffer (no data written yet)
	if currentBufferEnd == 0 && bufferStartOffset == 0 && logBuffer.pos == 0 {
		logBuffer.RUnlock()
		// Return empty result - partition exists but has no data yet
		// Preserve the requested offset in nextOffset
		return messages, startOffset, 0, true, nil
	}

	// Check if requested offset is in current buffer
	if startOffset >= bufferStartOffset && startOffset < currentBufferEnd {
		// Read from current buffer
		glog.V(4).Infof("[StatelessRead] Reading from current buffer: start=%d, end=%d",
			bufferStartOffset, currentBufferEnd)

		if logBuffer.pos > 0 {
			// Make a copy of the buffer to avoid concurrent modification
			bufCopy := make([]byte, logBuffer.pos)
			copy(bufCopy, logBuffer.buf[:logBuffer.pos])
			logBuffer.RUnlock() // Release lock early

			// Parse messages from buffer copy
			messages, nextOffset, _, err = parseMessagesFromBuffer(
				bufCopy, startOffset, maxMessages, maxBytes)

			if err != nil {
				return nil, startOffset, highWaterMark, false, err
			}

			glog.V(4).Infof("[StatelessRead] Read %d messages from current buffer, nextOffset=%d",
				len(messages), nextOffset)

			// Check if we reached the end
			// endOfPartition only when we consumed up to the buffer end AND
			// the batch was not cut short by maxMessages.
			endOfPartition = (nextOffset >= currentBufferEnd) && (len(messages) == 0 || len(messages) < maxMessages)
			return messages, nextOffset, highWaterMark, endOfPartition, nil
		}

		// Buffer is empty but offset is in range - check previous buffers
		logBuffer.RUnlock()

		// Try previous buffers
		// Re-acquire the lock for the prevBuffers scan; released below on
		// every exit from the loop.
		logBuffer.RLock()
		for _, prevBuf := range logBuffer.prevBuffers.buffers {
			if startOffset >= prevBuf.startOffset && startOffset <= prevBuf.offset {
				if prevBuf.size > 0 {
					// Found in previous buffer
					bufCopy := make([]byte, prevBuf.size)
					copy(bufCopy, prevBuf.buf[:prevBuf.size])
					logBuffer.RUnlock()

					messages, nextOffset, _, err = parseMessagesFromBuffer(
						bufCopy, startOffset, maxMessages, maxBytes)

					if err != nil {
						return nil, startOffset, highWaterMark, false, err
					}

					glog.V(4).Infof("[StatelessRead] Read %d messages from previous buffer, nextOffset=%d",
						len(messages), nextOffset)

					endOfPartition = false // More data might be in current buffer
					return messages, nextOffset, highWaterMark, endOfPartition, nil
				}
				// Empty previous buffer means data was flushed to disk - fall through to disk read
				glog.V(2).Infof("[StatelessRead] Data at offset %d was flushed, attempting disk read", startOffset)
				break
			}
		}
		logBuffer.RUnlock()

		// Data not in memory - attempt disk read if configured
		// Don't return error here - data may be on disk!
		// Fall through to disk read logic below
		glog.V(2).Infof("[StatelessRead] Data at offset %d not in memory (buffer: %d-%d), attempting disk read",
			startOffset, bufferStartOffset, currentBufferEnd)
		// Don't return error - continue to disk read check below
	} else {
		// Offset is not in current buffer - check previous buffers FIRST before going to disk
		// This handles the case where data was just flushed but is still in prevBuffers
		// NOTE: still holding the RLock acquired at the top of this function.

		for _, prevBuf := range logBuffer.prevBuffers.buffers {
			if startOffset >= prevBuf.startOffset && startOffset <= prevBuf.offset {
				if prevBuf.size > 0 {
					// Found in previous buffer!
					bufCopy := make([]byte, prevBuf.size)
					copy(bufCopy, prevBuf.buf[:prevBuf.size])
					logBuffer.RUnlock()

					messages, nextOffset, _, err = parseMessagesFromBuffer(
						bufCopy, startOffset, maxMessages, maxBytes)

					if err != nil {
						return nil, startOffset, highWaterMark, false, err
					}

					endOfPartition = false // More data might exist
					return messages, nextOffset, highWaterMark, endOfPartition, nil
				}
				// Empty previous buffer - data was flushed to disk
				glog.V(2).Infof("[StatelessRead] Found empty previous buffer for offset %d, will try disk", startOffset)
				break
			}
		}
		logBuffer.RUnlock()
	}

	// If we get here, unlock if not already unlocked
	// (Note: logBuffer.RUnlock() was called above in all paths)

	// Data not in memory - try disk read
	// This handles two cases:
	// 1. startOffset < bufferStartOffset: Historical data
	// 2. startOffset in buffer range but not in memory: Data was flushed (from fall-through above)
	if startOffset < currentBufferEnd {
		// Historical data or flushed data - try to read from disk if ReadFromDiskFn is configured
		if startOffset < bufferStartOffset {
			glog.Errorf("[StatelessRead] CASE 1: Historical data - offset %d < bufferStart %d",
				startOffset, bufferStartOffset)
		} else {
			glog.Errorf("[StatelessRead] CASE 2: Flushed data - offset %d in range [%d, %d) but not in memory",
				startOffset, bufferStartOffset, currentBufferEnd)
		}

		// Check if disk read function is configured
		if logBuffer.ReadFromDiskFn == nil {
			glog.Errorf("[StatelessRead] CRITICAL: ReadFromDiskFn is NIL! Cannot read from disk.")
			if startOffset < bufferStartOffset {
				return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d), and ReadFromDiskFn is nil",
					startOffset, bufferStartOffset)
			}
			return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d not in memory (buffer: %d-%d), and ReadFromDiskFn is nil",
				startOffset, bufferStartOffset, currentBufferEnd)
		}

		// Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented)
		// The ReadFromDiskFn should handle its own timeouts and not block indefinitely
		diskMessages, diskNextOffset, diskErr := readHistoricalDataFromDisk(
			logBuffer, startOffset, maxMessages, maxBytes, highWaterMark)

		if diskErr != nil {
			glog.Errorf("[StatelessRead] CRITICAL: Disk read FAILED for offset %d: %v", startOffset, diskErr)
			// IMPORTANT: Return retryable error instead of silently returning empty!
			return messages, startOffset, highWaterMark, false, fmt.Errorf("disk read failed for offset %d: %v", startOffset, diskErr)
		}

		if len(diskMessages) == 0 {
			glog.Errorf("[StatelessRead] WARNING: Disk read returned 0 messages for offset %d (HWM=%d, bufferStart=%d)",
				startOffset, highWaterMark, bufferStartOffset)
		}

		// Return disk data
		// endOfPartition when disk reading caught up to the in-memory range
		// and the batch was not truncated by maxMessages.
		endOfPartition = diskNextOffset >= bufferStartOffset && len(diskMessages) < maxMessages
		return diskMessages, diskNextOffset, highWaterMark, endOfPartition, nil
	}

	// startOffset >= currentBufferEnd - future offset, no data available yet
	glog.V(4).Infof("[StatelessRead] Future offset %d >= buffer end %d, no data available",
		startOffset, currentBufferEnd)
	return messages, startOffset, highWaterMark, true, nil
}

// readHistoricalDataFromDisk reads messages from disk for historical offsets
// This is called when the requested offset is older than what's in memory
// Uses an in-memory cache to avoid repeated disk I/O for the same chunks
//
// Reads whole chunkSize-aligned chunks so adjacent fetches hit the cache.
// A cache/extract miss (offset beyond the cached data) returns empty with a
// nil error, signaling the caller to fall back to memory buffers.
func readHistoricalDataFromDisk(
	logBuffer *LogBuffer,
	startOffset int64,
	maxMessages int,
	maxBytes int,
	highWaterMark int64,
) (messages []*filer_pb.LogEntry, nextOffset int64, err error) {
	const chunkSize = 1000 // Size of each cached chunk

	// Calculate chunk start offset (aligned to chunkSize boundary)
	chunkStartOffset := (startOffset / chunkSize) * chunkSize

	// Try to get from cache first
	cachedMessages, cacheHit := getCachedDiskChunk(logBuffer, chunkStartOffset)

	if cacheHit {
		// Found in cache - extract requested messages
		result, nextOff, err := extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes)

		if err != nil {
			// CRITICAL: Cache extraction failed because requested offset is BEYOND cached chunk
			// This means disk files only contain partial data (e.g., 1000-1763) and the
			// requested offset (e.g., 1764) is in a gap between disk and memory.
			//
			// SOLUTION: Return empty result with NO ERROR to let ReadMessagesAtOffset
			// continue to check memory buffers. The data might be in memory even though
			// it's not on disk.
			glog.Errorf("[DiskCache] Offset %d is beyond cached chunk (start=%d, size=%d)",
				startOffset, chunkStartOffset, len(cachedMessages))

			// Return empty but NO ERROR - this signals "not on disk, try memory"
			return nil, startOffset, nil
		}

		// Success - return cached data
		return result, nextOff, nil
	}

	// Not in cache - read entire chunk from disk for caching
	chunkMessages := make([]*filer_pb.LogEntry, 0, chunkSize)
	chunkNextOffset := chunkStartOffset

	// Create a position for the chunk start
	chunkPosition := MessagePosition{
		IsOffsetBased: true,
		Offset:        chunkStartOffset,
	}

	// Define callback to collect the entire chunk
	// NOTE(review): chunkNextOffset assumes ReadFromDiskFn delivers entries
	// consecutively starting at chunkStartOffset - confirm against its impl.
	eachMessageFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
		// Read up to chunkSize messages for caching
		if len(chunkMessages) >= chunkSize {
			return true, nil
		}

		chunkMessages = append(chunkMessages, logEntry)
		chunkNextOffset++

		// Continue reading the chunk
		return false, nil
	}

	// Read chunk from disk
	_, _, readErr := logBuffer.ReadFromDiskFn(chunkPosition, 0, eachMessageFn)

	if readErr != nil {
		glog.Errorf("[DiskRead] CRITICAL: ReadFromDiskFn returned ERROR: %v", readErr)
		return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr)
	}

	// Cache the chunk for future reads
	if len(chunkMessages) > 0 {
		cacheDiskChunk(logBuffer, chunkStartOffset, chunkNextOffset-1, chunkMessages)
	} else {
		glog.Errorf("[DiskRead] WARNING: ReadFromDiskFn returned 0 messages for chunkStart=%d", chunkStartOffset)
	}

	// Extract requested messages from the chunk
	result, resNextOffset, resErr := extractMessagesFromCache(chunkMessages, startOffset, maxMessages, maxBytes)
	return result, resNextOffset, resErr
}

// getCachedDiskChunk retrieves a cached disk chunk if available.
// Returns the chunk's messages and true on a hit, or nil and false on a miss.
//
// Takes the WRITE lock (not RLock) because a cache hit mutates
// chunk.lastAccess for LRU bookkeeping; writing that field under a shared
// read lock would race with other readers doing the same update.
func getCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) ([]*filer_pb.LogEntry, bool) {
	logBuffer.diskChunkCache.mu.Lock()
	defer logBuffer.diskChunkCache.mu.Unlock()

	if chunk, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists {
		// Update last access time for LRU eviction in cacheDiskChunk
		chunk.lastAccess = time.Now()
		return chunk.messages, true
	}

	return nil, false
}

// invalidateCachedDiskChunk removes a chunk from the cache.
// This is called when cached data is found to be incomplete or incorrect.
func invalidateCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) {
	logBuffer.diskChunkCache.mu.Lock()
	defer logBuffer.diskChunkCache.mu.Unlock()

	// delete is a no-op for missing keys, so no existence check is needed.
	delete(logBuffer.diskChunkCache.chunks, chunkStartOffset)
}

// cacheDiskChunk stores a disk chunk in the cache, evicting the least
// recently used entry first when the cache is at capacity.
func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages []*filer_pb.LogEntry) {
	logBuffer.diskChunkCache.mu.Lock()
	defer logBuffer.diskChunkCache.mu.Unlock()

	// At capacity: scan for the chunk with the oldest lastAccess and drop it.
	if len(logBuffer.diskChunkCache.chunks) >= logBuffer.diskChunkCache.maxChunks {
		var (
			evictOffset int64
			evictTime   time.Time
			haveVictim  bool
		)

		for off, c := range logBuffer.diskChunkCache.chunks {
			if !haveVictim || c.lastAccess.Before(evictTime) {
				evictOffset = off
				evictTime = c.lastAccess
				haveVictim = true
			}
		}

		delete(logBuffer.diskChunkCache.chunks, evictOffset)
		glog.V(4).Infof("[DiskCache] Evicted chunk at offset %d (LRU)", evictOffset)
	}

	// Insert the new chunk, stamped with the current time for LRU tracking.
	logBuffer.diskChunkCache.chunks[startOffset] = &CachedDiskChunk{
		startOffset: startOffset,
		endOffset:   endOffset,
		messages:    messages,
		lastAccess:  time.Now(),
	}
}

// extractMessagesFromCache extracts requested messages from a cached chunk.
// chunkMessages is assumed to start at the chunk-aligned offset for
// startOffset, so the requested offset maps directly to a slice index.
// Honors maxMessages and the maxBytes soft limit (the first message is
// always admitted regardless of its size). An offset past the cached data
// returns empty with a nil error so the caller can fall back to memory.
func extractMessagesFromCache(chunkMessages []*filer_pb.LogEntry, startOffset int64, maxMessages, maxBytes int) ([]*filer_pb.LogEntry, int64, error) {
	const chunkSize = 1000
	chunkBase := (startOffset / chunkSize) * chunkSize

	// Index of the requested offset within the chunk slice.
	idx := int(startOffset - chunkBase)

	if idx < 0 {
		// Should be impossible given the aligned computation above.
		glog.Errorf("[DiskCache] CRITICAL: Requested offset %d is BEFORE chunk start %d (positionInChunk=%d < 0)",
			startOffset, chunkBase, idx)
		return nil, startOffset, fmt.Errorf("offset %d before chunk start %d", startOffset, chunkBase)
	}

	if idx >= len(chunkMessages) {
		// Offset lies beyond the cached data - disk files held only partial
		// data; the gap may be covered by memory buffers, so no error here.
		return nil, startOffset, nil
	}

	out := make([]*filer_pb.LogEntry, 0, maxMessages)
	next := startOffset
	bytesSoFar := 0

	for _, entry := range chunkMessages[idx:] {
		if len(out) >= maxMessages {
			break
		}

		size := proto.Size(entry)
		// Byte budget check; never rejects the first message.
		if bytesSoFar > 0 && bytesSoFar+size > maxBytes {
			break
		}

		out = append(out, entry)
		bytesSoFar += size
		next++
	}

	glog.V(4).Infof("[DiskCache] Extracted %d messages from cache (offset %d-%d, bytes=%d)",
		len(out), startOffset, next-1, bytesSoFar)

	return out, next, nil
}

// parseMessagesFromBuffer parses messages from a buffer byte slice
// This is thread-safe as it operates on a copy of the buffer
//
// Wire format: each entry is a 4-byte length prefix (decoded via
// util.BytesToUint32) followed by a marshaled filer_pb.LogEntry.
// Entries with Offset < startOffset are skipped; parsing stops at
// maxMessages, maxBytes, or an incomplete trailing entry.
//
// Returns the collected messages, the offset to fetch next
// (last delivered offset + 1), the byte count consumed by delivered
// entries, and a nil error (unmarshal failures are skipped, not fatal).
func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, maxBytes int) (
	messages []*filer_pb.LogEntry,
	nextOffset int64,
	totalBytes int,
	err error,
) {
	messages = make([]*filer_pb.LogEntry, 0, maxMessages)
	nextOffset = startOffset
	totalBytes = 0
	foundStart = false

	messagesInBuffer := 0
	for pos := 0; pos+4 < len(buf) && len(messages) < maxMessages && totalBytes < maxBytes; {
		// Read message size
		size := util.BytesToUint32(buf[pos : pos+4])
		if pos+4+int(size) > len(buf) {
			// Incomplete message at end of buffer
			glog.V(4).Infof("[parseMessages] Incomplete message at pos %d, size %d, bufLen %d",
				pos, size, len(buf))
			break
		}

		// Parse message
		entryData := buf[pos+4 : pos+4+int(size)]
		logEntry := &filer_pb.LogEntry{}
		if err = proto.Unmarshal(entryData, logEntry); err != nil {
			// Skip corrupt entries rather than failing the whole batch
			glog.Warningf("[parseMessages] Failed to unmarshal message: %v", err)
			pos += 4 + int(size)
			continue
		}

		messagesInBuffer++

		// Initialize foundStart from first message
		if !foundStart {
			// Find the first message at or after startOffset
			if logEntry.Offset >= startOffset {
				foundStart = true
				nextOffset = logEntry.Offset
			} else {
				// Skip messages before startOffset
				glog.V(3).Infof("[parseMessages] Skipping message at offset %d (before startOffset %d)", logEntry.Offset, startOffset)
				pos += 4 + int(size)
				continue
			}
		}

		// Check if this message matches expected offset
		// (foundStart is always true here; the condition is kept defensive)
		if foundStart && logEntry.Offset >= startOffset {
			glog.V(3).Infof("[parseMessages] Adding message at offset %d (count=%d)", logEntry.Offset, len(messages)+1)
			messages = append(messages, logEntry)
			totalBytes += 4 + int(size)
			nextOffset = logEntry.Offset + 1
		}

		pos += 4 + int(size)
	}

	glog.V(4).Infof("[parseMessages] Parsed %d messages, nextOffset=%d, totalBytes=%d",
		len(messages), nextOffset, totalBytes)

	return messages, nextOffset, totalBytes, nil
}

// readMessagesFromDisk reads messages from disk using the ReadFromDiskFn.
//
// Collects entries at or after startOffset until maxMessages or maxBytes is
// reached. The first message is always admitted even if it exceeds maxBytes;
// otherwise a single oversized entry could never be delivered and the reader
// would stall forever (same semantics as extractMessagesFromCache).
//
// Returns the messages, the next offset to fetch, the (unchanged) high water
// mark, whether the partition end was reached, and any disk-read error.
func (logBuffer *LogBuffer) readMessagesFromDisk(startOffset int64, maxMessages int, maxBytes int, highWaterMark int64) (
	messages []*filer_pb.LogEntry,
	nextOffset int64,
	highWaterMark2 int64,
	endOfPartition bool,
	err error,
) {
	if logBuffer.ReadFromDiskFn == nil {
		return nil, startOffset, highWaterMark, true,
			fmt.Errorf("no disk read function configured")
	}

	messages = make([]*filer_pb.LogEntry, 0, maxMessages)
	nextOffset = startOffset
	totalBytes := 0

	// Callback collects qualifying messages until a limit is hit.
	collectFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
		// Message-count limit
		if len(messages) >= maxMessages {
			return true, nil // Done
		}

		// Approximate wire size: 4-byte length prefix + payload + key
		entrySize := 4 + len(logEntry.Data) + len(logEntry.Key)
		// Byte limit - but never reject the FIRST message, or an entry
		// larger than maxBytes would permanently block progress.
		if len(messages) > 0 && totalBytes+entrySize > maxBytes {
			return true, nil // Done
		}

		// Only include messages at or after startOffset
		if logEntry.Offset >= startOffset {
			messages = append(messages, logEntry)
			totalBytes += entrySize
			nextOffset = logEntry.Offset + 1
		}

		return false, nil // Continue
	}

	// Read from disk
	startPos := NewMessagePositionFromOffset(startOffset)
	_, isDone, err := logBuffer.ReadFromDiskFn(startPos, 0, collectFn)

	if err != nil {
		glog.Warningf("[StatelessRead] Disk read error: %v", err)
		return nil, startOffset, highWaterMark, false, err
	}

	glog.V(4).Infof("[StatelessRead] Read %d messages from disk, nextOffset=%d, isDone=%v",
		len(messages), nextOffset, isDone)

	// If we read from disk and got no messages, and isDone is true, we're at the end
	endOfPartition = isDone && len(messages) == 0

	return messages, nextOffset, highWaterMark, endOfPartition, nil
}

// GetHighWaterMark returns the highest offset available in this partition.
// This is a lightweight operation for clients to check partition state.
func (logBuffer *LogBuffer) GetHighWaterMark() int64 {
	logBuffer.RLock()
	hwm := logBuffer.offset
	logBuffer.RUnlock()
	return hwm
}

// GetLogStartOffset returns the earliest offset currently available in
// memory, so clients can learn the valid offset range.
//
// Only the current buffer's start offset is reported: prevBuffers are
// deliberately ignored because they may be stale or mid-flush, and
// stateless fetch should only advertise what is reliably in memory now.
func (logBuffer *LogBuffer) GetLogStartOffset() int64 {
	logBuffer.RLock()
	defer logBuffer.RUnlock()

	if logBuffer.hasOffsets {
		return logBuffer.bufferStartOffset
	}

	// No offset information has been recorded yet.
	return 0
}

// WaitForDataWithTimeout waits up to maxWaitMs for data to be available at startOffset.
// Returns true if data became available, false if timeout.
// This allows "long poll" behavior for real-time consumers.
//
// Loops on notifications until the deadline: a single notification does not
// guarantee data reached startOffset, so we re-check and keep waiting while
// time remains (the previous behavior gave up after the first notification).
func (logBuffer *LogBuffer) WaitForDataWithTimeout(startOffset int64, maxWaitMs int) bool {
	if maxWaitMs <= 0 {
		return false
	}

	timeout := time.NewTimer(time.Duration(maxWaitMs) * time.Millisecond)
	defer timeout.Stop()

	// Register before the first check so a notification fired between the
	// check and the wait is not missed.
	subscriberID := fmt.Sprintf("fetch-%d", startOffset)
	notifyChan := logBuffer.RegisterSubscriber(subscriberID)
	defer logBuffer.UnregisterSubscriber(subscriberID)

	for {
		// Check whether data has reached the requested offset.
		logBuffer.RLock()
		currentEnd := logBuffer.offset
		logBuffer.RUnlock()

		if currentEnd >= startOffset {
			return true
		}

		// Wait for the next notification or give up at the deadline.
		select {
		case <-notifyChan:
			// Data may have arrived; loop around and re-check.
		case <-timeout.C:
			return false
		}
	}
}