aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_read_stateless.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/log_buffer/log_read_stateless.go')
-rw-r--r--weed/util/log_buffer/log_read_stateless.go639
1 files changed, 639 insertions, 0 deletions
diff --git a/weed/util/log_buffer/log_read_stateless.go b/weed/util/log_buffer/log_read_stateless.go
new file mode 100644
index 000000000..b57f7742f
--- /dev/null
+++ b/weed/util/log_buffer/log_read_stateless.go
@@ -0,0 +1,639 @@
+package log_buffer
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "google.golang.org/protobuf/proto"
+)
+
+// ReadMessagesAtOffset provides Kafka-style stateless reads from LogBuffer
+// Each call is completely independent - no state maintained between calls
+// Thread-safe for concurrent reads at different offsets
+//
+// This is the recommended API for stateless clients like Kafka gateway
+// Unlike Subscribe loops, this:
+// 1. Returns immediately with available data (or empty if none)
+// 2. Does not maintain any session state
+// 3. Safe for concurrent calls
+// 4. No cancellation/restart complexity
+//
+// Returns:
+// - messages: Array of messages starting at startOffset
+// - nextOffset: Offset to use for next fetch
+// - highWaterMark: Highest offset available in partition
+// - endOfPartition: True if no more data available
+// - err: Any error encountered
+func (logBuffer *LogBuffer) ReadMessagesAtOffset(startOffset int64, maxMessages int, maxBytes int) (
+ messages []*filer_pb.LogEntry,
+ nextOffset int64,
+ highWaterMark int64,
+ endOfPartition bool,
+ err error,
+) {
+ glog.Infof("[StatelessRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d",
+ startOffset, maxMessages, maxBytes)
+
+ // Quick validation
+ if maxMessages <= 0 {
+ maxMessages = 100 // Default reasonable batch size
+ }
+ if maxBytes <= 0 {
+ maxBytes = 4 * 1024 * 1024 // 4MB default
+ }
+
+ messages = make([]*filer_pb.LogEntry, 0, maxMessages)
+ nextOffset = startOffset
+
+ // Try to read from in-memory buffers first (hot path)
+ logBuffer.RLock()
+ currentBufferEnd := logBuffer.offset
+ bufferStartOffset := logBuffer.bufferStartOffset
+ highWaterMark = currentBufferEnd
+
+ glog.Infof("[StatelessRead] Buffer state: startOffset=%d, bufferStart=%d, bufferEnd=%d, HWM=%d, pos=%d",
+ startOffset, bufferStartOffset, currentBufferEnd, highWaterMark, logBuffer.pos)
+
+ // Special case: empty buffer (no data written yet)
+ if currentBufferEnd == 0 && bufferStartOffset == 0 && logBuffer.pos == 0 {
+ logBuffer.RUnlock()
+ glog.Infof("[StatelessRead] PATH: Empty buffer (no data written yet)")
+ // Return empty result - partition exists but has no data yet
+ // Preserve the requested offset in nextOffset
+ return messages, startOffset, 0, true, nil
+ }
+
+ // Check if requested offset is in current buffer
+ if startOffset >= bufferStartOffset && startOffset < currentBufferEnd {
+ glog.Infof("[StatelessRead] PATH: Attempting to read from current/previous memory buffers")
+ // Read from current buffer
+ glog.V(4).Infof("[StatelessRead] Reading from current buffer: start=%d, end=%d",
+ bufferStartOffset, currentBufferEnd)
+
+ if logBuffer.pos > 0 {
+ // Make a copy of the buffer to avoid concurrent modification
+ bufCopy := make([]byte, logBuffer.pos)
+ copy(bufCopy, logBuffer.buf[:logBuffer.pos])
+ logBuffer.RUnlock() // Release lock early
+
+ // Parse messages from buffer copy
+ messages, nextOffset, _, err = parseMessagesFromBuffer(
+ bufCopy, startOffset, maxMessages, maxBytes)
+
+ if err != nil {
+ return nil, startOffset, highWaterMark, false, err
+ }
+
+ glog.V(4).Infof("[StatelessRead] Read %d messages from current buffer, nextOffset=%d",
+ len(messages), nextOffset)
+
+ // Check if we reached the end
+ endOfPartition = (nextOffset >= currentBufferEnd) && (len(messages) == 0 || len(messages) < maxMessages)
+ return messages, nextOffset, highWaterMark, endOfPartition, nil
+ }
+
+ // Buffer is empty but offset is in range - check previous buffers
+ logBuffer.RUnlock()
+
+ // Try previous buffers
+ logBuffer.RLock()
+ for _, prevBuf := range logBuffer.prevBuffers.buffers {
+ if startOffset >= prevBuf.startOffset && startOffset <= prevBuf.offset {
+ if prevBuf.size > 0 {
+ // Found in previous buffer
+ bufCopy := make([]byte, prevBuf.size)
+ copy(bufCopy, prevBuf.buf[:prevBuf.size])
+ logBuffer.RUnlock()
+
+ messages, nextOffset, _, err = parseMessagesFromBuffer(
+ bufCopy, startOffset, maxMessages, maxBytes)
+
+ if err != nil {
+ return nil, startOffset, highWaterMark, false, err
+ }
+
+ glog.V(4).Infof("[StatelessRead] Read %d messages from previous buffer, nextOffset=%d",
+ len(messages), nextOffset)
+
+ endOfPartition = false // More data might be in current buffer
+ return messages, nextOffset, highWaterMark, endOfPartition, nil
+ }
+ // Empty previous buffer means data was flushed to disk - fall through to disk read
+ glog.V(2).Infof("[StatelessRead] Data at offset %d was flushed, attempting disk read", startOffset)
+ break
+ }
+ }
+ logBuffer.RUnlock()
+
+ // Data not in memory - attempt disk read if configured
+ // CRITICAL FIX: Don't return error here - data may be on disk!
+ // Fall through to disk read logic below
+ glog.V(2).Infof("[StatelessRead] Data at offset %d not in memory (buffer: %d-%d), attempting disk read",
+ startOffset, bufferStartOffset, currentBufferEnd)
+ // Don't return error - continue to disk read check below
+ } else {
+ // Offset is not in current buffer - check previous buffers FIRST before going to disk
+ // This handles the case where data was just flushed but is still in prevBuffers
+ glog.Infof("[StatelessRead] PATH: Offset %d not in current buffer [%d-%d), checking previous buffers first",
+ startOffset, bufferStartOffset, currentBufferEnd)
+
+ for _, prevBuf := range logBuffer.prevBuffers.buffers {
+ if startOffset >= prevBuf.startOffset && startOffset <= prevBuf.offset {
+ if prevBuf.size > 0 {
+ // Found in previous buffer!
+ bufCopy := make([]byte, prevBuf.size)
+ copy(bufCopy, prevBuf.buf[:prevBuf.size])
+ logBuffer.RUnlock()
+
+ messages, nextOffset, _, err = parseMessagesFromBuffer(
+ bufCopy, startOffset, maxMessages, maxBytes)
+
+ if err != nil {
+ return nil, startOffset, highWaterMark, false, err
+ }
+
+ glog.Infof("[StatelessRead] SUCCESS: Found %d messages in previous buffer, nextOffset=%d",
+ len(messages), nextOffset)
+
+ endOfPartition = false // More data might exist
+ return messages, nextOffset, highWaterMark, endOfPartition, nil
+ }
+ // Empty previous buffer - data was flushed to disk
+ glog.V(2).Infof("[StatelessRead] Found empty previous buffer for offset %d, will try disk", startOffset)
+ break
+ }
+ }
+ logBuffer.RUnlock()
+ }
+
+ // If we get here, unlock if not already unlocked
+ // (Note: logBuffer.RUnlock() was called above in all paths)
+
+ // Data not in memory - try disk read
+ // This handles two cases:
+ // 1. startOffset < bufferStartOffset: Historical data
+ // 2. startOffset in buffer range but not in memory: Data was flushed (from fall-through above)
+ if startOffset < currentBufferEnd {
+ glog.Infof("[StatelessRead] PATH: Data not in memory, attempting DISK READ")
+
+ // Historical data or flushed data - try to read from disk if ReadFromDiskFn is configured
+ if startOffset < bufferStartOffset {
+ glog.Errorf("[StatelessRead] CASE 1: Historical data - offset %d < bufferStart %d",
+ startOffset, bufferStartOffset)
+ } else {
+ glog.Errorf("[StatelessRead] CASE 2: Flushed data - offset %d in range [%d, %d) but not in memory",
+ startOffset, bufferStartOffset, currentBufferEnd)
+ }
+
+ // Check if disk read function is configured
+ if logBuffer.ReadFromDiskFn == nil {
+ glog.Errorf("[StatelessRead] CRITICAL: ReadFromDiskFn is NIL! Cannot read from disk.")
+ if startOffset < bufferStartOffset {
+ return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d too old (earliest in-memory: %d), and ReadFromDiskFn is nil",
+ startOffset, bufferStartOffset)
+ }
+ return messages, startOffset, highWaterMark, false, fmt.Errorf("offset %d not in memory (buffer: %d-%d), and ReadFromDiskFn is nil",
+ startOffset, bufferStartOffset, currentBufferEnd)
+ }
+
+ glog.Infof("[StatelessRead] ReadFromDiskFn is configured, calling readHistoricalDataFromDisk...")
+
+ // Read from disk (this is async/non-blocking if the ReadFromDiskFn is properly implemented)
+ // The ReadFromDiskFn should handle its own timeouts and not block indefinitely
+ diskMessages, diskNextOffset, diskErr := readHistoricalDataFromDisk(
+ logBuffer, startOffset, maxMessages, maxBytes, highWaterMark)
+
+ if diskErr != nil {
+ glog.Errorf("[StatelessRead] CRITICAL: Disk read FAILED for offset %d: %v", startOffset, diskErr)
+ // IMPORTANT: Return retryable error instead of silently returning empty!
+ return messages, startOffset, highWaterMark, false, fmt.Errorf("disk read failed for offset %d: %v", startOffset, diskErr)
+ }
+
+ if len(diskMessages) == 0 {
+ glog.Errorf("[StatelessRead] WARNING: Disk read returned 0 messages for offset %d (HWM=%d, bufferStart=%d)",
+ startOffset, highWaterMark, bufferStartOffset)
+ } else {
+ glog.Infof("[StatelessRead] SUCCESS: Disk read returned %d messages, nextOffset=%d",
+ len(diskMessages), diskNextOffset)
+ }
+
+ // Return disk data
+ endOfPartition = diskNextOffset >= bufferStartOffset && len(diskMessages) < maxMessages
+ return diskMessages, diskNextOffset, highWaterMark, endOfPartition, nil
+ }
+
+ // startOffset >= currentBufferEnd - future offset, no data available yet
+ glog.V(4).Infof("[StatelessRead] Future offset %d >= buffer end %d, no data available",
+ startOffset, currentBufferEnd)
+ return messages, startOffset, highWaterMark, true, nil
+}
+
+// readHistoricalDataFromDisk reads messages from disk for historical offsets
+// This is called when the requested offset is older than what's in memory
+// Uses an in-memory cache to avoid repeated disk I/O for the same chunks
+func readHistoricalDataFromDisk(
+ logBuffer *LogBuffer,
+ startOffset int64,
+ maxMessages int,
+ maxBytes int,
+ highWaterMark int64,
+) (messages []*filer_pb.LogEntry, nextOffset int64, err error) {
+ const chunkSize = 1000 // Size of each cached chunk
+
+ glog.Infof("[DiskRead] ENTRY: startOffset=%d, maxMessages=%d, maxBytes=%d, HWM=%d",
+ startOffset, maxMessages, maxBytes, highWaterMark)
+
+ // Calculate chunk start offset (aligned to chunkSize boundary)
+ chunkStartOffset := (startOffset / chunkSize) * chunkSize
+
+ glog.Infof("[DiskRead] Calculated chunkStartOffset=%d (aligned from %d)", chunkStartOffset, startOffset)
+
+ // Try to get from cache first
+ cachedMessages, cacheHit := getCachedDiskChunk(logBuffer, chunkStartOffset)
+
+ if cacheHit {
+ // Found in cache - extract requested messages
+ glog.Infof("[DiskCache] Cache HIT for chunk starting at offset %d (requested: %d), cachedMessages=%d",
+ chunkStartOffset, startOffset, len(cachedMessages))
+
+ result, nextOff, err := extractMessagesFromCache(cachedMessages, startOffset, maxMessages, maxBytes)
+
+ if err != nil {
+ // CRITICAL: Cache extraction failed because requested offset is BEYOND cached chunk
+ // This means disk files only contain partial data (e.g., 1000-1763) and the
+ // requested offset (e.g., 1764) is in a gap between disk and memory.
+ //
+ // SOLUTION: Return empty result with NO ERROR to let ReadMessagesAtOffset
+ // continue to check memory buffers. The data might be in memory even though
+ // it's not on disk.
+ glog.Errorf("[DiskCache] Offset %d is beyond cached chunk (start=%d, size=%d)",
+ startOffset, chunkStartOffset, len(cachedMessages))
+ glog.Infof("[DiskCache] Returning empty to let memory buffers handle offset %d", startOffset)
+
+ // Return empty but NO ERROR - this signals "not on disk, try memory"
+ return nil, startOffset, nil
+ }
+
+ // Success - return cached data
+ return result, nextOff, nil
+ }
+
+ glog.Infof("[DiskCache] Cache MISS for chunk starting at offset %d, reading from disk via ReadFromDiskFn",
+ chunkStartOffset)
+
+ // Not in cache - read entire chunk from disk for caching
+ chunkMessages := make([]*filer_pb.LogEntry, 0, chunkSize)
+ chunkNextOffset := chunkStartOffset
+
+ // Create a position for the chunk start
+ chunkPosition := MessagePosition{
+ IsOffsetBased: true,
+ Offset: chunkStartOffset,
+ }
+
+ // Define callback to collect the entire chunk
+ eachMessageFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
+ // Read up to chunkSize messages for caching
+ if len(chunkMessages) >= chunkSize {
+ return true, nil
+ }
+
+ chunkMessages = append(chunkMessages, logEntry)
+ chunkNextOffset++
+
+ // Continue reading the chunk
+ return false, nil
+ }
+
+ // Read chunk from disk
+ glog.Infof("[DiskRead] Calling ReadFromDiskFn with position offset=%d...", chunkStartOffset)
+ _, _, readErr := logBuffer.ReadFromDiskFn(chunkPosition, 0, eachMessageFn)
+
+ if readErr != nil {
+ glog.Errorf("[DiskRead] CRITICAL: ReadFromDiskFn returned ERROR: %v", readErr)
+ return nil, startOffset, fmt.Errorf("failed to read from disk: %w", readErr)
+ }
+
+ glog.Infof("[DiskRead] ReadFromDiskFn completed successfully, read %d messages", len(chunkMessages))
+
+ // Cache the chunk for future reads
+ if len(chunkMessages) > 0 {
+ cacheDiskChunk(logBuffer, chunkStartOffset, chunkNextOffset-1, chunkMessages)
+ glog.Infof("[DiskCache] Cached chunk: offsets %d-%d (%d messages)",
+ chunkStartOffset, chunkNextOffset-1, len(chunkMessages))
+ } else {
+ glog.Errorf("[DiskRead] WARNING: ReadFromDiskFn returned 0 messages for chunkStart=%d", chunkStartOffset)
+ }
+
+ // Extract requested messages from the chunk
+ result, resNextOffset, resErr := extractMessagesFromCache(chunkMessages, startOffset, maxMessages, maxBytes)
+ glog.Infof("[DiskRead] EXIT: Returning %d messages, nextOffset=%d, err=%v", len(result), resNextOffset, resErr)
+ return result, resNextOffset, resErr
+}
+
+// getCachedDiskChunk retrieves a cached disk chunk if available
+func getCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) ([]*filer_pb.LogEntry, bool) {
+ logBuffer.diskChunkCache.mu.RLock()
+ defer logBuffer.diskChunkCache.mu.RUnlock()
+
+ if chunk, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists {
+ // Update last access time
+ chunk.lastAccess = time.Now()
+ return chunk.messages, true
+ }
+
+ return nil, false
+}
+
+// invalidateCachedDiskChunk removes a chunk from the cache
+// This is called when cached data is found to be incomplete or incorrect
+func invalidateCachedDiskChunk(logBuffer *LogBuffer, chunkStartOffset int64) {
+ logBuffer.diskChunkCache.mu.Lock()
+ defer logBuffer.diskChunkCache.mu.Unlock()
+
+ if _, exists := logBuffer.diskChunkCache.chunks[chunkStartOffset]; exists {
+ delete(logBuffer.diskChunkCache.chunks, chunkStartOffset)
+ glog.Infof("[DiskCache] Invalidated chunk at offset %d", chunkStartOffset)
+ }
+}
+
+// cacheDiskChunk stores a disk chunk in the cache with LRU eviction
+func cacheDiskChunk(logBuffer *LogBuffer, startOffset, endOffset int64, messages []*filer_pb.LogEntry) {
+ logBuffer.diskChunkCache.mu.Lock()
+ defer logBuffer.diskChunkCache.mu.Unlock()
+
+ // Check if we need to evict old chunks (LRU policy)
+ if len(logBuffer.diskChunkCache.chunks) >= logBuffer.diskChunkCache.maxChunks {
+ // Find least recently used chunk
+ var oldestOffset int64
+ var oldestTime time.Time
+ first := true
+
+ for offset, chunk := range logBuffer.diskChunkCache.chunks {
+ if first || chunk.lastAccess.Before(oldestTime) {
+ oldestOffset = offset
+ oldestTime = chunk.lastAccess
+ first = false
+ }
+ }
+
+ // Evict oldest chunk
+ delete(logBuffer.diskChunkCache.chunks, oldestOffset)
+ glog.V(4).Infof("[DiskCache] Evicted chunk at offset %d (LRU)", oldestOffset)
+ }
+
+ // Store new chunk
+ logBuffer.diskChunkCache.chunks[startOffset] = &CachedDiskChunk{
+ startOffset: startOffset,
+ endOffset: endOffset,
+ messages: messages,
+ lastAccess: time.Now(),
+ }
+}
+
+// extractMessagesFromCache extracts requested messages from a cached chunk
+// chunkMessages contains messages starting from the chunk's aligned start offset
+// We need to skip to the requested startOffset within the chunk
+func extractMessagesFromCache(chunkMessages []*filer_pb.LogEntry, startOffset int64, maxMessages, maxBytes int) ([]*filer_pb.LogEntry, int64, error) {
+ const chunkSize = 1000
+ chunkStartOffset := (startOffset / chunkSize) * chunkSize
+
+ // Calculate position within chunk
+ positionInChunk := int(startOffset - chunkStartOffset)
+
+ // Check if requested offset is within the chunk
+ if positionInChunk < 0 {
+ glog.Errorf("[DiskCache] CRITICAL: Requested offset %d is BEFORE chunk start %d (positionInChunk=%d < 0)",
+ startOffset, chunkStartOffset, positionInChunk)
+ return nil, startOffset, fmt.Errorf("offset %d before chunk start %d", startOffset, chunkStartOffset)
+ }
+
+ if positionInChunk >= len(chunkMessages) {
+ // Requested offset is beyond the cached chunk
+ // This happens when disk files only contain partial data
+ // The requested offset might be in the gap between disk and memory
+ glog.Infof("[DiskCache] Requested offset %d is beyond cached chunk (chunkStart=%d, cachedSize=%d, positionInChunk=%d)",
+ startOffset, chunkStartOffset, len(chunkMessages), positionInChunk)
+ glog.Infof("[DiskCache] Chunk contains offsets %d-%d, requested %d - data not on disk",
+ chunkStartOffset, chunkStartOffset+int64(len(chunkMessages))-1, startOffset)
+
+ // Return empty (data not on disk) - caller will check memory buffers
+ return nil, startOffset, nil
+ }
+
+ // Extract messages starting from the requested position
+ messages := make([]*filer_pb.LogEntry, 0, maxMessages)
+ nextOffset := startOffset
+ totalBytes := 0
+
+ for i := positionInChunk; i < len(chunkMessages) && len(messages) < maxMessages; i++ {
+ entry := chunkMessages[i]
+ entrySize := proto.Size(entry)
+
+ // Check byte limit
+ if totalBytes > 0 && totalBytes+entrySize > maxBytes {
+ break
+ }
+
+ messages = append(messages, entry)
+ totalBytes += entrySize
+ nextOffset++
+ }
+
+ glog.V(4).Infof("[DiskCache] Extracted %d messages from cache (offset %d-%d, bytes=%d)",
+ len(messages), startOffset, nextOffset-1, totalBytes)
+
+ return messages, nextOffset, nil
+}
+
+// parseMessagesFromBuffer parses messages from a buffer byte slice
+// This is thread-safe as it operates on a copy of the buffer
+func parseMessagesFromBuffer(buf []byte, startOffset int64, maxMessages int, maxBytes int) (
+ messages []*filer_pb.LogEntry,
+ nextOffset int64,
+ totalBytes int,
+ err error,
+) {
+ messages = make([]*filer_pb.LogEntry, 0, maxMessages)
+ nextOffset = startOffset
+ totalBytes = 0
+ foundStart := false
+
+ messagesInBuffer := 0
+ for pos := 0; pos+4 < len(buf) && len(messages) < maxMessages && totalBytes < maxBytes; {
+ // Read message size
+ size := util.BytesToUint32(buf[pos : pos+4])
+ if pos+4+int(size) > len(buf) {
+ // Incomplete message at end of buffer
+ glog.V(4).Infof("[parseMessages] Incomplete message at pos %d, size %d, bufLen %d",
+ pos, size, len(buf))
+ break
+ }
+
+ // Parse message
+ entryData := buf[pos+4 : pos+4+int(size)]
+ logEntry := &filer_pb.LogEntry{}
+ if err = proto.Unmarshal(entryData, logEntry); err != nil {
+ glog.Warningf("[parseMessages] Failed to unmarshal message: %v", err)
+ pos += 4 + int(size)
+ continue
+ }
+
+ messagesInBuffer++
+
+ // Initialize foundStart from first message
+ if !foundStart {
+ // Find the first message at or after startOffset
+ if logEntry.Offset >= startOffset {
+ glog.Infof("[parseMessages] Found first message at/after startOffset %d: logEntry.Offset=%d", startOffset, logEntry.Offset)
+ foundStart = true
+ nextOffset = logEntry.Offset
+ } else {
+ // Skip messages before startOffset
+ glog.V(3).Infof("[parseMessages] Skipping message at offset %d (before startOffset %d)", logEntry.Offset, startOffset)
+ pos += 4 + int(size)
+ continue
+ }
+ }
+
+ // Check if this message matches expected offset
+ if foundStart && logEntry.Offset >= startOffset {
+ glog.V(3).Infof("[parseMessages] Adding message at offset %d (count=%d)", logEntry.Offset, len(messages)+1)
+ messages = append(messages, logEntry)
+ totalBytes += 4 + int(size)
+ nextOffset = logEntry.Offset + 1
+ }
+
+ pos += 4 + int(size)
+ }
+
+ glog.Infof("[parseMessages] Parsed buffer: requested startOffset=%d, messagesInBuffer=%d, messagesReturned=%d, nextOffset=%d",
+ startOffset, messagesInBuffer, len(messages), nextOffset)
+
+ glog.V(4).Infof("[parseMessages] Parsed %d messages, nextOffset=%d, totalBytes=%d",
+ len(messages), nextOffset, totalBytes)
+
+ return messages, nextOffset, totalBytes, nil
+}
+
+// readMessagesFromDisk reads messages from disk using the ReadFromDiskFn
+func (logBuffer *LogBuffer) readMessagesFromDisk(startOffset int64, maxMessages int, maxBytes int, highWaterMark int64) (
+ messages []*filer_pb.LogEntry,
+ nextOffset int64,
+ highWaterMark2 int64,
+ endOfPartition bool,
+ err error,
+) {
+ if logBuffer.ReadFromDiskFn == nil {
+ return nil, startOffset, highWaterMark, true,
+ fmt.Errorf("no disk read function configured")
+ }
+
+ messages = make([]*filer_pb.LogEntry, 0, maxMessages)
+ nextOffset = startOffset
+ totalBytes := 0
+
+ // Use a simple callback to collect messages
+ collectFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
+ // Check limits
+ if len(messages) >= maxMessages {
+ return true, nil // Done
+ }
+
+ entrySize := 4 + len(logEntry.Data) + len(logEntry.Key)
+ if totalBytes+entrySize > maxBytes {
+ return true, nil // Done
+ }
+
+ // Only include messages at or after startOffset
+ if logEntry.Offset >= startOffset {
+ messages = append(messages, logEntry)
+ totalBytes += entrySize
+ nextOffset = logEntry.Offset + 1
+ }
+
+ return false, nil // Continue
+ }
+
+ // Read from disk
+ startPos := NewMessagePositionFromOffset(startOffset)
+ _, isDone, err := logBuffer.ReadFromDiskFn(startPos, 0, collectFn)
+
+ if err != nil {
+ glog.Warningf("[StatelessRead] Disk read error: %v", err)
+ return nil, startOffset, highWaterMark, false, err
+ }
+
+ glog.V(4).Infof("[StatelessRead] Read %d messages from disk, nextOffset=%d, isDone=%v",
+ len(messages), nextOffset, isDone)
+
+ // If we read from disk and got no messages, and isDone is true, we're at the end
+ endOfPartition = isDone && len(messages) == 0
+
+ return messages, nextOffset, highWaterMark, endOfPartition, nil
+}
+
+// GetHighWaterMark returns the highest offset available in this partition
+// This is a lightweight operation for clients to check partition state
+func (logBuffer *LogBuffer) GetHighWaterMark() int64 {
+ logBuffer.RLock()
+ defer logBuffer.RUnlock()
+ return logBuffer.offset
+}
+
+// GetLogStartOffset returns the earliest offset available (either in memory or on disk)
+// This is useful for clients to know the valid offset range
+func (logBuffer *LogBuffer) GetLogStartOffset() int64 {
+ logBuffer.RLock()
+ defer logBuffer.RUnlock()
+
+ // Check if we have offset information
+ if !logBuffer.hasOffsets {
+ return 0
+ }
+
+ // Return the current buffer start offset - this is the earliest offset in memory RIGHT NOW
+ // For stateless fetch, we only return what's currently available in memory
+ // We don't check prevBuffers because they may be stale or getting flushed
+ return logBuffer.bufferStartOffset
+}
+
+// WaitForDataWithTimeout waits up to maxWaitMs for data to be available at startOffset
+// Returns true if data became available, false if timeout
+// This allows "long poll" behavior for real-time consumers
+func (logBuffer *LogBuffer) WaitForDataWithTimeout(startOffset int64, maxWaitMs int) bool {
+ if maxWaitMs <= 0 {
+ return false
+ }
+
+ timeout := time.NewTimer(time.Duration(maxWaitMs) * time.Millisecond)
+ defer timeout.Stop()
+
+ // Register for notifications
+ notifyChan := logBuffer.RegisterSubscriber(fmt.Sprintf("fetch-%d", startOffset))
+ defer logBuffer.UnregisterSubscriber(fmt.Sprintf("fetch-%d", startOffset))
+
+ // Check if data is already available
+ logBuffer.RLock()
+ currentEnd := logBuffer.offset
+ logBuffer.RUnlock()
+
+ if currentEnd >= startOffset {
+ return true
+ }
+
+ // Wait for notification or timeout
+ select {
+ case <-notifyChan:
+ // Data might be available now
+ logBuffer.RLock()
+ currentEnd := logBuffer.offset
+ logBuffer.RUnlock()
+ return currentEnd >= startOffset
+ case <-timeout.C:
+ return false
+ }
+}