diff options
Diffstat (limited to 'weed/util/log_buffer/log_read_stateless.go')
| -rw-r--r-- | weed/util/log_buffer/log_read_stateless.go | 639 |
1 files changed, 639 insertions, 0 deletions
diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go new file mode 100644 index 000000000..b57f7742f --- /dev/null +++ b/weed/util/log_buffer/log_read_stateless.go @@ -0,0 +1,639 @@ +package log_buffer + +import ( + "fmt" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/protobuf/proto" +) + +// ReadMessagesAtOffset provides Kafka-style stateless reads from LogBuffer +// Each call is completely independent - no state maintained between calls +// Thread-safe for concurrent reads at different offsets +// +// This is the recommended API for stateless clients like Kafka gateway +// Unlike Subscribe loops, this: +// 1. Returns immediately with available data (or empty if none) +// 2. Does not maintain any session state +// 3. Safe for concurrent calls +// 4. No cancellation/restart complexity +// +// Returns: +// - messages: Array of messages starting at startOffset +// - nextOffset: Offset to use for next fetch +// - highWaterMark: Highest offset available in partition +// - endOfPartition: True if no more data available +// - err: Any error encountered +func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages int, maxBytes int) ( + messages []*filer_pb.LogEntry, + nextOffset int64, + highWaterMark int64, + endOfPartition bool, + err error, +) { + glog.Infof("[StatelessRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d", + startOffset, maxMessages, maxBytes) + + // Quick validation + if maxMessages <= 0 { + maxMessages = 100 // Default reasonable batch size + } + if maxBytes <= 0 { + maxBytes = 4 * 1024 * 1024 // 4MB default + } + + messages = make([]*filer_pb.LogEntry, 0, maxMessages) + nextOffset = startOffset + + // Try to read from in-memory buffers first (hot path) + logBuffer.RLock() + currentBufferEnd := logBuffer.offset + bufferStartOffset := logBuffer.bufferStartOffset + highWaterMark = currentBufferEnd + + glog.Infof("[StatelessRead] Buffer state: startOffset=%d, bufferStart=%d, bufferEnd=%d, HWM=%d, pos=%d", + startOffset, bufferStartOffset, currentBufferEnd, highWaterMark, logBuffer.pos) + + // Special case: empty buffer (no data written yet) + if currentBufferEnd == 0 && bufferStartOffset == 0 && logBuffer.pos == 0 { + logBuffer.RUnlock() + glog.Infof("[StatelessRead] PATH: Empty buffer (no data written yet)") + // Return empty result - partition exists but has no data yet + // Preserve the requested offset in nextOffset + return messages, startOffset, 0, true, nil + } + + // Check if requested offset is in current buffer + if startOffset >= bufferStartOffset && startOffset < currentBufferEnd { + glog.Infof("[StatelessRead] PATH: Attempting to read from current/previous memory buffers") + // Read from current buffer + glog.V(4).Infof("[StatelessRead] Reading from current buffer: start=%d, end=%d", + bufferStartOffset, currentBufferEnd) + + if logBuffer.pos > 0 { + // Make a copy of the buffer to avoid concurrent modification + bufCopy := make([]byte, logBuffer.pos) + copy(bufCopy, logBuffer.buf[:logBuffer.pos]) + logBuffer.RUnlock() // Release lock early + + // Parse messages from buffer copy + messages, nextOffset, _, err = parseMessagesFromBuffer( + bufCopy, startOffset, maxMessages, maxBytes) + + if err != nil { + return nil, startOffset, highWaterMark, false, err + } + + glog.V(4).Infof("[StatelessRead] Read %d messages from current buffer, nextOffset=%d", + len(messages), nextOffset) + + // Check if we reached the end + endOfPartition = (nextOffset >= currentBufferEnd) && (len(messages) == 0 || len(messages) < maxMessages) + return messages, nextOffset, highWaterMark, endOfPartition, nil + } + + // Buffer is empty but offset is in range - check previous buffers + logBuffer.RUnlock() + + // Try previous buffers + logBuffer.RLock() + for _, prevBuf := range logBuffer.prevBuffers.buffers { + if startOffset >= prevBuf.startOffset && startOffset <= prevBuf.offset { + if prevBuf.size > 0 { + // Found in previous buffer + bufCopy := make([]byte, prevBuf.size) + copy(bufCopy, prevBuf.buf[:prevBuf.size]) + logBuffer.RUnlock() + + messages, nextOffset, _, err = parseMessagesFromBuffer( + bufCopy, startOffset, maxMessages, maxBytes) + + if err != nil { + return nil, startOffset, highWaterMark, false, err + } + + glog.V(4).Infof("[StatelessRead] Read %d messages from previous buffer, nextOffset=%d", + len(messages), nextOffset) + + endOfPartition = false // More data might be in current buffer + return messages, nextOffset, highWaterMark, endOfPartition, nil + } + // Empty previous buffer means data was flushed to disk - fall through to disk read + glog.V(2).Infof("[StatelessRead] Data at offset %d was flushed, attempting disk read", startOffset) + break + } + } + logBuffer.RUnlock() + + // Data not in memory - attempt disk read if configured + // CRITICAL FIX: Don't return error here - data may be on disk! + // Fall through to disk read logic below + glog.V(2).Infof("[StatelessRead] Data at offset %d not in memory (buffer: %d-%d), attempting disk read", + startOffset, bufferStartOffset, currentBufferEnd) + // Don't return error - continue to disk read check below + } else { + // Offset is not in current buffer - check previous buffers FIRST before going to disk + // This handles the case where data was just flushed but is still in prevBuffers + glog.Infof("[StatelessRead] PATH: Offset %d not in current buffer [%d-%d), checking previous buffers first", + startOffset, bufferStartOffset, currentBufferEnd) + + for _, prevBuf := range logBuffer.prevBuffers.buffers { + if startOffset >= prevBuf.startOffset && startOffset <= prevBuf.offset { + if prevBuf.size > 0 { + // Found in previous buffer! + bufCopy := make([]byte, prevBuf.size) + copy(bufCopy, prevBuf.buf[:prevBuf.size]) + logBuffer.RUnlock() + + messages, nextOffset, _, err = parseMessagesFromBuffer( + bufCopy, startOffset, maxMessages, maxBytes) + + if err != nil { + return nil, startOffset, highWaterMark, false, err + } + + glog.Infof("[StatelessRead] SUCCESS: Found %d messages in previous buffer, nextOffset=%d", + len(messages), nextOffset) + + endOfPartition = false // More data might exist + return messages, nextOffset, highWaterMark, endOfPartition, nil + } + // Empty previous buffer - data was flushed to disk + glog.V(2).Infof("[StatelessRead] Found empty previous buffer for offset %d, will try disk", startOffset) + break + } + } + logBuffer.RUnlock() + } + + // If we get here, unlock if not already unlocked + // (Note: logBuffer.RUnlock() was called above in all paths) + + // Data not in memory - try disk read + // This handles two cases: + // 1. startOffset < bufferStartOffset: Historical data + // 2. startOffset in buffer range but not in memory: Data was flushed (from fall-through above) + if startOffset < currentBufferEnd { + glog.Infof("[StatelessRead] PATH: Data not in memory, attempting DISK READ") + + // Historical data or flushed data - try to read from disk if ReadFromDiskFn is configured + if startOffset < bufferStartOffset { + glog.Errorf("[StatelessRead] CASE 1: Historical data - offset %d < bufferStart %d", + startOffset, bufferStartOffset) + } else { + glog.Errorf("[StatelessRead] CASE 2: Flushed data - offset %d in range [%d, %d) but not in memory", + startOffset, bufferStartOffset, currentBufferEnd) + } + + // Check if disk read function is configured + if logBuffer.ReadFromDiskFn == nil { + glog.Errorf("[StatelessRead] CRITICAL: ReadFromDiskFn is NIL! Cannot read from disk.") + if startOffset < bufferStartOffset { + return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d), and ReadFromDiskFn is nil", + startOffset, bufferStartOffset) + } + return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d not in memory (buffer: %d-%d), and ReadFromDiskFn is nil", + startOffset, bufferStartOffset, currentBufferEnd) + } + + glog.Infof("[StatelessRead] ReadFromDiskFn is configured, calling readHistoricalDataFromDisk...") + + // Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented) + // The ReadFromDiskFn should handle its own timeouts and not block indefinitely + diskMessages, diskNextOffset, diskErr := readHistoricalDataFromDisk( + logBuffer, startOffset, maxMessages, maxBytes, highWaterMark) + + if diskErr != nil { + glog.Errorf("[StatelessRead] CRITICAL: Disk read FAILED for offset %d: %v", startOffset, diskErr) + // IMPORTANT: Return retryable error instead of silently returning empty! + return messages, startOffset, highWaterMark, false, fmt.Errorf("disk read failed for offset %d: %v", startOffset, diskErr) + } + + if len(diskMessages) == 0 { + glog.Errorf("[StatelessRead] WARNING: Disk read returned 0 messages for offset %d (HWM=%d, bufferStart=%d)", + startOffset, highWaterMark, bufferStartOffset) + } else { + glog.Infof("[StatelessRead] SUCCESS: Disk read returned %d messages, nextOffset=%d", + len(diskMessages), diskNextOffset) + } + + // Return disk data + endOfPartition = diskNextOffset >= bufferStartOffset && len(diskMessages) < maxMessages + return diskMessages, diskNextOffset, highWaterMark, endOfPartition, nil + } + + // startOffset >= currentBufferEnd - future offset, no data available yet + glog.V(4).Infof("[StatelessRead] Future offset %d >= buffer end %d, no data available", + startOffset, currentBufferEnd) + return messages, startOffset, highWaterMark, true, nil +} + +// readHistoricalDataFromDisk reads messages from disk for historical offsets +// This is called when the requested offset is older than what's in memory +// Uses an in-memory cache to avoid repeated disk I/O for the same chunks +func readHistoricalDataFromDisk( + logBuffer *LogBuffer, + startOffset int64, + maxMessages int, + maxBytes int, + highWaterMark int64, +) (messages []*filer_pb.LogEntry, nextOffset int64, err error) { + const chunkSize = 1000 // Size of each cached chunk + + glog.Infof("[DiskRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d, HWM=%d", + startOffset, maxMessages, maxBytes, highWaterMark) + + // Calculate chunk start offset (aligned to chunkSize boundary) + chunkStartOffset := (startOffset / chunkSize) * chunkSize + + glog.Infof("[DiskRead] Calculated chunkStartOffset=%d (aligned from %d)", chunkStartOffset, startOffset) + + // Try to get from cache first + cachedMessages, cacheHit := getCachedDiskChunk(logBuffer, chunkStartOffset) + + if cacheHit { + // Found in cache - extract requested messages + glog.Infof("[DiskCache] Cache HIT for chunk starting at offset %d (requested: %d), cachedMessages=%d", + chunkStartOffset, startOffset, len(cachedMessages)) + + result, nextOff, err := extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes) + + if err != nil { + // CRITICAL: Cache extraction failed because requested offset is BEYOND cached chunk + // This means disk files only contain partial data (e.g., 1000-1763) and the + // requested offset (e.g., 1764) is in a gap between disk and memory. + // + // SOLUTION: Return empty result with NO ERROR to let ReadMessagesAtOffset + // continue to check memory buffers. The data might be in memory even though + // it's not on disk. + glog.Errorf("[DiskCache] Offset %d is beyond cached chunk (start=%d, size=%d)", + startOffset, chunkStartOffset, len(cachedMessages)) + glog.Infof("[DiskCache] Returning empty to let memory buffers handle offset %d", startOffset) + + // Return empty but NO ERROR - this signals "not on disk, try memory" + return nil, startOffset, nil + } + + // Success - return cached data + return result, nextOff, nil + } + + glog.Infof("[DiskCache] Cache MISS for chunk starting at offset %d, reading from disk via ReadFromDiskFn", + chunkStartOffset) + + // Not in cache - read entire chunk from disk for caching + chunkMessages := make([]*filer_pb.LogEntry, 0, chunkSize) + chunkNextOffset := chunkStartOffset + + // Create a position for the chunk start + chunkPosition := MessagePosition{ + IsOffsetBased: true, + Offset: chunkStartOffset, + } + + // Define callback to collect the entire chunk + eachMessageFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { + // Read up to chunkSize messages for caching + if len(chunkMessages) >= chunkSize { + return true, nil + } + + chunkMessages = append(chunkMessages, logEntry) + chunkNextOffset++ + + // Continue reading the chunk + return false, nil + } + + // Read chunk from disk + glog.Infof("[DiskRead] Calling ReadFromDiskFn with position offset=%d...", chunkStartOffset) + _, _, readErr := logBuffer.ReadFromDiskFn(chunkPosition, 0, eachMessageFn) + + if readErr != nil { + glog.Errorf("[DiskRead] CRITICAL: ReadFromDiskFn returned ERROR: %v", readErr) + return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr) + } + + glog.Infof("[DiskRead] ReadFromDiskFn completed successfully, read %d messages", len(chunkMessages)) + + // Cache the chunk for future reads + if len(chunkMessages) > 0 { + cacheDiskChunk(logBuffer, chunkStartOffset, chunkNextOffset-1, chunkMessages) + glog.Infof("[DiskCache] Cached chunk: offsets %d-%d (%d messages)", + chunkStartOffset, chunkNextOffset-1, len(chunkMessages)) + } else { + glog.Errorf("[DiskRead] WARNING: ReadFromDiskFn returned 0 messages for chunkStart=%d", chunkStartOffset) + } + + // Extract requested messages from the chunk + result, resNextOffset, resErr := extractMessagesFromCache(chunkMessages, startOffset, maxMessages, maxBytes) + glog.Infof("[DiskRead] EXIT: Returning %d messages, nextOffset=%d, err=%v", len(result), resNextOffset, resErr) + return result, resNextOffset, resErr +} + +// getCachedDiskChunk retrieves a cached disk chunk if available +func getCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) ([]*filer_pb.LogEntry, bool) { + logBuffer.diskChunkCache.mu.RLock() + defer logBuffer.diskChunkCache.mu.RUnlock() + + if chunk, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists { + // Update last access time + chunk.lastAccess = time.Now() + return chunk.messages, true + } + + return nil, false +} + +// invalidateCachedDiskChunk removes a chunk from the cache +// This is called when cached data is found to be incomplete or incorrect +func invalidateCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) { + logBuffer.diskChunkCache.mu.Lock() + defer logBuffer.diskChunkCache.mu.Unlock() + + if _, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists { + delete(logBuffer.diskChunkCache.chunks, chunkStartOffset) + glog.Infof("[DiskCache] Invalidated chunk at offset %d", chunkStartOffset) + } +} + +// cacheDiskChunk stores a disk chunk in the cache with LRU eviction +func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages []*filer_pb.LogEntry) { + logBuffer.diskChunkCache.mu.Lock() + defer logBuffer.diskChunkCache.mu.Unlock() + + // Check if we need to evict old chunks (LRU policy) + if len(logBuffer.diskChunkCache.chunks) >= logBuffer.diskChunkCache.maxChunks { + // Find least recently used chunk + var oldestOffset int64 + var oldestTime time.Time + first := true + + for offset, chunk := range logBuffer.diskChunkCache.chunks { + if first || chunk.lastAccess.Before(oldestTime) { + oldestOffset = offset + oldestTime = chunk.lastAccess + first = false + } + } + + // Evict oldest chunk + delete(logBuffer.diskChunkCache.chunks, oldestOffset) + glog.V(4).Infof("[DiskCache] Evicted chunk at offset %d (LRU)", oldestOffset) + } + + // Store new chunk + logBuffer.diskChunkCache.chunks[startOffset] = &CachedDiskChunk{ + startOffset: startOffset, + endOffset: endOffset, + messages: messages, + lastAccess: time.Now(), + } +} + +// extractMessagesFromCache extracts requested messages from a cached chunk +// chunkMessages contains messages starting from the chunk's aligned start offset +// We need to skip to the requested startOffset within the chunk +func extractMessagesFromCache(chunkMessages []*filer_pb.LogEntry, startOffset int64, maxMessages, maxBytes int) ([]*filer_pb.LogEntry, int64, error) { + const chunkSize = 1000 + chunkStartOffset := (startOffset / chunkSize) * chunkSize + + // Calculate position within chunk + positionInChunk := int(startOffset - chunkStartOffset) + + // Check if requested offset is within the chunk + if positionInChunk < 0 { + glog.Errorf("[DiskCache] CRITICAL: Requested offset %d is BEFORE chunk start %d (positionInChunk=%d < 0)", + startOffset, chunkStartOffset, positionInChunk) + return nil, startOffset, fmt.Errorf("offset %d before chunk start %d", startOffset, chunkStartOffset) + } + + if positionInChunk >= len(chunkMessages) { + // Requested offset is beyond the cached chunk + // This happens when disk files only contain partial data + // The requested offset might be in the gap between disk and memory + glog.Infof("[DiskCache] Requested offset %d is beyond cached chunk (chunkStart=%d, cachedSize=%d, positionInChunk=%d)", + startOffset, chunkStartOffset, len(chunkMessages), positionInChunk) + glog.Infof("[DiskCache] Chunk contains offsets %d-%d, requested %d - data not on disk", + chunkStartOffset, chunkStartOffset+int64(len(chunkMessages))-1, startOffset) + + // Return empty (data not on disk) - caller will check memory buffers + return nil, startOffset, nil + } + + // Extract messages starting from the requested position + messages := make([]*filer_pb.LogEntry, 0, maxMessages) + nextOffset := startOffset + totalBytes := 0 + + for i := positionInChunk; i < len(chunkMessages) && len(messages) < maxMessages; i++ { + entry := chunkMessages[i] + entrySize := proto.Size(entry) + + // Check byte limit + if totalBytes > 0 && totalBytes+entrySize > maxBytes { + break + } + + messages = append(messages, entry) + totalBytes += entrySize + nextOffset++ + } + + glog.V(4).Infof("[DiskCache] Extracted %d messages from cache (offset %d-%d, bytes=%d)", + len(messages), startOffset, nextOffset-1, totalBytes) + + return messages, nextOffset, nil +} + +// parseMessagesFromBuffer parses messages from a buffer byte slice +// This is thread-safe as it operates on a copy of the buffer +func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, maxBytes int) ( + messages []*filer_pb.LogEntry, + nextOffset int64, + totalBytes int, + err error, +) { + messages = make([]*filer_pb.LogEntry, 0, maxMessages) + nextOffset = startOffset + totalBytes = 0 + foundStart := false + + messagesInBuffer := 0 + for pos := 0; pos+4 < len(buf) && len(messages) < maxMessages && totalBytes < maxBytes; { + // Read message size + size := util.BytesToUint32(buf[pos : pos+4]) + if pos+4+int(size) > len(buf) { + // Incomplete message at end of buffer + glog.V(4).Infof("[parseMessages] Incomplete message at pos %d, size %d, bufLen %d", + pos, size, len(buf)) + break + } + + // Parse message + entryData := buf[pos+4 : pos+4+int(size)] + logEntry := &filer_pb.LogEntry{} + if err = proto.Unmarshal(entryData, logEntry); err != nil { + glog.Warningf("[parseMessages] Failed to unmarshal message: %v", err) + pos += 4 + int(size) + continue + } + + messagesInBuffer++ + + // Initialize foundStart from first message + if !foundStart { + // Find the first message at or after startOffset + if logEntry.Offset >= startOffset { + glog.Infof("[parseMessages] Found first message at/after startOffset %d: logEntry.Offset=%d", startOffset, logEntry.Offset) + foundStart = true + nextOffset = logEntry.Offset + } else { + // Skip messages before startOffset + glog.V(3).Infof("[parseMessages] Skipping message at offset %d (before startOffset %d)", logEntry.Offset, startOffset) + pos += 4 + int(size) + continue + } + } + + // Check if this message matches expected offset + if foundStart && logEntry.Offset >= startOffset { + glog.V(3).Infof("[parseMessages] Adding message at offset %d (count=%d)", logEntry.Offset, len(messages)+1) + messages = append(messages, logEntry) + totalBytes += 4 + int(size) + nextOffset = logEntry.Offset + 1 + } + + pos += 4 + int(size) + } + + glog.Infof("[parseMessages] Parsed buffer: requested startOffset=%d, messagesInBuffer=%d, messagesReturned=%d, nextOffset=%d", + startOffset, messagesInBuffer, len(messages), nextOffset) + + glog.V(4).Infof("[parseMessages] Parsed %d messages, nextOffset=%d, totalBytes=%d", + len(messages), nextOffset, totalBytes) + + return messages, nextOffset, totalBytes, nil +} + +// readMessagesFromDisk reads messages from disk using the ReadFromDiskFn +func (logBuffer *LogBuffer) readMessagesFromDisk(startOffset int64, maxMessages int, maxBytes int, highWaterMark int64) ( + messages []*filer_pb.LogEntry, + nextOffset int64, + highWaterMark2 int64, + endOfPartition bool, + err error, +) { + if logBuffer.ReadFromDiskFn == nil { + return nil, startOffset, highWaterMark, true, + fmt.Errorf("no disk read function configured") + } + + messages = make([]*filer_pb.LogEntry, 0, maxMessages) + nextOffset = startOffset + totalBytes := 0 + + // Use a simple callback to collect messages + collectFn := func(logEntry *filer_pb.LogEntry) (bool, error) { + // Check limits + if len(messages) >= maxMessages { + return true, nil // Done + } + + entrySize := 4 + len(logEntry.Data) + len(logEntry.Key) + if totalBytes+entrySize > maxBytes { + return true, nil // Done + } + + // Only include messages at or after startOffset + if logEntry.Offset >= startOffset { + messages = append(messages, logEntry) + totalBytes += entrySize + nextOffset = logEntry.Offset + 1 + } + + return false, nil // Continue + } + + // Read from disk + startPos := NewMessagePositionFromOffset(startOffset) + _, isDone, err := logBuffer.ReadFromDiskFn(startPos, 0, collectFn) + + if err != nil { + glog.Warningf("[StatelessRead] Disk read error: %v", err) + return nil, startOffset, highWaterMark, false, err + } + + glog.V(4).Infof("[StatelessRead] Read %d messages from disk, nextOffset=%d, isDone=%v", + len(messages), nextOffset, isDone) + + // If we read from disk and got no messages, and isDone is true, we're at the end + endOfPartition = isDone && len(messages) == 0 + + return messages, nextOffset, highWaterMark, endOfPartition, nil +} + +// GetHighWaterMark returns the highest offset available in this partition +// This is a lightweight operation for clients to check partition state +func (logBuffer *LogBuffer) GetHighWaterMark() int64 { + logBuffer.RLock() + defer logBuffer.RUnlock() + return logBuffer.offset +} + +// GetLogStartOffset returns the earliest offset available (either in memory or on disk) +// This is useful for clients to know the valid offset range +func (logBuffer *LogBuffer) GetLogStartOffset() int64 { + logBuffer.RLock() + defer logBuffer.RUnlock() + + // Check if we have offset information + if !logBuffer.hasOffsets { + return 0 + } + + // Return the current buffer start offset - this is the earliest offset in memory RIGHT NOW + // For stateless fetch, we only return what's currently available in memory + // We don't check prevBuffers because they may be stale or getting flushed + return logBuffer.bufferStartOffset +} + +// WaitForDataWithTimeout waits up to maxWaitMs for data to be available at startOffset +// Returns true if data became available, false if timeout +// This allows "long poll" behavior for real-time consumers +func (logBuffer *LogBuffer) WaitForDataWithTimeout(startOffset int64, maxWaitMs int) bool { + if maxWaitMs <= 0 { + return false + } + + timeout := time.NewTimer(time.Duration(maxWaitMs) * time.Millisecond) + defer timeout.Stop() + + // Register for notifications + notifyChan := logBuffer.RegisterSubscriber(fmt.Sprintf("fetch-%d", startOffset)) + defer logBuffer.UnregisterSubscriber(fmt.Sprintf("fetch-%d", startOffset)) + + // Check if data is already available + logBuffer.RLock() + currentEnd := logBuffer.offset + logBuffer.RUnlock() + + if currentEnd >= startOffset { + return true + } + + // Wait for notification or timeout + select { + case <-notifyChan: + // Data might be available now + logBuffer.RLock() + currentEnd := logBuffer.offset + logBuffer.RUnlock() + return currentEnd >= startOffset + case <-timeout.C: + return false + } +} |
