diff options
Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 501 |
1 files changed, 455 insertions, 46 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 15ea062c6..aff8ec80b 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -2,6 +2,8 @@ package log_buffer import ( "bytes" + "math" + "strings" "sync" "sync/atomic" "time" @@ -21,11 +23,14 @@ type dataToFlush struct { startTime time.Time stopTime time.Time data *bytes.Buffer + minOffset int64 + maxOffset int64 + done chan struct{} // Signal when flush completes } type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error) -type EachLogEntryWithBatchIndexFuncType func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error) -type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte) +type EachLogEntryWithOffsetFuncType func(logEntry *filer_pb.LogEntry, offset int64) (isDone bool, err error) +type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) type LogBuffer struct { @@ -33,7 +38,8 @@ type LogBuffer struct { name string prevBuffers *SealedBuffers buf []byte - batchIndex int64 + offset int64 // Last offset in current buffer (endOffset) + bufferStartOffset int64 // First offset in current buffer idx []int pos int startTime time.Time @@ -44,10 +50,19 @@ type LogBuffer struct { flushFn LogFlushFuncType ReadFromDiskFn LogReadFromDiskFuncType notifyFn func() - isStopping *atomic.Bool - isAllFlushed bool - flushChan chan *dataToFlush - LastTsNs atomic.Int64 + // Per-subscriber notification channels for instant wake-up + subscribersMu sync.RWMutex + subscribers map[string]chan struct{} // subscriberID -> notification channel + isStopping *atomic.Bool + isAllFlushed bool + flushChan chan *dataToFlush + LastTsNs atomic.Int64 + // Offset range tracking for Kafka integration + minOffset int64 + maxOffset int64 + hasOffsets bool + lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet) + lastFlushedTime atomic.Int64 // Latest timestamp that has been flushed to disk (0 = nothing flushed yet) sync.RWMutex } @@ -62,19 +77,250 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc flushFn: flushFn, ReadFromDiskFn: readFromDiskFn, notifyFn: notifyFn, + subscribers: make(map[string]chan struct{}), flushChan: make(chan *dataToFlush, 256), isStopping: new(atomic.Bool), - batchIndex: time.Now().UnixNano(), // Initialize with creation time for uniqueness across restarts + offset: 0, // Will be initialized from existing data if available } + lb.lastFlushedOffset.Store(-1) // Nothing flushed to disk yet go lb.loopFlush() go lb.loopInterval() return lb } +// RegisterSubscriber registers a subscriber for instant notifications when data is written +// Returns a channel that will receive notifications (<1ms latency) +func (logBuffer *LogBuffer) RegisterSubscriber(subscriberID string) chan struct{} { + logBuffer.subscribersMu.Lock() + defer logBuffer.subscribersMu.Unlock() + + // Check if already registered + if existingChan, exists := logBuffer.subscribers[subscriberID]; exists { + glog.V(2).Infof("Subscriber %s already registered for %s, reusing channel", subscriberID, logBuffer.name) + return existingChan + } + + // Create buffered channel (size 1) so notifications never block + notifyChan := make(chan struct{}, 1) + logBuffer.subscribers[subscriberID] = notifyChan + glog.V(1).Infof("Registered subscriber %s for %s (total: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers)) + return notifyChan +} + +// UnregisterSubscriber removes a subscriber and closes its notification channel +func (logBuffer *LogBuffer) UnregisterSubscriber(subscriberID string) { + logBuffer.subscribersMu.Lock() + defer logBuffer.subscribersMu.Unlock() + + if ch, exists := logBuffer.subscribers[subscriberID]; exists { + close(ch) + delete(logBuffer.subscribers, subscriberID) + glog.V(1).Infof("Unregistered subscriber %s from %s (remaining: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers)) + } +} + +// IsOffsetInMemory checks if the given offset is available in the in-memory buffer +// Returns true if: +// 1. Offset is newer than what's been flushed to disk (must be in memory) +// 2. Offset is in current buffer or previous buffers (may be flushed but still in memory) +// Returns false if offset is older than memory buffers (only on disk) +func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool { + logBuffer.RLock() + defer logBuffer.RUnlock() + + // Check if we're tracking offsets at all + if !logBuffer.hasOffsets { + return false // No offsets tracked yet + } + + // OPTIMIZATION: If offset is newer than what's been flushed to disk, + // it MUST be in memory (not written to disk yet) + lastFlushed := logBuffer.lastFlushedOffset.Load() + if lastFlushed >= 0 && offset > lastFlushed { + glog.V(3).Infof("Offset %d is in memory (newer than lastFlushed=%d)", offset, lastFlushed) + return true + } + + // Check if offset is in current buffer range AND buffer has data + // (data can be both on disk AND in memory during flush window) + if offset >= logBuffer.bufferStartOffset && offset <= logBuffer.offset { + // CRITICAL: Check if buffer actually has data (pos > 0) + // After flush, pos=0 but range is still valid - data is on disk, not in memory + if logBuffer.pos > 0 { + glog.V(3).Infof("Offset %d is in current buffer [%d-%d] with data", offset, logBuffer.bufferStartOffset, logBuffer.offset) + return true + } + // Buffer is empty (just flushed) - data is on disk + glog.V(3).Infof("Offset %d in range [%d-%d] but buffer empty (pos=0), data on disk", offset, logBuffer.bufferStartOffset, logBuffer.offset) + return false + } + + // Check if offset is in previous buffers AND they have data + for _, buf := range logBuffer.prevBuffers.buffers { + if offset >= buf.startOffset && offset <= buf.offset { + // Check if prevBuffer actually has data + if buf.size > 0 { + glog.V(3).Infof("Offset %d is in previous buffer [%d-%d] with data", offset, buf.startOffset, buf.offset) + return true + } + // Buffer is empty (flushed) - data is on disk + glog.V(3).Infof("Offset %d in prevBuffer [%d-%d] but empty (size=0), data on disk", offset, buf.startOffset, buf.offset) + return false + } + } + + // Offset is older than memory buffers - only available on disk + glog.V(3).Infof("Offset %d is NOT in memory (bufferStart=%d, lastFlushed=%d)", offset, logBuffer.bufferStartOffset, lastFlushed) + return false +} + +// notifySubscribers sends notifications to all registered subscribers +// Non-blocking: uses select with default to avoid blocking on full channels +func (logBuffer *LogBuffer) notifySubscribers() { + logBuffer.subscribersMu.RLock() + defer logBuffer.subscribersMu.RUnlock() + + if len(logBuffer.subscribers) == 0 { + return // No subscribers, skip notification + } + + for subscriberID, notifyChan := range logBuffer.subscribers { + select { + case notifyChan <- struct{}{}: + // Notification sent successfully + glog.V(3).Infof("Notified subscriber %s for %s", subscriberID, logBuffer.name) + default: + // Channel full - subscriber hasn't consumed previous notification yet + // This is OK because one notification is sufficient to wake the subscriber + glog.V(3).Infof("Subscriber %s notification channel full (OK - already notified)", subscriberID) + } + } +} + +// InitializeOffsetFromExistingData initializes the offset counter from existing data on disk +// This should be called after LogBuffer creation to ensure offset continuity on restart +func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn func() (int64, error)) error { + if getHighestOffsetFn == nil { + return nil // No initialization function provided + } + + highestOffset, err := getHighestOffsetFn() + if err != nil { + glog.V(0).Infof("Failed to get highest offset for %s: %v, starting from 0", logBuffer.name, err) + return nil // Continue with offset 0 if we can't read existing data + } + + if highestOffset >= 0 { + // Set the next offset to be one after the highest existing offset + nextOffset := highestOffset + 1 + logBuffer.offset = nextOffset + // CRITICAL FIX: bufferStartOffset should match offset after initialization + // This ensures that reads for old offsets (0...highestOffset) will trigger disk reads + // New data written after this will start at nextOffset + logBuffer.bufferStartOffset = nextOffset + // CRITICAL: Track that data [0...highestOffset] is on disk + logBuffer.lastFlushedOffset.Store(highestOffset) + // Set lastFlushedTime to current time (we know data up to highestOffset is on disk) + logBuffer.lastFlushedTime.Store(time.Now().UnixNano()) + glog.V(0).Infof("Initialized LogBuffer %s offset to %d (highest existing: %d), buffer starts at %d, lastFlushedOffset=%d, lastFlushedTime=%v", + logBuffer.name, nextOffset, highestOffset, nextOffset, highestOffset, time.Now()) + } else { + logBuffer.bufferStartOffset = 0 // Start from offset 0 + // No data on disk yet + glog.V(0).Infof("No existing data found for %s, starting from offset 0, lastFlushedOffset=-1, lastFlushedTime=0", logBuffer.name) + } + + return nil +} + func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) { logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs) } +// AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information +func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { + logEntryData, _ := proto.Marshal(logEntry) + + var toFlush *dataToFlush + logBuffer.Lock() + defer func() { + logBuffer.Unlock() + if toFlush != nil { + logBuffer.flushChan <- toFlush + } + if logBuffer.notifyFn != nil { + logBuffer.notifyFn() + } + // Notify all registered subscribers instantly (<1ms latency) + logBuffer.notifySubscribers() + }() + + processingTsNs := logEntry.TsNs + ts := time.Unix(0, processingTsNs) + + // Handle timestamp collision inside lock (rare case) + if logBuffer.LastTsNs.Load() >= processingTsNs { + processingTsNs = logBuffer.LastTsNs.Add(1) + ts = time.Unix(0, processingTsNs) + // Re-marshal with corrected timestamp + logEntry.TsNs = processingTsNs + logEntryData, _ = proto.Marshal(logEntry) + } else { + logBuffer.LastTsNs.Store(processingTsNs) + } + + size := len(logEntryData) + + if logBuffer.pos == 0 { + logBuffer.startTime = ts + // Reset offset tracking for new buffer + logBuffer.hasOffsets = false + } + + // Track offset ranges for Kafka integration + // CRITICAL FIX: Use >= 0 to include offset 0 (first message in a topic) + if logEntry.Offset >= 0 { + if !logBuffer.hasOffsets { + logBuffer.minOffset = logEntry.Offset + logBuffer.maxOffset = logEntry.Offset + logBuffer.hasOffsets = true + } else { + if logEntry.Offset < logBuffer.minOffset { + logBuffer.minOffset = logEntry.Offset + } + if logEntry.Offset > logBuffer.maxOffset { + logBuffer.maxOffset = logEntry.Offset + } + } + } + + if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 { + toFlush = logBuffer.copyToFlush() + logBuffer.startTime = ts + if len(logBuffer.buf) < size+4 { + // Validate size to prevent integer overflow in computation BEFORE allocation + const maxBufferSize = 1 << 30 // 1 GiB practical limit + // Ensure 2*size + 4 won't overflow int and stays within practical bounds + if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 { + glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size) + return + } + // Safe to compute now that we've validated size is in valid range + newSize := 2*size + 4 + logBuffer.buf = make([]byte, newSize) + } + } + logBuffer.stopTime = ts + + logBuffer.idx = append(logBuffer.idx, logBuffer.pos) + util.Uint32toBytes(logBuffer.sizeBuf, uint32(size)) + copy(logBuffer.buf[logBuffer.pos:logBuffer.pos+4], logBuffer.sizeBuf) + copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData) + logBuffer.pos += size + 4 + + logBuffer.offset++ +} + func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) { // PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock @@ -105,6 +351,8 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin if logBuffer.notifyFn != nil { logBuffer.notifyFn() } + // Notify all registered subscribers instantly (<1ms latency) + logBuffer.notifySubscribers() }() // Handle timestamp collision inside lock (rare case) @@ -125,11 +373,20 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin } if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 { - // glog.V(0).Infof("%s copyToFlush1 batch:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.batchIndex, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos) + // glog.V(0).Infof("%s copyToFlush1 offset:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.offset, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos) toFlush = logBuffer.copyToFlush() logBuffer.startTime = ts if len(logBuffer.buf) < size+4 { - logBuffer.buf = make([]byte, 2*size+4) + // Validate size to prevent integer overflow in computation BEFORE allocation + const maxBufferSize = 1 << 30 // 1 GiB practical limit + // Ensure 2*size + 4 won't overflow int and stays within practical bounds + if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 { + glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size) + return + } + // Safe to compute now that we've validated size is in valid range + newSize := 2*size + 4 + logBuffer.buf = make([]byte, newSize) } } logBuffer.stopTime = ts @@ -140,14 +397,44 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData) logBuffer.pos += size + 4 - // fmt.Printf("partitionKey %v entry size %d total %d count %d\n", string(partitionKey), size, m.pos, len(m.idx)) - } func (logBuffer *LogBuffer) IsStopping() bool { return logBuffer.isStopping.Load() } +// ForceFlush immediately flushes the current buffer content and WAITS for completion +// This is useful for critical topics that need immediate persistence +// CRITICAL: This function is now SYNCHRONOUS - it blocks until the flush completes +func (logBuffer *LogBuffer) ForceFlush() { + if logBuffer.isStopping.Load() { + return // Don't flush if we're shutting down + } + + logBuffer.Lock() + toFlush := logBuffer.copyToFlushWithCallback() + logBuffer.Unlock() + + if toFlush != nil { + // Send to flush channel (with reasonable timeout) + select { + case logBuffer.flushChan <- toFlush: + // Successfully queued for flush - now WAIT for it to complete + select { + case <-toFlush.done: + // Flush completed successfully + glog.V(1).Infof("ForceFlush completed for %s", logBuffer.name) + case <-time.After(5 * time.Second): + // Timeout waiting for flush - this shouldn't happen + glog.Warningf("ForceFlush timed out waiting for completion on %s", logBuffer.name) + } + case <-time.After(2 * time.Second): + // If flush channel is still blocked after 2s, something is wrong + glog.Warningf("ForceFlush channel timeout for %s - flush channel busy for 2s", logBuffer.name) + } + } +} + // ShutdownLogBuffer flushes the buffer and stops the log buffer func (logBuffer *LogBuffer) ShutdownLogBuffer() { isAlreadyStopped := logBuffer.isStopping.Swap(true) @@ -168,10 +455,24 @@ func (logBuffer *LogBuffer) loopFlush() { for d := range logBuffer.flushChan { if d != nil { // glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes())) - logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes()) + logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset) d.releaseMemory() // local logbuffer is different from aggregate logbuffer here logBuffer.lastFlushDataTime = d.stopTime + + // CRITICAL: Track what's been flushed to disk for both offset-based and time-based reads + // CRITICAL FIX: Use >= 0 to include offset 0 (first message in a topic) + if d.maxOffset >= 0 { + logBuffer.lastFlushedOffset.Store(d.maxOffset) + } + if !d.stopTime.IsZero() { + logBuffer.lastFlushedTime.Store(d.stopTime.UnixNano()) + } + + // Signal completion if there's a callback channel + if d.done != nil { + close(d.done) + } } } logBuffer.isAllFlushed = true @@ -183,6 +484,7 @@ func (logBuffer *LogBuffer) loopInterval() { if logBuffer.IsStopping() { return } + logBuffer.Lock() toFlush := logBuffer.copyToFlush() logBuffer.Unlock() @@ -196,27 +498,48 @@ func (logBuffer *LogBuffer) loopInterval() { } func (logBuffer *LogBuffer) copyToFlush() *dataToFlush { + return logBuffer.copyToFlushInternal(false) +} + +func (logBuffer *LogBuffer) copyToFlushWithCallback() *dataToFlush { + return logBuffer.copyToFlushInternal(true) +} + +func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush { if logBuffer.pos > 0 { - // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos) var d *dataToFlush if logBuffer.flushFn != nil { d = &dataToFlush{ startTime: logBuffer.startTime, stopTime: logBuffer.stopTime, data: copiedBytes(logBuffer.buf[:logBuffer.pos]), + minOffset: logBuffer.minOffset, + maxOffset: logBuffer.maxOffset, + } + // Add callback channel for synchronous ForceFlush + if withCallback { + d.done = make(chan struct{}) } // glog.V(4).Infof("%s flushing [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) } else { // glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) logBuffer.lastFlushDataTime = logBuffer.stopTime } - logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.batchIndex) + // CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1 + lastOffsetInBuffer := logBuffer.offset - 1 + logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.bufferStartOffset, lastOffsetInBuffer) logBuffer.startTime = time.Unix(0, 0) logBuffer.stopTime = time.Unix(0, 0) logBuffer.pos = 0 logBuffer.idx = logBuffer.idx[:0] - logBuffer.batchIndex++ + // DON'T increment offset - it's already pointing to the next offset! + // logBuffer.offset++ // REMOVED - this was causing offset gaps! + logBuffer.bufferStartOffset = logBuffer.offset // Next buffer starts at current offset (which is already the next one) + // Reset offset tracking + logBuffer.hasOffsets = false + logBuffer.minOffset = 0 + logBuffer.maxOffset = 0 return d } return nil @@ -227,8 +550,8 @@ func (logBuffer *LogBuffer) GetEarliestTime() time.Time { } func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition { return MessagePosition{ - Time: logBuffer.startTime, - BatchIndex: logBuffer.batchIndex, + Time: logBuffer.startTime, + Offset: logBuffer.offset, } } @@ -241,6 +564,93 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu logBuffer.RLock() defer logBuffer.RUnlock() + isOffsetBased := lastReadPosition.IsOffsetBased + + // CRITICAL FIX: For offset-based subscriptions, use offset comparisons, not time comparisons! + if isOffsetBased { + requestedOffset := lastReadPosition.Offset + + // DEBUG: Log buffer state for _schemas topic + if strings.Contains(logBuffer.name, "_schemas") { + glog.Infof("[SCHEMAS ReadFromBuffer] requested=%d bufferStart=%d bufferEnd=%d pos=%d lastFlushed=%d", + requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos, logBuffer.lastFlushedOffset.Load()) + } + + // Check if the requested offset is in the current buffer range + if requestedOffset >= logBuffer.bufferStartOffset && requestedOffset <= logBuffer.offset { + // If current buffer is empty (pos=0), check if data is on disk or not yet written + if logBuffer.pos == 0 { + // CRITICAL FIX: If buffer is empty but offset range covers the request, + // it means data was in memory and has been flushed/moved out. + // The bufferStartOffset advancing to cover this offset proves data existed. + // + // Three cases: + // 1. requestedOffset < logBuffer.offset: Data was here, now flushed + // 2. requestedOffset == logBuffer.offset && bufferStartOffset > 0: Buffer advanced, data flushed + // 3. requestedOffset == logBuffer.offset && bufferStartOffset == 0: Initial state - try disk first! + // + // Cases 1 & 2: try disk read + // Case 3: try disk read (historical data might exist) + if requestedOffset < logBuffer.offset { + // Data was in the buffer range but buffer is now empty = flushed to disk + if strings.Contains(logBuffer.name, "_schemas") { + glog.Infof("[SCHEMAS ReadFromBuffer] Returning ResumeFromDiskError: empty buffer, offset %d was flushed (bufferStart=%d, offset=%d)", + requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset) + } + return nil, -2, ResumeFromDiskError + } + // requestedOffset == logBuffer.offset: Current position + // CRITICAL: For subscribers starting from offset 0, try disk read first + // (historical data might exist from previous runs) + if requestedOffset == 0 && logBuffer.bufferStartOffset == 0 && logBuffer.offset == 0 { + // Initial state: try disk read before waiting for new data + if strings.Contains(logBuffer.name, "_schemas") { + glog.Infof("[SCHEMAS ReadFromBuffer] Initial state, trying disk read for offset 0") + } + return nil, -2, ResumeFromDiskError + } + // Otherwise, wait for new data to arrive + if strings.Contains(logBuffer.name, "_schemas") { + glog.Infof("[SCHEMAS ReadFromBuffer] Returning nil: waiting for offset %d to arrive", requestedOffset) + } + return nil, logBuffer.offset, nil + } + if strings.Contains(logBuffer.name, "_schemas") { + glog.Infof("[SCHEMAS ReadFromBuffer] Returning %d bytes from buffer", logBuffer.pos) + } + return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil + } + + // Check previous buffers for the requested offset + for _, buf := range logBuffer.prevBuffers.buffers { + if requestedOffset >= buf.startOffset && requestedOffset <= buf.offset { + // If prevBuffer is empty, it means the data was flushed to disk + // (prevBuffers are created when buffer is flushed) + if buf.size == 0 { + // Empty prevBuffer covering this offset means data was flushed + return nil, -2, ResumeFromDiskError + } + return copiedBytes(buf.buf[:buf.size]), buf.offset, nil + } + } + + // Offset not found in any buffer + if requestedOffset < logBuffer.bufferStartOffset { + // Data not in current buffers - must be on disk (flushed or never existed) + // Return ResumeFromDiskError to trigger disk read + return nil, -2, ResumeFromDiskError + } + + if requestedOffset > logBuffer.offset { + // Future data, not available yet + return nil, logBuffer.offset, nil + } + + // Offset not found - return nil + return nil, logBuffer.offset, nil + } + + // TIMESTAMP-BASED READ (original logic) // Read from disk and memory // 1. read from disk, last time is = td // 2. in memory, the earliest time = tm @@ -254,52 +664,55 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu var tsBatchIndex int64 if !logBuffer.startTime.IsZero() { tsMemory = logBuffer.startTime - tsBatchIndex = logBuffer.batchIndex + tsBatchIndex = logBuffer.offset } for _, prevBuf := range logBuffer.prevBuffers.buffers { if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) { tsMemory = prevBuf.startTime - tsBatchIndex = prevBuf.batchIndex + tsBatchIndex = prevBuf.offset } } if tsMemory.IsZero() { // case 2.2 - // println("2.2 no data") return nil, -2, nil - } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex+1 < tsBatchIndex { // case 2.3 - if !logBuffer.lastFlushDataTime.IsZero() { - glog.V(0).Infof("resume with last flush time: %v", logBuffer.lastFlushDataTime) + } else if lastReadPosition.Time.Before(tsMemory) && lastReadPosition.Offset+1 < tsBatchIndex { // case 2.3 + // Special case: If requested time is zero (Unix epoch), treat as "start from beginning" + // This handles queries that want to read all data without knowing the exact start time + if lastReadPosition.Time.IsZero() || lastReadPosition.Time.Unix() == 0 { + // Start from the beginning of memory + // Fall through to case 2.1 to read from earliest buffer + } else { + // Data not in memory buffers - read from disk + glog.V(0).Infof("resume from disk: requested time %v < earliest memory time %v", + lastReadPosition.Time, tsMemory) return nil, -2, ResumeFromDiskError } } // the following is case 2.1 - if lastReadPosition.Equal(logBuffer.stopTime) { - return nil, logBuffer.batchIndex, nil + if lastReadPosition.Time.Equal(logBuffer.stopTime) { + return nil, logBuffer.offset, nil } - if lastReadPosition.After(logBuffer.stopTime) { + if lastReadPosition.Time.After(logBuffer.stopTime) { // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime) - return nil, logBuffer.batchIndex, nil + return nil, logBuffer.offset, nil } - if lastReadPosition.Before(logBuffer.startTime) { - // println("checking ", lastReadPosition.UnixNano()) + if lastReadPosition.Time.Before(logBuffer.startTime) { for _, buf := range logBuffer.prevBuffers.buffers { if buf.startTime.After(lastReadPosition.Time) { // glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime) - // println("return the", i, "th in memory", buf.startTime.UnixNano()) - return copiedBytes(buf.buf[:buf.size]), buf.batchIndex, nil + return copiedBytes(buf.buf[:buf.size]), buf.offset, nil } if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) { pos := buf.locateByTs(lastReadPosition.Time) - // fmt.Printf("locate buffer[%d] pos %d\n", i, pos) - return copiedBytes(buf.buf[pos:buf.size]), buf.batchIndex, nil + return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil } } // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition) - return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex, nil + return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil } - lastTs := lastReadPosition.UnixNano() + lastTs := lastReadPosition.Time.UnixNano() l, h := 0, len(logBuffer.idx)-1 /* @@ -311,9 +724,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu if entry == nil { entry = event.EventNotification.NewEntry } - fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name) } - fmt.Printf("l=%d, h=%d\n", l, h) */ for l <= h { @@ -328,16 +739,14 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu _, prevT = readTs(logBuffer.buf, logBuffer.idx[mid-1]) } if prevT <= lastTs { - // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos) - return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.batchIndex, nil + return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.offset, nil } h = mid } - // fmt.Printf("l=%d, h=%d\n", l, h) } - // FIXME: this could be that the buffer has been flushed already - println("Not sure why no data", lastReadPosition.BatchIndex, tsBatchIndex) + // Binary search didn't find the timestamp - data may have been flushed to disk already + // Returning -2 signals to caller that data is not available in memory return nil, -2, nil } @@ -352,11 +761,11 @@ func (logBuffer *LogBuffer) GetName() string { return logBuffer.name } -// GetBatchIndex returns the current batch index for metadata tracking -func (logBuffer *LogBuffer) GetBatchIndex() int64 { +// GetOffset returns the current offset for metadata tracking +func (logBuffer *LogBuffer) GetOffset() int64 { logBuffer.RLock() defer logBuffer.RUnlock() - return logBuffer.batchIndex + return logBuffer.offset } var bufferPool = sync.Pool{ |
