aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_buffer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
-rw-r--r--weed/util/log_buffer/log_buffer.go114
1 files changed, 88 insertions, 26 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index aff8ec80b..692c47b44 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -2,8 +2,8 @@ package log_buffer
import (
"bytes"
+ "fmt"
"math"
- "strings"
"sync"
"sync/atomic"
"time"
@@ -33,6 +33,21 @@ type EachLogEntryWithOffsetFuncType func(logEntry *filer_pb.LogEntry, offset int
type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64)
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
+// DiskChunkCache caches chunks of historical data read from disk
+type DiskChunkCache struct {
+ mu sync.RWMutex
+ chunks map[int64]*CachedDiskChunk // Key: chunk start offset (aligned to chunkSize)
+ maxChunks int // Maximum number of chunks to cache
+}
+
+// CachedDiskChunk represents a cached chunk of disk data
+type CachedDiskChunk struct {
+ startOffset int64
+ endOffset int64
+ messages []*filer_pb.LogEntry
+ lastAccess time.Time
+}
+
type LogBuffer struct {
LastFlushTsNs int64
name string
@@ -63,6 +78,8 @@ type LogBuffer struct {
hasOffsets bool
lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet)
lastFlushedTime atomic.Int64 // Latest timestamp that has been flushed to disk (0 = nothing flushed yet)
+ // Disk chunk cache for historical data reads
+ diskChunkCache *DiskChunkCache
sync.RWMutex
}
@@ -81,6 +98,10 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
flushChan: make(chan *dataToFlush, 256),
isStopping: new(atomic.Bool),
offset: 0, // Will be initialized from existing data if available
+ diskChunkCache: &DiskChunkCache{
+ chunks: make(map[int64]*CachedDiskChunk),
+ maxChunks: 16, // Cache up to 16 chunks (configurable)
+ },
}
lb.lastFlushedOffset.Store(-1) // Nothing flushed to disk yet
go lb.loopFlush()
@@ -359,17 +380,52 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
if logBuffer.LastTsNs.Load() >= processingTsNs {
processingTsNs = logBuffer.LastTsNs.Add(1)
ts = time.Unix(0, processingTsNs)
- // Re-marshal with corrected timestamp
logEntry.TsNs = processingTsNs
- logEntryData, _ = proto.Marshal(logEntry)
} else {
logBuffer.LastTsNs.Store(processingTsNs)
}
+ // CRITICAL FIX: Set the offset in the LogEntry before marshaling
+ // This ensures the flushed data contains the correct offset information
+ // Note: This also enables AddToBuffer to work correctly with Kafka-style offset-based reads
+ logEntry.Offset = logBuffer.offset
+
+ // DEBUG: Log data being added to buffer for GitHub Actions debugging
+ dataPreview := ""
+ if len(data) > 0 {
+ if len(data) <= 50 {
+ dataPreview = string(data)
+ } else {
+ dataPreview = fmt.Sprintf("%s...(total %d bytes)", string(data[:50]), len(data))
+ }
+ }
+ glog.V(2).Infof("[LOG_BUFFER_ADD] buffer=%s offset=%d dataLen=%d dataPreview=%q",
+ logBuffer.name, logBuffer.offset, len(data), dataPreview)
+
+ // Marshal with correct timestamp and offset
+ logEntryData, _ = proto.Marshal(logEntry)
+
size := len(logEntryData)
if logBuffer.pos == 0 {
logBuffer.startTime = ts
+ // Reset offset tracking for new buffer
+ logBuffer.hasOffsets = false
+ }
+
+ // Track offset ranges for Kafka integration
+ // CRITICAL FIX: Track the current offset being written
+ if !logBuffer.hasOffsets {
+ logBuffer.minOffset = logBuffer.offset
+ logBuffer.maxOffset = logBuffer.offset
+ logBuffer.hasOffsets = true
+ } else {
+ if logBuffer.offset < logBuffer.minOffset {
+ logBuffer.minOffset = logBuffer.offset
+ }
+ if logBuffer.offset > logBuffer.maxOffset {
+ logBuffer.maxOffset = logBuffer.offset
+ }
}
if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
@@ -397,6 +453,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
logBuffer.pos += size + 4
+ logBuffer.offset++
}
func (logBuffer *LogBuffer) IsStopping() bool {
@@ -540,11 +597,29 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush
logBuffer.hasOffsets = false
logBuffer.minOffset = 0
logBuffer.maxOffset = 0
+
+ // CRITICAL FIX: Invalidate disk cache chunks after flush
+ // The cache may contain stale data from before this flush
+ // Invalidating ensures consumers will re-read fresh data from disk after flush
+ logBuffer.invalidateAllDiskCacheChunks()
+
return d
}
return nil
}
+// invalidateAllDiskCacheChunks clears all cached disk chunks
+// This should be called after a buffer flush to ensure consumers read fresh data from disk
+func (logBuffer *LogBuffer) invalidateAllDiskCacheChunks() {
+ logBuffer.diskChunkCache.mu.Lock()
+ defer logBuffer.diskChunkCache.mu.Unlock()
+
+ if len(logBuffer.diskChunkCache.chunks) > 0 {
+ glog.Infof("[DiskCache] Invalidating all %d cached chunks after flush", len(logBuffer.diskChunkCache.chunks))
+ logBuffer.diskChunkCache.chunks = make(map[int64]*CachedDiskChunk)
+ }
+}
+
func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
return logBuffer.startTime
}
@@ -570,12 +645,6 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
if isOffsetBased {
requestedOffset := lastReadPosition.Offset
- // DEBUG: Log buffer state for _schemas topic
- if strings.Contains(logBuffer.name, "_schemas") {
- glog.Infof("[SCHEMAS ReadFromBuffer] requested=%d bufferStart=%d bufferEnd=%d pos=%d lastFlushed=%d",
- requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos, logBuffer.lastFlushedOffset.Load())
- }
-
// Check if the requested offset is in the current buffer range
if requestedOffset >= logBuffer.bufferStartOffset && requestedOffset <= logBuffer.offset {
// If current buffer is empty (pos=0), check if data is on disk or not yet written
@@ -593,10 +662,6 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
// Case 3: try disk read (historical data might exist)
if requestedOffset < logBuffer.offset {
// Data was in the buffer range but buffer is now empty = flushed to disk
- if strings.Contains(logBuffer.name, "_schemas") {
- glog.Infof("[SCHEMAS ReadFromBuffer] Returning ResumeFromDiskError: empty buffer, offset %d was flushed (bufferStart=%d, offset=%d)",
- requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset)
- }
return nil, -2, ResumeFromDiskError
}
// requestedOffset == logBuffer.offset: Current position
@@ -604,20 +669,11 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
// (historical data might exist from previous runs)
if requestedOffset == 0 && logBuffer.bufferStartOffset == 0 && logBuffer.offset == 0 {
// Initial state: try disk read before waiting for new data
- if strings.Contains(logBuffer.name, "_schemas") {
- glog.Infof("[SCHEMAS ReadFromBuffer] Initial state, trying disk read for offset 0")
- }
return nil, -2, ResumeFromDiskError
}
// Otherwise, wait for new data to arrive
- if strings.Contains(logBuffer.name, "_schemas") {
- glog.Infof("[SCHEMAS ReadFromBuffer] Returning nil: waiting for offset %d to arrive", requestedOffset)
- }
return nil, logBuffer.offset, nil
}
- if strings.Contains(logBuffer.name, "_schemas") {
- glog.Infof("[SCHEMAS ReadFromBuffer] Returning %d bytes from buffer", logBuffer.pos)
- }
return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
}
@@ -661,25 +717,31 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
// if td < tm, case 2.3
// read from disk again
var tsMemory time.Time
- var tsBatchIndex int64
if !logBuffer.startTime.IsZero() {
tsMemory = logBuffer.startTime
- tsBatchIndex = logBuffer.offset
}
for _, prevBuf := range logBuffer.prevBuffers.buffers {
if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) {
tsMemory = prevBuf.startTime
- tsBatchIndex = prevBuf.offset
}
}
if tsMemory.IsZero() { // case 2.2
return nil, -2, nil
- } else if lastReadPosition.Time.Before(tsMemory) && lastReadPosition.Offset+1 < tsBatchIndex { // case 2.3
+ } else if lastReadPosition.Time.Before(tsMemory) { // case 2.3
+ // CRITICAL FIX: For time-based reads, only check timestamp for disk reads
+ // Don't use offset comparisons as they're not meaningful for time-based subscriptions
+
// Special case: If requested time is zero (Unix epoch), treat as "start from beginning"
// This handles queries that want to read all data without knowing the exact start time
if lastReadPosition.Time.IsZero() || lastReadPosition.Time.Unix() == 0 {
// Start from the beginning of memory
// Fall through to case 2.1 to read from earliest buffer
+ } else if lastReadPosition.Offset == 0 && lastReadPosition.Time.Before(tsMemory) {
+ // CRITICAL FIX: If this is the first read (offset=0) and time is slightly before memory,
+ // it's likely a race between starting to read and first message being written
+ // Fall through to case 2.1 to read from earliest buffer instead of triggering disk read
+ glog.V(2).Infof("first read at time %v before earliest memory %v, reading from memory",
+ lastReadPosition.Time, tsMemory)
} else {
// Data not in memory buffers - read from disk
glog.V(0).Infof("resume from disk: requested time %v < earliest memory time %v",