diff options
Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 114 |
1 file changed, 88 insertions, 26 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index aff8ec80b..692c47b44 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -2,8 +2,8 @@ package log_buffer

 import (
     "bytes"
+    "fmt"
     "math"
-    "strings"
     "sync"
     "sync/atomic"
     "time"
@@ -33,6 +33,21 @@ type EachLogEntryWithOffsetFuncType func(logEntry *filer_pb.LogEntry, offset int
 type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64)
 type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)

+// DiskChunkCache caches chunks of historical data read from disk
+type DiskChunkCache struct {
+    mu        sync.RWMutex
+    chunks    map[int64]*CachedDiskChunk // Key: chunk start offset (aligned to chunkSize)
+    maxChunks int                        // Maximum number of chunks to cache
+}
+
+// CachedDiskChunk represents a cached chunk of disk data
+type CachedDiskChunk struct {
+    startOffset int64
+    endOffset   int64
+    messages    []*filer_pb.LogEntry
+    lastAccess  time.Time
+}
+
 type LogBuffer struct {
     LastFlushTsNs int64
     name          string
@@ -63,6 +78,8 @@ type LogBuffer struct {
     hasOffsets        bool
     lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet)
     lastFlushedTime   atomic.Int64 // Latest timestamp that has been flushed to disk (0 = nothing flushed yet)
+    // Disk chunk cache for historical data reads
+    diskChunkCache *DiskChunkCache
     sync.RWMutex
 }
@@ -81,6 +98,10 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
         flushChan:  make(chan *dataToFlush, 256),
         isStopping: new(atomic.Bool),
         offset:     0, // Will be initialized from existing data if available
+        diskChunkCache: &DiskChunkCache{
+            chunks:    make(map[int64]*CachedDiskChunk),
+            maxChunks: 16, // Cache up to 16 chunks (configurable)
+        },
     }
     lb.lastFlushedOffset.Store(-1) // Nothing flushed to disk yet
     go lb.loopFlush()
@@ -359,17 +380,52 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
     if logBuffer.LastTsNs.Load() >= processingTsNs {
         processingTsNs = logBuffer.LastTsNs.Add(1)
         ts = time.Unix(0, processingTsNs)
-        // Re-marshal with corrected timestamp
         logEntry.TsNs = processingTsNs
-        logEntryData, _ = proto.Marshal(logEntry)
     } else {
         logBuffer.LastTsNs.Store(processingTsNs)
     }

+    // CRITICAL FIX: Set the offset in the LogEntry before marshaling
+    // This ensures the flushed data contains the correct offset information
+    // Note: This also enables AddToBuffer to work correctly with Kafka-style offset-based reads
+    logEntry.Offset = logBuffer.offset
+
+    // DEBUG: Log data being added to buffer for GitHub Actions debugging
+    dataPreview := ""
+    if len(data) > 0 {
+        if len(data) <= 50 {
+            dataPreview = string(data)
+        } else {
+            dataPreview = fmt.Sprintf("%s...(total %d bytes)", string(data[:50]), len(data))
+        }
+    }
+    glog.V(2).Infof("[LOG_BUFFER_ADD] buffer=%s offset=%d dataLen=%d dataPreview=%q",
+        logBuffer.name, logBuffer.offset, len(data), dataPreview)
+
+    // Marshal with correct timestamp and offset
+    logEntryData, _ = proto.Marshal(logEntry)
+
     size := len(logEntryData)

     if logBuffer.pos == 0 {
         logBuffer.startTime = ts
+        // Reset offset tracking for new buffer
+        logBuffer.hasOffsets = false
+    }
+
+    // Track offset ranges for Kafka integration
+    // CRITICAL FIX: Track the current offset being written
+    if !logBuffer.hasOffsets {
+        logBuffer.minOffset = logBuffer.offset
+        logBuffer.maxOffset = logBuffer.offset
+        logBuffer.hasOffsets = true
+    } else {
+        if logBuffer.offset < logBuffer.minOffset {
+            logBuffer.minOffset = logBuffer.offset
+        }
+        if logBuffer.offset > logBuffer.maxOffset {
+            logBuffer.maxOffset = logBuffer.offset
+        }
     }

     if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
@@ -397,6 +453,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
     copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)

     logBuffer.pos += size + 4
+    logBuffer.offset++
 }

 func (logBuffer *LogBuffer) IsStopping() bool {
@@ -540,11 +597,29 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush
         logBuffer.hasOffsets = false
         logBuffer.minOffset = 0
         logBuffer.maxOffset = 0
+
+        // CRITICAL FIX: Invalidate disk cache chunks after flush
+        // The cache may contain stale data from before this flush
+        // Invalidating ensures consumers will re-read fresh data from disk after flush
+        logBuffer.invalidateAllDiskCacheChunks()
+
         return d
     }
     return nil
 }

+// invalidateAllDiskCacheChunks clears all cached disk chunks
+// This should be called after a buffer flush to ensure consumers read fresh data from disk
+func (logBuffer *LogBuffer) invalidateAllDiskCacheChunks() {
+    logBuffer.diskChunkCache.mu.Lock()
+    defer logBuffer.diskChunkCache.mu.Unlock()
+
+    if len(logBuffer.diskChunkCache.chunks) > 0 {
+        glog.Infof("[DiskCache] Invalidating all %d cached chunks after flush", len(logBuffer.diskChunkCache.chunks))
+        logBuffer.diskChunkCache.chunks = make(map[int64]*CachedDiskChunk)
+    }
+}
+
 func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
     return logBuffer.startTime
 }
@@ -570,12 +645,6 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
     if isOffsetBased {
         requestedOffset := lastReadPosition.Offset

-        // DEBUG: Log buffer state for _schemas topic
-        if strings.Contains(logBuffer.name, "_schemas") {
-            glog.Infof("[SCHEMAS ReadFromBuffer] requested=%d bufferStart=%d bufferEnd=%d pos=%d lastFlushed=%d",
-                requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos, logBuffer.lastFlushedOffset.Load())
-        }
-
         // Check if the requested offset is in the current buffer range
         if requestedOffset >= logBuffer.bufferStartOffset && requestedOffset <= logBuffer.offset {
             // If current buffer is empty (pos=0), check if data is on disk or not yet written
@@ -593,10 +662,6 @@
                 // Case 3: try disk read (historical data might exist)
                 if requestedOffset < logBuffer.offset {
                     // Data was in the buffer range but buffer is now empty = flushed to disk
-                    if strings.Contains(logBuffer.name, "_schemas") {
-                        glog.Infof("[SCHEMAS ReadFromBuffer] Returning ResumeFromDiskError: empty buffer, offset %d was flushed (bufferStart=%d, offset=%d)",
-                            requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset)
-                    }
                     return nil, -2, ResumeFromDiskError
                 }
                 // requestedOffset == logBuffer.offset: Current position
@@ -604,20 +669,11 @@
                 // (historical data might exist from previous runs)
                 if requestedOffset == 0 && logBuffer.bufferStartOffset == 0 && logBuffer.offset == 0 {
                     // Initial state: try disk read before waiting for new data
-                    if strings.Contains(logBuffer.name, "_schemas") {
-                        glog.Infof("[SCHEMAS ReadFromBuffer] Initial state, trying disk read for offset 0")
-                    }
                     return nil, -2, ResumeFromDiskError
                 }
                 // Otherwise, wait for new data to arrive
-                if strings.Contains(logBuffer.name, "_schemas") {
-                    glog.Infof("[SCHEMAS ReadFromBuffer] Returning nil: waiting for offset %d to arrive", requestedOffset)
-                }
                 return nil, logBuffer.offset, nil
             }
-            if strings.Contains(logBuffer.name, "_schemas") {
-                glog.Infof("[SCHEMAS ReadFromBuffer] Returning %d bytes from buffer", logBuffer.pos)
-            }
             return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
         }
@@ -661,25 +717,31 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
     // if td < tm, case 2.3
     // read from disk again
     var tsMemory time.Time
-    var tsBatchIndex int64
     if !logBuffer.startTime.IsZero() {
         tsMemory = logBuffer.startTime
-        tsBatchIndex = logBuffer.offset
     }
     for _, prevBuf := range logBuffer.prevBuffers.buffers {
         if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) {
             tsMemory = prevBuf.startTime
-            tsBatchIndex = prevBuf.offset
         }
     }
     if tsMemory.IsZero() { // case 2.2
         return nil, -2, nil
-    } else if lastReadPosition.Time.Before(tsMemory) && lastReadPosition.Offset+1 < tsBatchIndex { // case 2.3
+    } else if lastReadPosition.Time.Before(tsMemory) { // case 2.3
+        // CRITICAL FIX: For time-based reads, only check timestamp for disk reads
+        // Don't use offset comparisons as they're not meaningful for time-based subscriptions
+
         // Special case: If requested time is zero (Unix epoch), treat as "start from beginning"
         // This handles queries that want to read all data without knowing the exact start time
         if lastReadPosition.Time.IsZero() || lastReadPosition.Time.Unix() == 0 {
             // Start from the beginning of memory
             // Fall through to case 2.1 to read from earliest buffer
+        } else if lastReadPosition.Offset == 0 && lastReadPosition.Time.Before(tsMemory) {
+            // CRITICAL FIX: If this is the first read (offset=0) and time is slightly before memory,
+            // it's likely a race between starting to read and first message being written
+            // Fall through to case 2.1 to read from earliest buffer instead of triggering disk read
+            glog.V(2).Infof("first read at time %v before earliest memory %v, reading from memory",
+                lastReadPosition.Time, tsMemory)
        } else {
             // Data not in memory buffers - read from disk
             glog.V(0).Infof("resume from disk: requested time %v < earliest memory time %v",
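The diff adds the DiskChunkCache structures and the post-flush invalidation, but the lookup and eviction side of the cache is not part of these hunks. Below is a minimal sketch of how such a chunk cache is typically consulted; the chunkSize constant and both helper names are assumptions for illustration, not code from this commit.

```go
package log_buffer

import "time"

// chunkSize is an assumed alignment unit for the cache keys; the real value
// used by the read path is not visible in this diff.
const chunkSize = 1024

// getCachedDiskChunk (hypothetical) aligns the requested offset down to a
// chunk boundary and returns the cached chunk on a hit, refreshing lastAccess
// so that eviction can be LRU-like.
func (c *DiskChunkCache) getCachedDiskChunk(offset int64) (*CachedDiskChunk, bool) {
	key := (offset / chunkSize) * chunkSize // map key is the chunk start offset

	c.mu.Lock()
	defer c.mu.Unlock()

	if chunk, ok := c.chunks[key]; ok {
		chunk.lastAccess = time.Now()
		return chunk, true
	}
	return nil, false
}

// evictOldestChunkLocked (hypothetical) drops the least recently accessed
// chunk once maxChunks is reached; the caller must hold c.mu.
func (c *DiskChunkCache) evictOldestChunkLocked() {
	if len(c.chunks) < c.maxChunks {
		return
	}
	var oldestKey int64
	var oldestTime time.Time
	first := true
	for key, chunk := range c.chunks {
		if first || chunk.lastAccess.Before(oldestTime) {
			oldestKey, oldestTime, first = key, chunk.lastAccess, false
		}
	}
	delete(c.chunks, oldestKey)
}
```

On a cache miss a reader would fill the chunk from disk, call evictOldestChunkLocked, and insert the new CachedDiskChunk under the aligned key; invalidateAllDiskCacheChunks from the diff then wipes the whole map after each flush so that subsequent misses re-read fresh data.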
