aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_buffer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
-rw-r--r--  weed/util/log_buffer/log_buffer.go  |  501
1 files changed, 455 insertions, 46 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 15ea062c6..aff8ec80b 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -2,6 +2,8 @@ package log_buffer
import (
"bytes"
+ "math"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -21,11 +23,14 @@ type dataToFlush struct {
startTime time.Time
stopTime time.Time
data *bytes.Buffer
+ minOffset int64
+ maxOffset int64
+ done chan struct{} // Signal when flush completes
}
type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error)
-type EachLogEntryWithBatchIndexFuncType func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error)
-type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte)
+type EachLogEntryWithOffsetFuncType func(logEntry *filer_pb.LogEntry, offset int64) (isDone bool, err error)
+type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64)
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
type LogBuffer struct {
@@ -33,7 +38,8 @@ type LogBuffer struct {
name string
prevBuffers *SealedBuffers
buf []byte
- batchIndex int64
+ offset int64 // Last offset in current buffer (endOffset)
+ bufferStartOffset int64 // First offset in current buffer
idx []int
pos int
startTime time.Time
@@ -44,10 +50,19 @@ type LogBuffer struct {
flushFn LogFlushFuncType
ReadFromDiskFn LogReadFromDiskFuncType
notifyFn func()
- isStopping *atomic.Bool
- isAllFlushed bool
- flushChan chan *dataToFlush
- LastTsNs atomic.Int64
+ // Per-subscriber notification channels for instant wake-up
+ subscribersMu sync.RWMutex
+ subscribers map[string]chan struct{} // subscriberID -> notification channel
+ isStopping *atomic.Bool
+ isAllFlushed bool
+ flushChan chan *dataToFlush
+ LastTsNs atomic.Int64
+ // Offset range tracking for Kafka integration
+ minOffset int64
+ maxOffset int64
+ hasOffsets bool
+ lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet)
+ lastFlushedTime atomic.Int64 // Latest timestamp that has been flushed to disk (0 = nothing flushed yet)
sync.RWMutex
}
@@ -62,19 +77,250 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
flushFn: flushFn,
ReadFromDiskFn: readFromDiskFn,
notifyFn: notifyFn,
+ subscribers: make(map[string]chan struct{}),
flushChan: make(chan *dataToFlush, 256),
isStopping: new(atomic.Bool),
- batchIndex: time.Now().UnixNano(), // Initialize with creation time for uniqueness across restarts
+ offset: 0, // Will be initialized from existing data if available
}
+ lb.lastFlushedOffset.Store(-1) // Nothing flushed to disk yet
go lb.loopFlush()
go lb.loopInterval()
return lb
}
+// RegisterSubscriber registers a subscriber for instant notifications when data is written
+// Returns a channel that will receive notifications (<1ms latency)
+func (logBuffer *LogBuffer) RegisterSubscriber(subscriberID string) chan struct{} {
+ logBuffer.subscribersMu.Lock()
+ defer logBuffer.subscribersMu.Unlock()
+
+ // Check if already registered
+ if existingChan, exists := logBuffer.subscribers[subscriberID]; exists {
+ glog.V(2).Infof("Subscriber %s already registered for %s, reusing channel", subscriberID, logBuffer.name)
+ return existingChan
+ }
+
+ // Create buffered channel (size 1) so notifications never block
+ notifyChan := make(chan struct{}, 1)
+ logBuffer.subscribers[subscriberID] = notifyChan
+ glog.V(1).Infof("Registered subscriber %s for %s (total: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers))
+ return notifyChan
+}
+
+// UnregisterSubscriber removes a subscriber and closes its notification channel
+func (logBuffer *LogBuffer) UnregisterSubscriber(subscriberID string) {
+ logBuffer.subscribersMu.Lock()
+ defer logBuffer.subscribersMu.Unlock()
+
+ if ch, exists := logBuffer.subscribers[subscriberID]; exists {
+ close(ch)
+ delete(logBuffer.subscribers, subscriberID)
+ glog.V(1).Infof("Unregistered subscriber %s from %s (remaining: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers))
+ }
+}
+
// IsOffsetInMemory checks if the given offset is available in the in-memory buffer.
// Returns true if:
//  1. Offset is newer than what's been flushed to disk (must be in memory), or
//  2. Offset is in the current buffer or a previous (sealed) buffer that still
//     holds data (data can be both on disk AND in memory during the flush window).
//
// Returns false if the offset is older than the memory buffers (only on disk),
// if no offsets have been tracked yet, or if the covering buffer has already
// been drained by a flush (empty buffer => data lives on disk).
func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool {
	logBuffer.RLock()
	defer logBuffer.RUnlock()

	// Check if we're tracking offsets at all
	if !logBuffer.hasOffsets {
		return false // No offsets tracked yet
	}

	// OPTIMIZATION: If offset is newer than what's been flushed to disk,
	// it MUST be in memory (not written to disk yet)
	lastFlushed := logBuffer.lastFlushedOffset.Load()
	if lastFlushed >= 0 && offset > lastFlushed {
		glog.V(3).Infof("Offset %d is in memory (newer than lastFlushed=%d)", offset, lastFlushed)
		return true
	}

	// Check if offset is in current buffer range AND buffer has data
	// (data can be both on disk AND in memory during flush window)
	if offset >= logBuffer.bufferStartOffset && offset <= logBuffer.offset {
		// CRITICAL: Check if buffer actually has data (pos > 0)
		// After flush, pos=0 but range is still valid - data is on disk, not in memory
		if logBuffer.pos > 0 {
			glog.V(3).Infof("Offset %d is in current buffer [%d-%d] with data", offset, logBuffer.bufferStartOffset, logBuffer.offset)
			return true
		}
		// Buffer is empty (just flushed) - data is on disk
		glog.V(3).Infof("Offset %d in range [%d-%d] but buffer empty (pos=0), data on disk", offset, logBuffer.bufferStartOffset, logBuffer.offset)
		return false
	}

	// Check if offset is in previous buffers AND they have data
	for _, buf := range logBuffer.prevBuffers.buffers {
		if offset >= buf.startOffset && offset <= buf.offset {
			// Check if prevBuffer actually has data
			if buf.size > 0 {
				glog.V(3).Infof("Offset %d is in previous buffer [%d-%d] with data", offset, buf.startOffset, buf.offset)
				return true
			}
			// Buffer is empty (flushed) - data is on disk
			glog.V(3).Infof("Offset %d in prevBuffer [%d-%d] but empty (size=0), data on disk", offset, buf.startOffset, buf.offset)
			return false
		}
	}

	// Offset is older than memory buffers - only available on disk
	glog.V(3).Infof("Offset %d is NOT in memory (bufferStart=%d, lastFlushed=%d)", offset, logBuffer.bufferStartOffset, lastFlushed)
	return false
}
+
+// notifySubscribers sends notifications to all registered subscribers
+// Non-blocking: uses select with default to avoid blocking on full channels
+func (logBuffer *LogBuffer) notifySubscribers() {
+ logBuffer.subscribersMu.RLock()
+ defer logBuffer.subscribersMu.RUnlock()
+
+ if len(logBuffer.subscribers) == 0 {
+ return // No subscribers, skip notification
+ }
+
+ for subscriberID, notifyChan := range logBuffer.subscribers {
+ select {
+ case notifyChan <- struct{}{}:
+ // Notification sent successfully
+ glog.V(3).Infof("Notified subscriber %s for %s", subscriberID, logBuffer.name)
+ default:
+ // Channel full - subscriber hasn't consumed previous notification yet
+ // This is OK because one notification is sufficient to wake the subscriber
+ glog.V(3).Infof("Subscriber %s notification channel full (OK - already notified)", subscriberID)
+ }
+ }
+}
+
+// InitializeOffsetFromExistingData initializes the offset counter from existing data on disk
+// This should be called after LogBuffer creation to ensure offset continuity on restart
+func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn func() (int64, error)) error {
+ if getHighestOffsetFn == nil {
+ return nil // No initialization function provided
+ }
+
+ highestOffset, err := getHighestOffsetFn()
+ if err != nil {
+ glog.V(0).Infof("Failed to get highest offset for %s: %v, starting from 0", logBuffer.name, err)
+ return nil // Continue with offset 0 if we can't read existing data
+ }
+
+ if highestOffset >= 0 {
+ // Set the next offset to be one after the highest existing offset
+ nextOffset := highestOffset + 1
+ logBuffer.offset = nextOffset
+ // CRITICAL FIX: bufferStartOffset should match offset after initialization
+ // This ensures that reads for old offsets (0...highestOffset) will trigger disk reads
+ // New data written after this will start at nextOffset
+ logBuffer.bufferStartOffset = nextOffset
+ // CRITICAL: Track that data [0...highestOffset] is on disk
+ logBuffer.lastFlushedOffset.Store(highestOffset)
+ // Set lastFlushedTime to current time (we know data up to highestOffset is on disk)
+ logBuffer.lastFlushedTime.Store(time.Now().UnixNano())
+ glog.V(0).Infof("Initialized LogBuffer %s offset to %d (highest existing: %d), buffer starts at %d, lastFlushedOffset=%d, lastFlushedTime=%v",
+ logBuffer.name, nextOffset, highestOffset, nextOffset, highestOffset, time.Now())
+ } else {
+ logBuffer.bufferStartOffset = 0 // Start from offset 0
+ // No data on disk yet
+ glog.V(0).Infof("No existing data found for %s, starting from offset 0, lastFlushedOffset=-1, lastFlushedTime=0", logBuffer.name)
+ }
+
+ return nil
+}
+
// AddToBuffer appends a data message to the buffer by delegating to
// AddDataToBuffer with the message's key, value, and timestamp.
func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) {
	logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs)
}
+// AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information
+func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) {
+ logEntryData, _ := proto.Marshal(logEntry)
+
+ var toFlush *dataToFlush
+ logBuffer.Lock()
+ defer func() {
+ logBuffer.Unlock()
+ if toFlush != nil {
+ logBuffer.flushChan <- toFlush
+ }
+ if logBuffer.notifyFn != nil {
+ logBuffer.notifyFn()
+ }
+ // Notify all registered subscribers instantly (<1ms latency)
+ logBuffer.notifySubscribers()
+ }()
+
+ processingTsNs := logEntry.TsNs
+ ts := time.Unix(0, processingTsNs)
+
+ // Handle timestamp collision inside lock (rare case)
+ if logBuffer.LastTsNs.Load() >= processingTsNs {
+ processingTsNs = logBuffer.LastTsNs.Add(1)
+ ts = time.Unix(0, processingTsNs)
+ // Re-marshal with corrected timestamp
+ logEntry.TsNs = processingTsNs
+ logEntryData, _ = proto.Marshal(logEntry)
+ } else {
+ logBuffer.LastTsNs.Store(processingTsNs)
+ }
+
+ size := len(logEntryData)
+
+ if logBuffer.pos == 0 {
+ logBuffer.startTime = ts
+ // Reset offset tracking for new buffer
+ logBuffer.hasOffsets = false
+ }
+
+ // Track offset ranges for Kafka integration
+ // CRITICAL FIX: Use >= 0 to include offset 0 (first message in a topic)
+ if logEntry.Offset >= 0 {
+ if !logBuffer.hasOffsets {
+ logBuffer.minOffset = logEntry.Offset
+ logBuffer.maxOffset = logEntry.Offset
+ logBuffer.hasOffsets = true
+ } else {
+ if logEntry.Offset < logBuffer.minOffset {
+ logBuffer.minOffset = logEntry.Offset
+ }
+ if logEntry.Offset > logBuffer.maxOffset {
+ logBuffer.maxOffset = logEntry.Offset
+ }
+ }
+ }
+
+ if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
+ toFlush = logBuffer.copyToFlush()
+ logBuffer.startTime = ts
+ if len(logBuffer.buf) < size+4 {
+ // Validate size to prevent integer overflow in computation BEFORE allocation
+ const maxBufferSize = 1 << 30 // 1 GiB practical limit
+ // Ensure 2*size + 4 won't overflow int and stays within practical bounds
+ if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 {
+ glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size)
+ return
+ }
+ // Safe to compute now that we've validated size is in valid range
+ newSize := 2*size + 4
+ logBuffer.buf = make([]byte, newSize)
+ }
+ }
+ logBuffer.stopTime = ts
+
+ logBuffer.idx = append(logBuffer.idx, logBuffer.pos)
+ util.Uint32toBytes(logBuffer.sizeBuf, uint32(size))
+ copy(logBuffer.buf[logBuffer.pos:logBuffer.pos+4], logBuffer.sizeBuf)
+ copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
+ logBuffer.pos += size + 4
+
+ logBuffer.offset++
+}
+
func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) {
// PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock
@@ -105,6 +351,8 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
if logBuffer.notifyFn != nil {
logBuffer.notifyFn()
}
+ // Notify all registered subscribers instantly (<1ms latency)
+ logBuffer.notifySubscribers()
}()
// Handle timestamp collision inside lock (rare case)
@@ -125,11 +373,20 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
}
if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
- // glog.V(0).Infof("%s copyToFlush1 batch:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.batchIndex, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos)
+ // glog.V(0).Infof("%s copyToFlush1 offset:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.offset, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos)
toFlush = logBuffer.copyToFlush()
logBuffer.startTime = ts
if len(logBuffer.buf) < size+4 {
- logBuffer.buf = make([]byte, 2*size+4)
+ // Validate size to prevent integer overflow in computation BEFORE allocation
+ const maxBufferSize = 1 << 30 // 1 GiB practical limit
+ // Ensure 2*size + 4 won't overflow int and stays within practical bounds
+ if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 {
+ glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size)
+ return
+ }
+ // Safe to compute now that we've validated size is in valid range
+ newSize := 2*size + 4
+ logBuffer.buf = make([]byte, newSize)
}
}
logBuffer.stopTime = ts
@@ -140,14 +397,44 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
logBuffer.pos += size + 4
- // fmt.Printf("partitionKey %v entry size %d total %d count %d\n", string(partitionKey), size, m.pos, len(m.idx))
-
}
// IsStopping reports whether the log buffer has begun shutting down.
func (logBuffer *LogBuffer) IsStopping() bool {
	return logBuffer.isStopping.Load()
}
+// ForceFlush immediately flushes the current buffer content and WAITS for completion
+// This is useful for critical topics that need immediate persistence
+// CRITICAL: This function is now SYNCHRONOUS - it blocks until the flush completes
+func (logBuffer *LogBuffer) ForceFlush() {
+ if logBuffer.isStopping.Load() {
+ return // Don't flush if we're shutting down
+ }
+
+ logBuffer.Lock()
+ toFlush := logBuffer.copyToFlushWithCallback()
+ logBuffer.Unlock()
+
+ if toFlush != nil {
+ // Send to flush channel (with reasonable timeout)
+ select {
+ case logBuffer.flushChan <- toFlush:
+ // Successfully queued for flush - now WAIT for it to complete
+ select {
+ case <-toFlush.done:
+ // Flush completed successfully
+ glog.V(1).Infof("ForceFlush completed for %s", logBuffer.name)
+ case <-time.After(5 * time.Second):
+ // Timeout waiting for flush - this shouldn't happen
+ glog.Warningf("ForceFlush timed out waiting for completion on %s", logBuffer.name)
+ }
+ case <-time.After(2 * time.Second):
+ // If flush channel is still blocked after 2s, something is wrong
+ glog.Warningf("ForceFlush channel timeout for %s - flush channel busy for 2s", logBuffer.name)
+ }
+ }
+}
+
// ShutdownLogBuffer flushes the buffer and stops the log buffer
func (logBuffer *LogBuffer) ShutdownLogBuffer() {
isAlreadyStopped := logBuffer.isStopping.Swap(true)
@@ -168,10 +455,24 @@ func (logBuffer *LogBuffer) loopFlush() {
for d := range logBuffer.flushChan {
if d != nil {
// glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes()))
- logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes())
+ logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset)
d.releaseMemory()
// local logbuffer is different from aggregate logbuffer here
logBuffer.lastFlushDataTime = d.stopTime
+
+ // CRITICAL: Track what's been flushed to disk for both offset-based and time-based reads
+ // CRITICAL FIX: Use >= 0 to include offset 0 (first message in a topic)
+ if d.maxOffset >= 0 {
+ logBuffer.lastFlushedOffset.Store(d.maxOffset)
+ }
+ if !d.stopTime.IsZero() {
+ logBuffer.lastFlushedTime.Store(d.stopTime.UnixNano())
+ }
+
+ // Signal completion if there's a callback channel
+ if d.done != nil {
+ close(d.done)
+ }
}
}
logBuffer.isAllFlushed = true
@@ -183,6 +484,7 @@ func (logBuffer *LogBuffer) loopInterval() {
if logBuffer.IsStopping() {
return
}
+
logBuffer.Lock()
toFlush := logBuffer.copyToFlush()
logBuffer.Unlock()
@@ -196,27 +498,48 @@ func (logBuffer *LogBuffer) loopInterval() {
}
// copyToFlush seals the current buffer for flushing without a completion
// callback (used by the periodic flush loop and in-line flushes).
func (logBuffer *LogBuffer) copyToFlush() *dataToFlush {
	return logBuffer.copyToFlushInternal(false)
}
+
// copyToFlushWithCallback seals the current buffer for flushing and attaches
// a done channel so ForceFlush can wait for the flush to complete.
func (logBuffer *LogBuffer) copyToFlushWithCallback() *dataToFlush {
	return logBuffer.copyToFlushInternal(true)
}
+
+func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush {
if logBuffer.pos > 0 {
- // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
var d *dataToFlush
if logBuffer.flushFn != nil {
d = &dataToFlush{
startTime: logBuffer.startTime,
stopTime: logBuffer.stopTime,
data: copiedBytes(logBuffer.buf[:logBuffer.pos]),
+ minOffset: logBuffer.minOffset,
+ maxOffset: logBuffer.maxOffset,
+ }
+ // Add callback channel for synchronous ForceFlush
+ if withCallback {
+ d.done = make(chan struct{})
}
// glog.V(4).Infof("%s flushing [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime)
} else {
// glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime)
logBuffer.lastFlushDataTime = logBuffer.stopTime
}
- logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.batchIndex)
+ // CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1
+ lastOffsetInBuffer := logBuffer.offset - 1
+ logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.bufferStartOffset, lastOffsetInBuffer)
logBuffer.startTime = time.Unix(0, 0)
logBuffer.stopTime = time.Unix(0, 0)
logBuffer.pos = 0
logBuffer.idx = logBuffer.idx[:0]
- logBuffer.batchIndex++
+ // DON'T increment offset - it's already pointing to the next offset!
+ // logBuffer.offset++ // REMOVED - this was causing offset gaps!
+ logBuffer.bufferStartOffset = logBuffer.offset // Next buffer starts at current offset (which is already the next one)
+ // Reset offset tracking
+ logBuffer.hasOffsets = false
+ logBuffer.minOffset = 0
+ logBuffer.maxOffset = 0
return d
}
return nil
@@ -227,8 +550,8 @@ func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
}
// GetEarliestPosition returns the start position of the current in-memory
// buffer: its earliest timestamp paired with the buffer's offset counter.
func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition {
	return MessagePosition{
		Time:   logBuffer.startTime,
		Offset: logBuffer.offset,
	}
}
@@ -241,6 +564,93 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
logBuffer.RLock()
defer logBuffer.RUnlock()
+ isOffsetBased := lastReadPosition.IsOffsetBased
+
+ // CRITICAL FIX: For offset-based subscriptions, use offset comparisons, not time comparisons!
+ if isOffsetBased {
+ requestedOffset := lastReadPosition.Offset
+
+ // DEBUG: Log buffer state for _schemas topic
+ if strings.Contains(logBuffer.name, "_schemas") {
+ glog.Infof("[SCHEMAS ReadFromBuffer] requested=%d bufferStart=%d bufferEnd=%d pos=%d lastFlushed=%d",
+ requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos, logBuffer.lastFlushedOffset.Load())
+ }
+
+ // Check if the requested offset is in the current buffer range
+ if requestedOffset >= logBuffer.bufferStartOffset && requestedOffset <= logBuffer.offset {
+ // If current buffer is empty (pos=0), check if data is on disk or not yet written
+ if logBuffer.pos == 0 {
+ // CRITICAL FIX: If buffer is empty but offset range covers the request,
+ // it means data was in memory and has been flushed/moved out.
+ // The bufferStartOffset advancing to cover this offset proves data existed.
+ //
+ // Three cases:
+ // 1. requestedOffset < logBuffer.offset: Data was here, now flushed
+ // 2. requestedOffset == logBuffer.offset && bufferStartOffset > 0: Buffer advanced, data flushed
+ // 3. requestedOffset == logBuffer.offset && bufferStartOffset == 0: Initial state - try disk first!
+ //
+ // Cases 1 & 2: try disk read
+ // Case 3: try disk read (historical data might exist)
+ if requestedOffset < logBuffer.offset {
+ // Data was in the buffer range but buffer is now empty = flushed to disk
+ if strings.Contains(logBuffer.name, "_schemas") {
+ glog.Infof("[SCHEMAS ReadFromBuffer] Returning ResumeFromDiskError: empty buffer, offset %d was flushed (bufferStart=%d, offset=%d)",
+ requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset)
+ }
+ return nil, -2, ResumeFromDiskError
+ }
+ // requestedOffset == logBuffer.offset: Current position
+ // CRITICAL: For subscribers starting from offset 0, try disk read first
+ // (historical data might exist from previous runs)
+ if requestedOffset == 0 && logBuffer.bufferStartOffset == 0 && logBuffer.offset == 0 {
+ // Initial state: try disk read before waiting for new data
+ if strings.Contains(logBuffer.name, "_schemas") {
+ glog.Infof("[SCHEMAS ReadFromBuffer] Initial state, trying disk read for offset 0")
+ }
+ return nil, -2, ResumeFromDiskError
+ }
+ // Otherwise, wait for new data to arrive
+ if strings.Contains(logBuffer.name, "_schemas") {
+ glog.Infof("[SCHEMAS ReadFromBuffer] Returning nil: waiting for offset %d to arrive", requestedOffset)
+ }
+ return nil, logBuffer.offset, nil
+ }
+ if strings.Contains(logBuffer.name, "_schemas") {
+ glog.Infof("[SCHEMAS ReadFromBuffer] Returning %d bytes from buffer", logBuffer.pos)
+ }
+ return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
+ }
+
+ // Check previous buffers for the requested offset
+ for _, buf := range logBuffer.prevBuffers.buffers {
+ if requestedOffset >= buf.startOffset && requestedOffset <= buf.offset {
+ // If prevBuffer is empty, it means the data was flushed to disk
+ // (prevBuffers are created when buffer is flushed)
+ if buf.size == 0 {
+ // Empty prevBuffer covering this offset means data was flushed
+ return nil, -2, ResumeFromDiskError
+ }
+ return copiedBytes(buf.buf[:buf.size]), buf.offset, nil
+ }
+ }
+
+ // Offset not found in any buffer
+ if requestedOffset < logBuffer.bufferStartOffset {
+ // Data not in current buffers - must be on disk (flushed or never existed)
+ // Return ResumeFromDiskError to trigger disk read
+ return nil, -2, ResumeFromDiskError
+ }
+
+ if requestedOffset > logBuffer.offset {
+ // Future data, not available yet
+ return nil, logBuffer.offset, nil
+ }
+
+ // Offset not found - return nil
+ return nil, logBuffer.offset, nil
+ }
+
+ // TIMESTAMP-BASED READ (original logic)
// Read from disk and memory
// 1. read from disk, last time is = td
// 2. in memory, the earliest time = tm
@@ -254,52 +664,55 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
var tsBatchIndex int64
if !logBuffer.startTime.IsZero() {
tsMemory = logBuffer.startTime
- tsBatchIndex = logBuffer.batchIndex
+ tsBatchIndex = logBuffer.offset
}
for _, prevBuf := range logBuffer.prevBuffers.buffers {
if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) {
tsMemory = prevBuf.startTime
- tsBatchIndex = prevBuf.batchIndex
+ tsBatchIndex = prevBuf.offset
}
}
if tsMemory.IsZero() { // case 2.2
- // println("2.2 no data")
return nil, -2, nil
- } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex+1 < tsBatchIndex { // case 2.3
- if !logBuffer.lastFlushDataTime.IsZero() {
- glog.V(0).Infof("resume with last flush time: %v", logBuffer.lastFlushDataTime)
+ } else if lastReadPosition.Time.Before(tsMemory) && lastReadPosition.Offset+1 < tsBatchIndex { // case 2.3
+ // Special case: If requested time is zero (Unix epoch), treat as "start from beginning"
+ // This handles queries that want to read all data without knowing the exact start time
+ if lastReadPosition.Time.IsZero() || lastReadPosition.Time.Unix() == 0 {
+ // Start from the beginning of memory
+ // Fall through to case 2.1 to read from earliest buffer
+ } else {
+ // Data not in memory buffers - read from disk
+ glog.V(0).Infof("resume from disk: requested time %v < earliest memory time %v",
+ lastReadPosition.Time, tsMemory)
return nil, -2, ResumeFromDiskError
}
}
// the following is case 2.1
- if lastReadPosition.Equal(logBuffer.stopTime) {
- return nil, logBuffer.batchIndex, nil
+ if lastReadPosition.Time.Equal(logBuffer.stopTime) {
+ return nil, logBuffer.offset, nil
}
- if lastReadPosition.After(logBuffer.stopTime) {
+ if lastReadPosition.Time.After(logBuffer.stopTime) {
// glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime)
- return nil, logBuffer.batchIndex, nil
+ return nil, logBuffer.offset, nil
}
- if lastReadPosition.Before(logBuffer.startTime) {
- // println("checking ", lastReadPosition.UnixNano())
+ if lastReadPosition.Time.Before(logBuffer.startTime) {
for _, buf := range logBuffer.prevBuffers.buffers {
if buf.startTime.After(lastReadPosition.Time) {
// glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime)
- // println("return the", i, "th in memory", buf.startTime.UnixNano())
- return copiedBytes(buf.buf[:buf.size]), buf.batchIndex, nil
+ return copiedBytes(buf.buf[:buf.size]), buf.offset, nil
}
if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) {
pos := buf.locateByTs(lastReadPosition.Time)
- // fmt.Printf("locate buffer[%d] pos %d\n", i, pos)
- return copiedBytes(buf.buf[pos:buf.size]), buf.batchIndex, nil
+ return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil
}
}
// glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition)
- return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex, nil
+ return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
}
- lastTs := lastReadPosition.UnixNano()
+ lastTs := lastReadPosition.Time.UnixNano()
l, h := 0, len(logBuffer.idx)-1
/*
@@ -311,9 +724,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
if entry == nil {
entry = event.EventNotification.NewEntry
}
- fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name)
}
- fmt.Printf("l=%d, h=%d\n", l, h)
*/
for l <= h {
@@ -328,16 +739,14 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
_, prevT = readTs(logBuffer.buf, logBuffer.idx[mid-1])
}
if prevT <= lastTs {
- // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos)
- return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.batchIndex, nil
+ return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.offset, nil
}
h = mid
}
- // fmt.Printf("l=%d, h=%d\n", l, h)
}
- // FIXME: this could be that the buffer has been flushed already
- println("Not sure why no data", lastReadPosition.BatchIndex, tsBatchIndex)
+ // Binary search didn't find the timestamp - data may have been flushed to disk already
+ // Returning -2 signals to caller that data is not available in memory
return nil, -2, nil
}
@@ -352,11 +761,11 @@ func (logBuffer *LogBuffer) GetName() string {
return logBuffer.name
}
// GetOffset returns the current offset for metadata tracking (the next offset
// to be assigned). Acquires the read lock for a consistent snapshot.
func (logBuffer *LogBuffer) GetOffset() int64 {
	logBuffer.RLock()
	defer logBuffer.RUnlock()
	return logBuffer.offset
}
var bufferPool = sync.Pool{