Diffstat (limited to 'weed/util')
| -rw-r--r-- | weed/util/log_buffer/disk_buffer_cache.go | 195 |
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 501 |
| -rw-r--r-- | weed/util/log_buffer/log_buffer_queryability_test.go | 238 |
| -rw-r--r-- | weed/util/log_buffer/log_buffer_test.go | 485 |
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 233 |
| -rw-r--r-- | weed/util/log_buffer/log_read_test.go | 329 |
| -rw-r--r-- | weed/util/log_buffer/sealed_buffer.go | 19 |
7 files changed, 1911 insertions, 89 deletions
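For context on the cache introduced in disk_buffer_cache.go below, here is a minimal usage sketch based only on the exported API visible in this diff (NewDiskBufferCache, Get, Put, Stats); the cache key format and the main() wrapper are illustrative assumptions, not part of the change:

```go
package main

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

func main() {
	// The file's own comments recommend 3-5 buffers and a 30-60s TTL.
	cache := log_buffer.NewDiskBufferCache(4, 45*time.Second)

	// Key format is an assumption for illustration; callers define their own keys.
	key := "topic-0/offset-1000"

	// Cache a historical buffer read from the Filer, keyed by its starting offset.
	cache.Put(key, []byte("serialized log entries"), 1000)

	// A negative entry (nil data) records that the requested range is not on disk.
	cache.Put("topic-0/offset-9999", nil, 9999)

	if data, offset, found := cache.Get(key); found {
		if data == nil {
			fmt.Println("negative cache hit: data not on disk")
		} else {
			fmt.Printf("cache hit: %d bytes starting at offset %d\n", len(data), offset)
		}
	}

	hits, misses, evictions, size := cache.Stats()
	fmt.Printf("hits=%d misses=%d evictions=%d size=%d\n", hits, misses, evictions, size)
}
```

Negative entries let repeated catch-up reads for a missing range skip the Filer entirely until the TTL expires, which is the "reduce Filer load" goal stated in the file header.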
diff --git a/weed/util/log_buffer/disk_buffer_cache.go b/weed/util/log_buffer/disk_buffer_cache.go new file mode 100644 index 000000000..ceafa9329 --- /dev/null +++ b/weed/util/log_buffer/disk_buffer_cache.go @@ -0,0 +1,195 @@ +package log_buffer + +import ( + "container/list" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" +) + +// DiskBufferCache is a small LRU cache for recently-read historical data buffers +// This reduces Filer load when multiple consumers are catching up on historical messages +type DiskBufferCache struct { + maxSize int + ttl time.Duration + cache map[string]*cacheEntry + lruList *list.List + mu sync.RWMutex + hits int64 + misses int64 + evictions int64 +} + +type cacheEntry struct { + key string + data []byte + offset int64 + timestamp time.Time + lruElement *list.Element + isNegative bool // true if this is a negative cache entry (data not found) +} + +// NewDiskBufferCache creates a new cache with the specified size and TTL +// Recommended size: 3-5 buffers (each ~8MB) +// Recommended TTL: 30-60 seconds +func NewDiskBufferCache(maxSize int, ttl time.Duration) *DiskBufferCache { + cache := &DiskBufferCache{ + maxSize: maxSize, + ttl: ttl, + cache: make(map[string]*cacheEntry), + lruList: list.New(), + } + + // Start background cleanup goroutine + go cache.cleanupLoop() + + return cache +} + +// Get retrieves a buffer from the cache +// Returns (data, offset, found) +// If found=true and data=nil, this is a negative cache entry (data doesn't exist) +func (c *DiskBufferCache) Get(key string) ([]byte, int64, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + entry, exists := c.cache[key] + if !exists { + c.misses++ + return nil, 0, false + } + + // Check if entry has expired + if time.Since(entry.timestamp) > c.ttl { + c.evict(entry) + c.misses++ + return nil, 0, false + } + + // Move to front of LRU list (most recently used) + c.lruList.MoveToFront(entry.lruElement) + c.hits++ + + if entry.isNegative { + glog.V(4).Infof("📦 CACHE HIT (NEGATIVE): key=%s - data not found (hits=%d misses=%d)", + key, c.hits, c.misses) + } else { + glog.V(4).Infof("📦 CACHE HIT: key=%s offset=%d size=%d (hits=%d misses=%d)", + key, entry.offset, len(entry.data), c.hits, c.misses) + } + + return entry.data, entry.offset, true +} + +// Put adds a buffer to the cache +// If data is nil, this creates a negative cache entry (data doesn't exist) +func (c *DiskBufferCache) Put(key string, data []byte, offset int64) { + c.mu.Lock() + defer c.mu.Unlock() + + isNegative := data == nil + + // Check if entry already exists + if entry, exists := c.cache[key]; exists { + // Update existing entry + entry.data = data + entry.offset = offset + entry.timestamp = time.Now() + entry.isNegative = isNegative + c.lruList.MoveToFront(entry.lruElement) + if isNegative { + glog.V(4).Infof("📦 CACHE UPDATE (NEGATIVE): key=%s - data not found", key) + } else { + glog.V(4).Infof("📦 CACHE UPDATE: key=%s offset=%d size=%d", key, offset, len(data)) + } + return + } + + // Evict oldest entry if cache is full + if c.lruList.Len() >= c.maxSize { + oldest := c.lruList.Back() + if oldest != nil { + c.evict(oldest.Value.(*cacheEntry)) + } + } + + // Add new entry + entry := &cacheEntry{ + key: key, + data: data, + offset: offset, + timestamp: time.Now(), + isNegative: isNegative, + } + entry.lruElement = c.lruList.PushFront(entry) + c.cache[key] = entry + + if isNegative { + glog.V(4).Infof("📦 CACHE PUT (NEGATIVE): key=%s - data not found (cache_size=%d/%d)", + key, c.lruList.Len(), c.maxSize) + } else { + 
glog.V(4).Infof("📦 CACHE PUT: key=%s offset=%d size=%d (cache_size=%d/%d)", + key, offset, len(data), c.lruList.Len(), c.maxSize) + } +} + +// evict removes an entry from the cache (must be called with lock held) +func (c *DiskBufferCache) evict(entry *cacheEntry) { + delete(c.cache, entry.key) + c.lruList.Remove(entry.lruElement) + c.evictions++ + glog.V(4).Infof("📦 CACHE EVICT: key=%s (evictions=%d)", entry.key, c.evictions) +} + +// cleanupLoop periodically removes expired entries +func (c *DiskBufferCache) cleanupLoop() { + ticker := time.NewTicker(c.ttl / 2) + defer ticker.Stop() + + for range ticker.C { + c.cleanup() + } +} + +// cleanup removes expired entries +func (c *DiskBufferCache) cleanup() { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + var toEvict []*cacheEntry + + // Find expired entries + for _, entry := range c.cache { + if now.Sub(entry.timestamp) > c.ttl { + toEvict = append(toEvict, entry) + } + } + + // Evict expired entries + for _, entry := range toEvict { + c.evict(entry) + } + + if len(toEvict) > 0 { + glog.V(3).Infof("📦 CACHE CLEANUP: evicted %d expired entries", len(toEvict)) + } +} + +// Stats returns cache statistics +func (c *DiskBufferCache) Stats() (hits, misses, evictions int64, size int) { + c.mu.RLock() + defer c.mu.RUnlock() + return c.hits, c.misses, c.evictions, c.lruList.Len() +} + +// Clear removes all entries from the cache +func (c *DiskBufferCache) Clear() { + c.mu.Lock() + defer c.mu.Unlock() + + c.cache = make(map[string]*cacheEntry) + c.lruList = list.New() + glog.V(2).Infof("📦 CACHE CLEARED") +} diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 15ea062c6..aff8ec80b 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -2,6 +2,8 @@ package log_buffer import ( "bytes" + "math" + "strings" "sync" "sync/atomic" "time" @@ -21,11 +23,14 @@ type dataToFlush struct { startTime time.Time stopTime time.Time data *bytes.Buffer + minOffset int64 + maxOffset int64 + done chan struct{} // Signal when flush completes } type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error) -type EachLogEntryWithBatchIndexFuncType func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error) -type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte) +type EachLogEntryWithOffsetFuncType func(logEntry *filer_pb.LogEntry, offset int64) (isDone bool, err error) +type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) type LogBuffer struct { @@ -33,7 +38,8 @@ type LogBuffer struct { name string prevBuffers *SealedBuffers buf []byte - batchIndex int64 + offset int64 // Last offset in current buffer (endOffset) + bufferStartOffset int64 // First offset in current buffer idx []int pos int startTime time.Time @@ -44,10 +50,19 @@ type LogBuffer struct { flushFn LogFlushFuncType ReadFromDiskFn LogReadFromDiskFuncType notifyFn func() - isStopping *atomic.Bool - isAllFlushed bool - flushChan chan *dataToFlush - LastTsNs atomic.Int64 + // Per-subscriber notification channels for instant wake-up + subscribersMu sync.RWMutex + subscribers map[string]chan struct{} // subscriberID -> notification channel + isStopping *atomic.Bool + isAllFlushed bool + flushChan chan *dataToFlush + 
LastTsNs atomic.Int64 + // Offset range tracking for Kafka integration + minOffset int64 + maxOffset int64 + hasOffsets bool + lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet) + lastFlushedTime atomic.Int64 // Latest timestamp that has been flushed to disk (0 = nothing flushed yet) sync.RWMutex } @@ -62,19 +77,250 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc flushFn: flushFn, ReadFromDiskFn: readFromDiskFn, notifyFn: notifyFn, + subscribers: make(map[string]chan struct{}), flushChan: make(chan *dataToFlush, 256), isStopping: new(atomic.Bool), - batchIndex: time.Now().UnixNano(), // Initialize with creation time for uniqueness across restarts + offset: 0, // Will be initialized from existing data if available } + lb.lastFlushedOffset.Store(-1) // Nothing flushed to disk yet go lb.loopFlush() go lb.loopInterval() return lb } +// RegisterSubscriber registers a subscriber for instant notifications when data is written +// Returns a channel that will receive notifications (<1ms latency) +func (logBuffer *LogBuffer) RegisterSubscriber(subscriberID string) chan struct{} { + logBuffer.subscribersMu.Lock() + defer logBuffer.subscribersMu.Unlock() + + // Check if already registered + if existingChan, exists := logBuffer.subscribers[subscriberID]; exists { + glog.V(2).Infof("Subscriber %s already registered for %s, reusing channel", subscriberID, logBuffer.name) + return existingChan + } + + // Create buffered channel (size 1) so notifications never block + notifyChan := make(chan struct{}, 1) + logBuffer.subscribers[subscriberID] = notifyChan + glog.V(1).Infof("Registered subscriber %s for %s (total: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers)) + return notifyChan +} + +// UnregisterSubscriber removes a subscriber and closes its notification channel +func (logBuffer *LogBuffer) UnregisterSubscriber(subscriberID string) { + logBuffer.subscribersMu.Lock() + defer logBuffer.subscribersMu.Unlock() + + if ch, exists := logBuffer.subscribers[subscriberID]; exists { + close(ch) + delete(logBuffer.subscribers, subscriberID) + glog.V(1).Infof("Unregistered subscriber %s from %s (remaining: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers)) + } +} + +// IsOffsetInMemory checks if the given offset is available in the in-memory buffer +// Returns true if: +// 1. Offset is newer than what's been flushed to disk (must be in memory) +// 2. 
Offset is in current buffer or previous buffers (may be flushed but still in memory) +// Returns false if offset is older than memory buffers (only on disk) +func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool { + logBuffer.RLock() + defer logBuffer.RUnlock() + + // Check if we're tracking offsets at all + if !logBuffer.hasOffsets { + return false // No offsets tracked yet + } + + // OPTIMIZATION: If offset is newer than what's been flushed to disk, + // it MUST be in memory (not written to disk yet) + lastFlushed := logBuffer.lastFlushedOffset.Load() + if lastFlushed >= 0 && offset > lastFlushed { + glog.V(3).Infof("Offset %d is in memory (newer than lastFlushed=%d)", offset, lastFlushed) + return true + } + + // Check if offset is in current buffer range AND buffer has data + // (data can be both on disk AND in memory during flush window) + if offset >= logBuffer.bufferStartOffset && offset <= logBuffer.offset { + // CRITICAL: Check if buffer actually has data (pos > 0) + // After flush, pos=0 but range is still valid - data is on disk, not in memory + if logBuffer.pos > 0 { + glog.V(3).Infof("Offset %d is in current buffer [%d-%d] with data", offset, logBuffer.bufferStartOffset, logBuffer.offset) + return true + } + // Buffer is empty (just flushed) - data is on disk + glog.V(3).Infof("Offset %d in range [%d-%d] but buffer empty (pos=0), data on disk", offset, logBuffer.bufferStartOffset, logBuffer.offset) + return false + } + + // Check if offset is in previous buffers AND they have data + for _, buf := range logBuffer.prevBuffers.buffers { + if offset >= buf.startOffset && offset <= buf.offset { + // Check if prevBuffer actually has data + if buf.size > 0 { + glog.V(3).Infof("Offset %d is in previous buffer [%d-%d] with data", offset, buf.startOffset, buf.offset) + return true + } + // Buffer is empty (flushed) - data is on disk + glog.V(3).Infof("Offset %d in prevBuffer [%d-%d] but empty (size=0), data on disk", offset, buf.startOffset, buf.offset) + return false + } + } + + // Offset is older than memory buffers - only available on disk + glog.V(3).Infof("Offset %d is NOT in memory (bufferStart=%d, lastFlushed=%d)", offset, logBuffer.bufferStartOffset, lastFlushed) + return false +} + +// notifySubscribers sends notifications to all registered subscribers +// Non-blocking: uses select with default to avoid blocking on full channels +func (logBuffer *LogBuffer) notifySubscribers() { + logBuffer.subscribersMu.RLock() + defer logBuffer.subscribersMu.RUnlock() + + if len(logBuffer.subscribers) == 0 { + return // No subscribers, skip notification + } + + for subscriberID, notifyChan := range logBuffer.subscribers { + select { + case notifyChan <- struct{}{}: + // Notification sent successfully + glog.V(3).Infof("Notified subscriber %s for %s", subscriberID, logBuffer.name) + default: + // Channel full - subscriber hasn't consumed previous notification yet + // This is OK because one notification is sufficient to wake the subscriber + glog.V(3).Infof("Subscriber %s notification channel full (OK - already notified)", subscriberID) + } + } +} + +// InitializeOffsetFromExistingData initializes the offset counter from existing data on disk +// This should be called after LogBuffer creation to ensure offset continuity on restart +func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn func() (int64, error)) error { + if getHighestOffsetFn == nil { + return nil // No initialization function provided + } + + highestOffset, err := getHighestOffsetFn() + if err 
!= nil { + glog.V(0).Infof("Failed to get highest offset for %s: %v, starting from 0", logBuffer.name, err) + return nil // Continue with offset 0 if we can't read existing data + } + + if highestOffset >= 0 { + // Set the next offset to be one after the highest existing offset + nextOffset := highestOffset + 1 + logBuffer.offset = nextOffset + // CRITICAL FIX: bufferStartOffset should match offset after initialization + // This ensures that reads for old offsets (0...highestOffset) will trigger disk reads + // New data written after this will start at nextOffset + logBuffer.bufferStartOffset = nextOffset + // CRITICAL: Track that data [0...highestOffset] is on disk + logBuffer.lastFlushedOffset.Store(highestOffset) + // Set lastFlushedTime to current time (we know data up to highestOffset is on disk) + logBuffer.lastFlushedTime.Store(time.Now().UnixNano()) + glog.V(0).Infof("Initialized LogBuffer %s offset to %d (highest existing: %d), buffer starts at %d, lastFlushedOffset=%d, lastFlushedTime=%v", + logBuffer.name, nextOffset, highestOffset, nextOffset, highestOffset, time.Now()) + } else { + logBuffer.bufferStartOffset = 0 // Start from offset 0 + // No data on disk yet + glog.V(0).Infof("No existing data found for %s, starting from offset 0, lastFlushedOffset=-1, lastFlushedTime=0", logBuffer.name) + } + + return nil +} + func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) { logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs) } +// AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information +func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { + logEntryData, _ := proto.Marshal(logEntry) + + var toFlush *dataToFlush + logBuffer.Lock() + defer func() { + logBuffer.Unlock() + if toFlush != nil { + logBuffer.flushChan <- toFlush + } + if logBuffer.notifyFn != nil { + logBuffer.notifyFn() + } + // Notify all registered subscribers instantly (<1ms latency) + logBuffer.notifySubscribers() + }() + + processingTsNs := logEntry.TsNs + ts := time.Unix(0, processingTsNs) + + // Handle timestamp collision inside lock (rare case) + if logBuffer.LastTsNs.Load() >= processingTsNs { + processingTsNs = logBuffer.LastTsNs.Add(1) + ts = time.Unix(0, processingTsNs) + // Re-marshal with corrected timestamp + logEntry.TsNs = processingTsNs + logEntryData, _ = proto.Marshal(logEntry) + } else { + logBuffer.LastTsNs.Store(processingTsNs) + } + + size := len(logEntryData) + + if logBuffer.pos == 0 { + logBuffer.startTime = ts + // Reset offset tracking for new buffer + logBuffer.hasOffsets = false + } + + // Track offset ranges for Kafka integration + // CRITICAL FIX: Use >= 0 to include offset 0 (first message in a topic) + if logEntry.Offset >= 0 { + if !logBuffer.hasOffsets { + logBuffer.minOffset = logEntry.Offset + logBuffer.maxOffset = logEntry.Offset + logBuffer.hasOffsets = true + } else { + if logEntry.Offset < logBuffer.minOffset { + logBuffer.minOffset = logEntry.Offset + } + if logEntry.Offset > logBuffer.maxOffset { + logBuffer.maxOffset = logEntry.Offset + } + } + } + + if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 { + toFlush = logBuffer.copyToFlush() + logBuffer.startTime = ts + if len(logBuffer.buf) < size+4 { + // Validate size to prevent integer overflow in computation BEFORE allocation + const maxBufferSize = 1 << 30 // 1 GiB practical limit + // Ensure 2*size + 4 won't overflow int and stays within practical bounds + if size < 0 || size > 
(math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 { + glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size) + return + } + // Safe to compute now that we've validated size is in valid range + newSize := 2*size + 4 + logBuffer.buf = make([]byte, newSize) + } + } + logBuffer.stopTime = ts + + logBuffer.idx = append(logBuffer.idx, logBuffer.pos) + util.Uint32toBytes(logBuffer.sizeBuf, uint32(size)) + copy(logBuffer.buf[logBuffer.pos:logBuffer.pos+4], logBuffer.sizeBuf) + copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData) + logBuffer.pos += size + 4 + + logBuffer.offset++ +} + func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) { // PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock @@ -105,6 +351,8 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin if logBuffer.notifyFn != nil { logBuffer.notifyFn() } + // Notify all registered subscribers instantly (<1ms latency) + logBuffer.notifySubscribers() }() // Handle timestamp collision inside lock (rare case) @@ -125,11 +373,20 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin } if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 { - // glog.V(0).Infof("%s copyToFlush1 batch:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.batchIndex, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos) + // glog.V(0).Infof("%s copyToFlush1 offset:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.offset, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos) toFlush = logBuffer.copyToFlush() logBuffer.startTime = ts if len(logBuffer.buf) < size+4 { - logBuffer.buf = make([]byte, 2*size+4) + // Validate size to prevent integer overflow in computation BEFORE allocation + const maxBufferSize = 1 << 30 // 1 GiB practical limit + // Ensure 2*size + 4 won't overflow int and stays within practical bounds + if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 { + glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size) + return + } + // Safe to compute now that we've validated size is in valid range + newSize := 2*size + 4 + logBuffer.buf = make([]byte, newSize) } } logBuffer.stopTime = ts @@ -140,14 +397,44 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData) logBuffer.pos += size + 4 - // fmt.Printf("partitionKey %v entry size %d total %d count %d\n", string(partitionKey), size, m.pos, len(m.idx)) - } func (logBuffer *LogBuffer) IsStopping() bool { return logBuffer.isStopping.Load() } +// ForceFlush immediately flushes the current buffer content and WAITS for completion +// This is useful for critical topics that need immediate persistence +// CRITICAL: This function is now SYNCHRONOUS - it blocks until the flush completes +func (logBuffer *LogBuffer) ForceFlush() { + if logBuffer.isStopping.Load() { + return // Don't flush if we're shutting down + } + + logBuffer.Lock() + toFlush := logBuffer.copyToFlushWithCallback() + logBuffer.Unlock() + + if toFlush != nil { + // Send to flush channel (with reasonable timeout) + select { + case logBuffer.flushChan <- toFlush: + // Successfully queued for flush - now WAIT for it to complete + select { + case <-toFlush.done: + // Flush completed successfully + 
glog.V(1).Infof("ForceFlush completed for %s", logBuffer.name) + case <-time.After(5 * time.Second): + // Timeout waiting for flush - this shouldn't happen + glog.Warningf("ForceFlush timed out waiting for completion on %s", logBuffer.name) + } + case <-time.After(2 * time.Second): + // If flush channel is still blocked after 2s, something is wrong + glog.Warningf("ForceFlush channel timeout for %s - flush channel busy for 2s", logBuffer.name) + } + } +} + // ShutdownLogBuffer flushes the buffer and stops the log buffer func (logBuffer *LogBuffer) ShutdownLogBuffer() { isAlreadyStopped := logBuffer.isStopping.Swap(true) @@ -168,10 +455,24 @@ func (logBuffer *LogBuffer) loopFlush() { for d := range logBuffer.flushChan { if d != nil { // glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes())) - logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes()) + logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset) d.releaseMemory() // local logbuffer is different from aggregate logbuffer here logBuffer.lastFlushDataTime = d.stopTime + + // CRITICAL: Track what's been flushed to disk for both offset-based and time-based reads + // CRITICAL FIX: Use >= 0 to include offset 0 (first message in a topic) + if d.maxOffset >= 0 { + logBuffer.lastFlushedOffset.Store(d.maxOffset) + } + if !d.stopTime.IsZero() { + logBuffer.lastFlushedTime.Store(d.stopTime.UnixNano()) + } + + // Signal completion if there's a callback channel + if d.done != nil { + close(d.done) + } } } logBuffer.isAllFlushed = true @@ -183,6 +484,7 @@ func (logBuffer *LogBuffer) loopInterval() { if logBuffer.IsStopping() { return } + logBuffer.Lock() toFlush := logBuffer.copyToFlush() logBuffer.Unlock() @@ -196,27 +498,48 @@ func (logBuffer *LogBuffer) loopInterval() { } func (logBuffer *LogBuffer) copyToFlush() *dataToFlush { + return logBuffer.copyToFlushInternal(false) +} + +func (logBuffer *LogBuffer) copyToFlushWithCallback() *dataToFlush { + return logBuffer.copyToFlushInternal(true) +} + +func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush { if logBuffer.pos > 0 { - // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos) var d *dataToFlush if logBuffer.flushFn != nil { d = &dataToFlush{ startTime: logBuffer.startTime, stopTime: logBuffer.stopTime, data: copiedBytes(logBuffer.buf[:logBuffer.pos]), + minOffset: logBuffer.minOffset, + maxOffset: logBuffer.maxOffset, + } + // Add callback channel for synchronous ForceFlush + if withCallback { + d.done = make(chan struct{}) } // glog.V(4).Infof("%s flushing [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) } else { // glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) logBuffer.lastFlushDataTime = logBuffer.stopTime } - logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.batchIndex) + // CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1 + lastOffsetInBuffer := logBuffer.offset - 1 + logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.bufferStartOffset, lastOffsetInBuffer) logBuffer.startTime = time.Unix(0, 0) logBuffer.stopTime = time.Unix(0, 0) logBuffer.pos = 0 logBuffer.idx = logBuffer.idx[:0] - logBuffer.batchIndex++ + 
// DON'T increment offset - it's already pointing to the next offset! + // logBuffer.offset++ // REMOVED - this was causing offset gaps! + logBuffer.bufferStartOffset = logBuffer.offset // Next buffer starts at current offset (which is already the next one) + // Reset offset tracking + logBuffer.hasOffsets = false + logBuffer.minOffset = 0 + logBuffer.maxOffset = 0 return d } return nil @@ -227,8 +550,8 @@ func (logBuffer *LogBuffer) GetEarliestTime() time.Time { } func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition { return MessagePosition{ - Time: logBuffer.startTime, - BatchIndex: logBuffer.batchIndex, + Time: logBuffer.startTime, + Offset: logBuffer.offset, } } @@ -241,6 +564,93 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu logBuffer.RLock() defer logBuffer.RUnlock() + isOffsetBased := lastReadPosition.IsOffsetBased + + // CRITICAL FIX: For offset-based subscriptions, use offset comparisons, not time comparisons! + if isOffsetBased { + requestedOffset := lastReadPosition.Offset + + // DEBUG: Log buffer state for _schemas topic + if strings.Contains(logBuffer.name, "_schemas") { + glog.Infof("[SCHEMAS ReadFromBuffer] requested=%d bufferStart=%d bufferEnd=%d pos=%d lastFlushed=%d", + requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos, logBuffer.lastFlushedOffset.Load()) + } + + // Check if the requested offset is in the current buffer range + if requestedOffset >= logBuffer.bufferStartOffset && requestedOffset <= logBuffer.offset { + // If current buffer is empty (pos=0), check if data is on disk or not yet written + if logBuffer.pos == 0 { + // CRITICAL FIX: If buffer is empty but offset range covers the request, + // it means data was in memory and has been flushed/moved out. + // The bufferStartOffset advancing to cover this offset proves data existed. + // + // Three cases: + // 1. requestedOffset < logBuffer.offset: Data was here, now flushed + // 2. requestedOffset == logBuffer.offset && bufferStartOffset > 0: Buffer advanced, data flushed + // 3. requestedOffset == logBuffer.offset && bufferStartOffset == 0: Initial state - try disk first! 
+ // + // Cases 1 & 2: try disk read + // Case 3: try disk read (historical data might exist) + if requestedOffset < logBuffer.offset { + // Data was in the buffer range but buffer is now empty = flushed to disk + if strings.Contains(logBuffer.name, "_schemas") { + glog.Infof("[SCHEMAS ReadFromBuffer] Returning ResumeFromDiskError: empty buffer, offset %d was flushed (bufferStart=%d, offset=%d)", + requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset) + } + return nil, -2, ResumeFromDiskError + } + // requestedOffset == logBuffer.offset: Current position + // CRITICAL: For subscribers starting from offset 0, try disk read first + // (historical data might exist from previous runs) + if requestedOffset == 0 && logBuffer.bufferStartOffset == 0 && logBuffer.offset == 0 { + // Initial state: try disk read before waiting for new data + if strings.Contains(logBuffer.name, "_schemas") { + glog.Infof("[SCHEMAS ReadFromBuffer] Initial state, trying disk read for offset 0") + } + return nil, -2, ResumeFromDiskError + } + // Otherwise, wait for new data to arrive + if strings.Contains(logBuffer.name, "_schemas") { + glog.Infof("[SCHEMAS ReadFromBuffer] Returning nil: waiting for offset %d to arrive", requestedOffset) + } + return nil, logBuffer.offset, nil + } + if strings.Contains(logBuffer.name, "_schemas") { + glog.Infof("[SCHEMAS ReadFromBuffer] Returning %d bytes from buffer", logBuffer.pos) + } + return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil + } + + // Check previous buffers for the requested offset + for _, buf := range logBuffer.prevBuffers.buffers { + if requestedOffset >= buf.startOffset && requestedOffset <= buf.offset { + // If prevBuffer is empty, it means the data was flushed to disk + // (prevBuffers are created when buffer is flushed) + if buf.size == 0 { + // Empty prevBuffer covering this offset means data was flushed + return nil, -2, ResumeFromDiskError + } + return copiedBytes(buf.buf[:buf.size]), buf.offset, nil + } + } + + // Offset not found in any buffer + if requestedOffset < logBuffer.bufferStartOffset { + // Data not in current buffers - must be on disk (flushed or never existed) + // Return ResumeFromDiskError to trigger disk read + return nil, -2, ResumeFromDiskError + } + + if requestedOffset > logBuffer.offset { + // Future data, not available yet + return nil, logBuffer.offset, nil + } + + // Offset not found - return nil + return nil, logBuffer.offset, nil + } + + // TIMESTAMP-BASED READ (original logic) // Read from disk and memory // 1. read from disk, last time is = td // 2. 
in memory, the earliest time = tm @@ -254,52 +664,55 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu var tsBatchIndex int64 if !logBuffer.startTime.IsZero() { tsMemory = logBuffer.startTime - tsBatchIndex = logBuffer.batchIndex + tsBatchIndex = logBuffer.offset } for _, prevBuf := range logBuffer.prevBuffers.buffers { if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) { tsMemory = prevBuf.startTime - tsBatchIndex = prevBuf.batchIndex + tsBatchIndex = prevBuf.offset } } if tsMemory.IsZero() { // case 2.2 - // println("2.2 no data") return nil, -2, nil - } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex+1 < tsBatchIndex { // case 2.3 - if !logBuffer.lastFlushDataTime.IsZero() { - glog.V(0).Infof("resume with last flush time: %v", logBuffer.lastFlushDataTime) + } else if lastReadPosition.Time.Before(tsMemory) && lastReadPosition.Offset+1 < tsBatchIndex { // case 2.3 + // Special case: If requested time is zero (Unix epoch), treat as "start from beginning" + // This handles queries that want to read all data without knowing the exact start time + if lastReadPosition.Time.IsZero() || lastReadPosition.Time.Unix() == 0 { + // Start from the beginning of memory + // Fall through to case 2.1 to read from earliest buffer + } else { + // Data not in memory buffers - read from disk + glog.V(0).Infof("resume from disk: requested time %v < earliest memory time %v", + lastReadPosition.Time, tsMemory) return nil, -2, ResumeFromDiskError } } // the following is case 2.1 - if lastReadPosition.Equal(logBuffer.stopTime) { - return nil, logBuffer.batchIndex, nil + if lastReadPosition.Time.Equal(logBuffer.stopTime) { + return nil, logBuffer.offset, nil } - if lastReadPosition.After(logBuffer.stopTime) { + if lastReadPosition.Time.After(logBuffer.stopTime) { // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime) - return nil, logBuffer.batchIndex, nil + return nil, logBuffer.offset, nil } - if lastReadPosition.Before(logBuffer.startTime) { - // println("checking ", lastReadPosition.UnixNano()) + if lastReadPosition.Time.Before(logBuffer.startTime) { for _, buf := range logBuffer.prevBuffers.buffers { if buf.startTime.After(lastReadPosition.Time) { // glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime) - // println("return the", i, "th in memory", buf.startTime.UnixNano()) - return copiedBytes(buf.buf[:buf.size]), buf.batchIndex, nil + return copiedBytes(buf.buf[:buf.size]), buf.offset, nil } if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) { pos := buf.locateByTs(lastReadPosition.Time) - // fmt.Printf("locate buffer[%d] pos %d\n", i, pos) - return copiedBytes(buf.buf[pos:buf.size]), buf.batchIndex, nil + return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil } } // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition) - return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex, nil + return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil } - lastTs := lastReadPosition.UnixNano() + lastTs := lastReadPosition.Time.UnixNano() l, h := 0, len(logBuffer.idx)-1 /* @@ -311,9 +724,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu if entry == nil { entry = event.EventNotification.NewEntry } - fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name) } - fmt.Printf("l=%d, h=%d\n", l, h) */ 
for l <= h { @@ -328,16 +739,14 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu _, prevT = readTs(logBuffer.buf, logBuffer.idx[mid-1]) } if prevT <= lastTs { - // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos) - return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.batchIndex, nil + return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.offset, nil } h = mid } - // fmt.Printf("l=%d, h=%d\n", l, h) } - // FIXME: this could be that the buffer has been flushed already - println("Not sure why no data", lastReadPosition.BatchIndex, tsBatchIndex) + // Binary search didn't find the timestamp - data may have been flushed to disk already + // Returning -2 signals to caller that data is not available in memory return nil, -2, nil } @@ -352,11 +761,11 @@ func (logBuffer *LogBuffer) GetName() string { return logBuffer.name } -// GetBatchIndex returns the current batch index for metadata tracking -func (logBuffer *LogBuffer) GetBatchIndex() int64 { +// GetOffset returns the current offset for metadata tracking +func (logBuffer *LogBuffer) GetOffset() int64 { logBuffer.RLock() defer logBuffer.RUnlock() - return logBuffer.batchIndex + return logBuffer.offset } var bufferPool = sync.Pool{ diff --git a/weed/util/log_buffer/log_buffer_queryability_test.go b/weed/util/log_buffer/log_buffer_queryability_test.go new file mode 100644 index 000000000..6e372d2b3 --- /dev/null +++ b/weed/util/log_buffer/log_buffer_queryability_test.go @@ -0,0 +1,238 @@ +package log_buffer + +import ( + "bytes" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/protobuf/proto" +) + +// TestBufferQueryability tests that data written to the buffer can be immediately queried +func TestBufferQueryability(t *testing.T) { + // Create a log buffer with a long flush interval to prevent premature flushing + logBuffer := NewLogBuffer("test-buffer", 10*time.Minute, + func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { + // Mock flush function - do nothing to keep data in memory + }, + func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) { + // Mock read from disk function + return startPosition, false, nil + }, + func() { + // Mock notify function + }) + + // Test data similar to schema registry messages + testKey := []byte(`{"keytype":"SCHEMA","subject":"test-topic-value","version":1,"magic":1}`) + testValue := []byte(`{"subject":"test-topic-value","version":1,"id":1,"schemaType":"AVRO","schema":"\"string\"","deleted":false}`) + + // Create a LogEntry with offset (simulating the schema registry scenario) + logEntry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + PartitionKeyHash: 12345, + Data: testValue, + Key: testKey, + Offset: 1, + } + + // Add the entry to the buffer + logBuffer.AddLogEntryToBuffer(logEntry) + + // Verify the buffer has data + if logBuffer.pos == 0 { + t.Fatal("Buffer should have data after adding entry") + } + + // Test immediate queryability - read from buffer starting from beginning + startPosition := NewMessagePosition(0, 0) // Start from beginning + bufferCopy, batchIndex, err := logBuffer.ReadFromBuffer(startPosition) + + if err != nil { + t.Fatalf("ReadFromBuffer failed: %v", err) + } + + if bufferCopy == nil { + t.Fatal("ReadFromBuffer returned nil buffer - data should be queryable 
immediately") + } + + if batchIndex != 1 { + t.Errorf("Expected batchIndex=1, got %d", batchIndex) + } + + // Verify we can read the data back + buf := bufferCopy.Bytes() + if len(buf) == 0 { + t.Fatal("Buffer copy is empty") + } + + // Parse the first entry from the buffer + if len(buf) < 4 { + t.Fatal("Buffer too small to contain entry size") + } + + size := util.BytesToUint32(buf[0:4]) + if len(buf) < 4+int(size) { + t.Fatalf("Buffer too small to contain entry data: need %d, have %d", 4+int(size), len(buf)) + } + + entryData := buf[4 : 4+int(size)] + + // Unmarshal and verify the entry + retrievedEntry := &filer_pb.LogEntry{} + if err := proto.Unmarshal(entryData, retrievedEntry); err != nil { + t.Fatalf("Failed to unmarshal retrieved entry: %v", err) + } + + // Verify the data matches + if !bytes.Equal(retrievedEntry.Key, testKey) { + t.Errorf("Key mismatch: expected %s, got %s", string(testKey), string(retrievedEntry.Key)) + } + + if !bytes.Equal(retrievedEntry.Data, testValue) { + t.Errorf("Value mismatch: expected %s, got %s", string(testValue), string(retrievedEntry.Data)) + } + + if retrievedEntry.Offset != 1 { + t.Errorf("Offset mismatch: expected 1, got %d", retrievedEntry.Offset) + } + + t.Logf("Buffer queryability test passed - data is immediately readable") +} + +// TestMultipleEntriesQueryability tests querying multiple entries from buffer +func TestMultipleEntriesQueryability(t *testing.T) { + logBuffer := NewLogBuffer("test-multi-buffer", 10*time.Minute, + func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { + // Mock flush function + }, + func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) { + return startPosition, false, nil + }, + func() {}) + + // Add multiple entries + for i := 1; i <= 3; i++ { + logEntry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano() + int64(i*1000), // Ensure different timestamps + PartitionKeyHash: int32(i), + Data: []byte("test-data-" + string(rune('0'+i))), + Key: []byte("test-key-" + string(rune('0'+i))), + Offset: int64(i), + } + logBuffer.AddLogEntryToBuffer(logEntry) + } + + // Read all entries + startPosition := NewMessagePosition(0, 0) + bufferCopy, batchIndex, err := logBuffer.ReadFromBuffer(startPosition) + + if err != nil { + t.Fatalf("ReadFromBuffer failed: %v", err) + } + + if bufferCopy == nil { + t.Fatal("ReadFromBuffer returned nil buffer") + } + + if batchIndex != 3 { + t.Errorf("Expected batchIndex=3, got %d", batchIndex) + } + + // Count entries in buffer + buf := bufferCopy.Bytes() + entryCount := 0 + pos := 0 + + for pos+4 < len(buf) { + size := util.BytesToUint32(buf[pos : pos+4]) + if pos+4+int(size) > len(buf) { + break + } + + entryData := buf[pos+4 : pos+4+int(size)] + entry := &filer_pb.LogEntry{} + if err := proto.Unmarshal(entryData, entry); err != nil { + t.Fatalf("Failed to unmarshal entry %d: %v", entryCount+1, err) + } + + entryCount++ + pos += 4 + int(size) + + t.Logf("Entry %d: Key=%s, Data=%s, Offset=%d", entryCount, string(entry.Key), string(entry.Data), entry.Offset) + } + + if entryCount != 3 { + t.Errorf("Expected 3 entries, found %d", entryCount) + } + + t.Logf("Multiple entries queryability test passed - found %d entries", entryCount) +} + +// TestSchemaRegistryScenario tests the specific scenario that was failing +func TestSchemaRegistryScenario(t *testing.T) { + logBuffer := NewLogBuffer("_schemas", 10*time.Minute, + func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, 
minOffset, maxOffset int64) { + // Mock flush function - simulate what happens in real scenario + t.Logf("FLUSH: startTime=%v, stopTime=%v, bufSize=%d, minOffset=%d, maxOffset=%d", + startTime, stopTime, len(buf), minOffset, maxOffset) + }, + func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) { + return startPosition, false, nil + }, + func() {}) + + // Simulate schema registry message + schemaKey := []byte(`{"keytype":"SCHEMA","subject":"test-schema-value","version":1,"magic":1}`) + schemaValue := []byte(`{"subject":"test-schema-value","version":1,"id":12,"schemaType":"AVRO","schema":"\"string\"","deleted":false}`) + + logEntry := &filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + PartitionKeyHash: 12345, + Data: schemaValue, + Key: schemaKey, + Offset: 0, // First message + } + + // Add to buffer + logBuffer.AddLogEntryToBuffer(logEntry) + + // Simulate the SQL query scenario - read from offset 0 + startPosition := NewMessagePosition(0, 0) + bufferCopy, _, err := logBuffer.ReadFromBuffer(startPosition) + + if err != nil { + t.Fatalf("Schema registry scenario failed: %v", err) + } + + if bufferCopy == nil { + t.Fatal("Schema registry scenario: ReadFromBuffer returned nil - this is the bug!") + } + + // Verify schema data is readable + buf := bufferCopy.Bytes() + if len(buf) < 4 { + t.Fatal("Buffer too small") + } + + size := util.BytesToUint32(buf[0:4]) + entryData := buf[4 : 4+int(size)] + + retrievedEntry := &filer_pb.LogEntry{} + if err := proto.Unmarshal(entryData, retrievedEntry); err != nil { + t.Fatalf("Failed to unmarshal schema entry: %v", err) + } + + // Verify schema value is preserved + if !bytes.Equal(retrievedEntry.Data, schemaValue) { + t.Errorf("Schema value lost! Expected: %s, Got: %s", string(schemaValue), string(retrievedEntry.Data)) + } + + if len(retrievedEntry.Data) != len(schemaValue) { + t.Errorf("Schema value length mismatch! Expected: %d, Got: %d", len(schemaValue), len(retrievedEntry.Data)) + } + + t.Logf("Schema registry scenario test passed - schema value preserved: %d bytes", len(retrievedEntry.Data)) +} diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index a4947a611..7b851de06 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -3,18 +3,19 @@ package log_buffer import ( "crypto/rand" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "io" "sync" "testing" "time" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" ) func TestNewLogBufferFirstBuffer(t *testing.T) { flushInterval := time.Second - lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte) { + lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf)) }, nil, func() { }) @@ -63,3 +64,483 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { t.Errorf("expect %d messages, but got %d", messageCount, receivedMessageCount) } } + +// TestReadFromBuffer_OldOffsetReturnsResumeFromDiskError tests that requesting an old offset +// that has been flushed to disk properly returns ResumeFromDiskError instead of hanging forever. +// This reproduces the bug where Schema Registry couldn't read the _schemas topic. 
+func TestReadFromBuffer_OldOffsetReturnsResumeFromDiskError(t *testing.T) { + tests := []struct { + name string + bufferStartOffset int64 + currentOffset int64 + requestedOffset int64 + hasData bool + expectError error + description string + }{ + { + name: "Request offset 0 when buffer starts at 4 (Schema Registry bug scenario)", + bufferStartOffset: 4, + currentOffset: 10, + requestedOffset: 0, + hasData: true, + expectError: ResumeFromDiskError, + description: "When Schema Registry tries to read from offset 0, but data has been flushed to disk", + }, + { + name: "Request offset before buffer start with empty buffer", + bufferStartOffset: 10, + currentOffset: 10, + requestedOffset: 5, + hasData: false, + expectError: ResumeFromDiskError, + description: "Old offset with no data in memory should trigger disk read", + }, + { + name: "Request offset before buffer start with data", + bufferStartOffset: 100, + currentOffset: 150, + requestedOffset: 50, + hasData: true, + expectError: ResumeFromDiskError, + description: "Old offset with current data in memory should still trigger disk read", + }, + { + name: "Request current offset (no disk read needed)", + bufferStartOffset: 4, + currentOffset: 10, + requestedOffset: 10, + hasData: true, + expectError: nil, + description: "Current offset should return data from memory without error", + }, + { + name: "Request offset within buffer range", + bufferStartOffset: 4, + currentOffset: 10, + requestedOffset: 7, + hasData: true, + expectError: nil, + description: "Offset within buffer range should return data without error", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a LogBuffer with minimal configuration + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + + // Simulate data that has been flushed to disk by setting bufferStartOffset + lb.bufferStartOffset = tt.bufferStartOffset + lb.offset = tt.currentOffset + + // CRITICAL: Mark this as an offset-based buffer + lb.hasOffsets = true + + // Add some data to the buffer if needed (at current offset position) + if tt.hasData { + testData := []byte("test message") + // Use AddLogEntryToBuffer to preserve offset information + lb.AddLogEntryToBuffer(&filer_pb.LogEntry{ + TsNs: time.Now().UnixNano(), + Key: []byte("key"), + Data: testData, + Offset: tt.currentOffset, // Add data at current offset + }) + } + + // Create an offset-based position for the requested offset + requestPosition := NewMessagePositionFromOffset(tt.requestedOffset) + + // Try to read from the buffer + buf, batchIdx, err := lb.ReadFromBuffer(requestPosition) + + // Verify the error matches expectations + if tt.expectError != nil { + if err != tt.expectError { + t.Errorf("%s\nExpected error: %v\nGot error: %v\nbuf=%v, batchIdx=%d", + tt.description, tt.expectError, err, buf != nil, batchIdx) + } else { + t.Logf("✓ %s: correctly returned %v", tt.description, err) + } + } else { + if err != nil { + t.Errorf("%s\nExpected no error but got: %v\nbuf=%v, batchIdx=%d", + tt.description, err, buf != nil, batchIdx) + } else { + t.Logf("✓ %s: correctly returned data without error", tt.description) + } + } + }) + } +} + +// TestReadFromBuffer_OldOffsetWithNoPrevBuffers specifically tests the bug fix +// where requesting an old offset would return nil instead of ResumeFromDiskError +func TestReadFromBuffer_OldOffsetWithNoPrevBuffers(t *testing.T) { + // This is the exact scenario that caused the Schema Registry to hang: + // 1. Data was published to _schemas topic (offsets 0, 1, 2, 3) + // 2. 
Data was flushed to disk + // 3. LogBuffer's bufferStartOffset was updated to 4 + // 4. Schema Registry tried to read from offset 0 + // 5. ReadFromBuffer would return (nil, offset, nil) instead of ResumeFromDiskError + // 6. The subscriber would wait forever for data that would never come from memory + + lb := NewLogBuffer("_schemas", time.Hour, nil, nil, func() {}) + + // Simulate the state after data has been flushed to disk: + // - bufferStartOffset = 10 (data 0-9 has been flushed) + // - offset = 15 (next offset to assign, current buffer has 10-14) + // - pos = 100 (some data in current buffer) + // Set prevBuffers to have non-overlapping ranges to avoid the safety check at line 420-428 + lb.bufferStartOffset = 10 + lb.offset = 15 + lb.pos = 100 + + // Modify prevBuffers to have non-zero offset ranges that DON'T include the requested offset + // This bypasses the safety check and exposes the real bug + for i := range lb.prevBuffers.buffers { + lb.prevBuffers.buffers[i].startOffset = 20 + int64(i)*10 // 20, 30, 40, etc. + lb.prevBuffers.buffers[i].offset = 25 + int64(i)*10 // 25, 35, 45, etc. + lb.prevBuffers.buffers[i].size = 0 // Empty (flushed) + } + + // Schema Registry requests offset 5 (which is before bufferStartOffset=10) + requestPosition := NewMessagePositionFromOffset(5) + + // Before the fix, this would return (nil, offset, nil) causing an infinite wait + // After the fix, this should return ResumeFromDiskError + buf, batchIdx, err := lb.ReadFromBuffer(requestPosition) + + t.Logf("DEBUG: ReadFromBuffer returned: buf=%v, batchIdx=%d, err=%v", buf != nil, batchIdx, err) + t.Logf("DEBUG: Buffer state: bufferStartOffset=%d, offset=%d, pos=%d", + lb.bufferStartOffset, lb.offset, lb.pos) + t.Logf("DEBUG: Requested offset 5, prevBuffers[0] range: [%d-%d]", + lb.prevBuffers.buffers[0].startOffset, lb.prevBuffers.buffers[0].offset) + + if err != ResumeFromDiskError { + t.Errorf("CRITICAL BUG REPRODUCED: Expected ResumeFromDiskError but got err=%v, buf=%v, batchIdx=%d\n"+ + "This causes Schema Registry to hang indefinitely waiting for data that's on disk!", + err, buf != nil, batchIdx) + t.Errorf("The buggy code falls through without returning ResumeFromDiskError!") + } else { + t.Logf("✓ BUG FIX VERIFIED: Correctly returns ResumeFromDiskError when requesting old offset 5") + t.Logf(" This allows the subscriber to read from disk instead of waiting forever") + } +} + +// TestReadFromBuffer_EmptyBufferAtCurrentOffset tests Bug #2 +// where an empty buffer at the current offset would return empty data instead of ResumeFromDiskError +func TestReadFromBuffer_EmptyBufferAtCurrentOffset(t *testing.T) { + lb := NewLogBuffer("_schemas", time.Hour, nil, nil, func() {}) + + // Simulate buffer state where data 0-3 was published and flushed, but buffer NOT advanced yet: + // - bufferStartOffset = 0 (buffer hasn't been advanced after flush) + // - offset = 4 (next offset to assign - data 0-3 exists) + // - pos = 0 (buffer is empty after flush) + // This happens in the window between flush and buffer advancement + lb.bufferStartOffset = 0 + lb.offset = 4 + lb.pos = 0 + + // Schema Registry requests offset 0 (which appears to be in range [0, 4]) + requestPosition := NewMessagePositionFromOffset(0) + + // BUG: Without fix, this returns empty buffer instead of checking disk + // FIX: Should return ResumeFromDiskError because buffer is empty (pos=0) despite valid range + buf, batchIdx, err := lb.ReadFromBuffer(requestPosition) + + t.Logf("DEBUG: ReadFromBuffer returned: buf=%v, batchIdx=%d, err=%v", 
buf != nil, batchIdx, err) + t.Logf("DEBUG: Buffer state: bufferStartOffset=%d, offset=%d, pos=%d", + lb.bufferStartOffset, lb.offset, lb.pos) + + if err != ResumeFromDiskError { + if buf == nil || len(buf.Bytes()) == 0 { + t.Errorf("CRITICAL BUG #2 REPRODUCED: Empty buffer should return ResumeFromDiskError, got err=%v, buf=%v\n"+ + "Without the fix, Schema Registry gets empty data instead of reading from disk!", + err, buf != nil) + } + } else { + t.Logf("✓ BUG #2 FIX VERIFIED: Empty buffer correctly returns ResumeFromDiskError to check disk") + } +} + +// TestReadFromBuffer_OffsetRanges tests various offset range scenarios +func TestReadFromBuffer_OffsetRanges(t *testing.T) { + lb := NewLogBuffer("test", time.Hour, nil, nil, func() {}) + + // Setup: buffer contains offsets 10-20 + lb.bufferStartOffset = 10 + lb.offset = 20 + lb.pos = 100 // some data in buffer + + testCases := []struct { + name string + requestedOffset int64 + expectedError error + description string + }{ + { + name: "Before buffer start", + requestedOffset: 5, + expectedError: ResumeFromDiskError, + description: "Offset 5 < bufferStartOffset 10 → read from disk", + }, + { + name: "At buffer start", + requestedOffset: 10, + expectedError: nil, + description: "Offset 10 == bufferStartOffset 10 → read from buffer", + }, + { + name: "Within buffer range", + requestedOffset: 15, + expectedError: nil, + description: "Offset 15 is within [10, 20] → read from buffer", + }, + { + name: "At buffer end", + requestedOffset: 20, + expectedError: nil, + description: "Offset 20 == offset 20 → read from buffer", + }, + { + name: "After buffer end", + requestedOffset: 25, + expectedError: nil, + description: "Offset 25 > offset 20 → future data, return nil without error", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + requestPosition := NewMessagePositionFromOffset(tc.requestedOffset) + _, _, err := lb.ReadFromBuffer(requestPosition) + + if tc.expectedError != nil { + if err != tc.expectedError { + t.Errorf("%s\nExpected error: %v, got: %v", tc.description, tc.expectedError, err) + } else { + t.Logf("✓ %s", tc.description) + } + } else { + // For nil expectedError, we accept either nil or no error condition + // (future offsets return nil without error) + if err != nil && err != ResumeFromDiskError { + t.Errorf("%s\nExpected no ResumeFromDiskError, got: %v", tc.description, err) + } else { + t.Logf("✓ %s", tc.description) + } + } + }) + } +} + +// TestReadFromBuffer_InitializedFromDisk tests Bug #3 +// where bufferStartOffset was incorrectly set to 0 after InitializeOffsetFromExistingData, +// causing reads for old offsets to return new data instead of triggering a disk read. +func TestReadFromBuffer_InitializedFromDisk(t *testing.T) { + // This reproduces the real Schema Registry bug scenario: + // 1. Broker restarts, finds 4 messages on disk (offsets 0-3) + // 2. InitializeOffsetFromExistingData sets offset=4 + // - BUG: bufferStartOffset=0 (wrong!) + // - FIX: bufferStartOffset=4 (correct!) + // 3. First new message is written (offset 4) + // 4. Schema Registry reads offset 0 + // 5. With FIX: requestedOffset=0 < bufferStartOffset=4 → ResumeFromDiskError (correct!) + // 6. Without FIX: requestedOffset=0 in range [0, 5] → returns wrong data (bug!) 
+ + lb := NewLogBuffer("_schemas", time.Hour, nil, nil, func() {}) + + // Use the actual InitializeOffsetFromExistingData to test the fix + err := lb.InitializeOffsetFromExistingData(func() (int64, error) { + return 3, nil // Simulate 4 messages on disk (offsets 0-3, highest=3) + }) + if err != nil { + t.Fatalf("InitializeOffsetFromExistingData failed: %v", err) + } + + t.Logf("After InitializeOffsetFromExistingData(highestOffset=3):") + t.Logf(" offset=%d (should be 4), bufferStartOffset=%d (FIX: should be 4, not 0)", + lb.offset, lb.bufferStartOffset) + + // Now write a new message at offset 4 + lb.AddToBuffer(&mq_pb.DataMessage{ + Key: []byte("new-key"), + Value: []byte("new-message-at-offset-4"), + TsNs: time.Now().UnixNano(), + }) + // After AddToBuffer: offset=5, pos>0 + + // Schema Registry tries to read offset 0 (should be on disk) + requestPosition := NewMessagePositionFromOffset(0) + + buf, batchIdx, err := lb.ReadFromBuffer(requestPosition) + + t.Logf("After writing new message:") + t.Logf(" bufferStartOffset=%d, offset=%d, pos=%d", lb.bufferStartOffset, lb.offset, lb.pos) + t.Logf(" Requested offset 0, got: buf=%v, batchIdx=%d, err=%v", buf != nil, batchIdx, err) + + // EXPECTED BEHAVIOR (with fix): + // bufferStartOffset=4 after initialization, so requestedOffset=0 < bufferStartOffset=4 + // → returns ResumeFromDiskError + + // BUGGY BEHAVIOR (without fix): + // bufferStartOffset=0 after initialization, so requestedOffset=0 is in range [0, 5] + // → returns the NEW message (offset 4) instead of reading from disk! + + if err != ResumeFromDiskError { + t.Errorf("CRITICAL BUG #3 REPRODUCED: Reading offset 0 after initialization from disk should return ResumeFromDiskError\n"+ + "Instead got: err=%v, buf=%v, batchIdx=%d\n"+ + "This means Schema Registry would receive WRONG data (offset 4) when requesting offset 0!", + err, buf != nil, batchIdx) + t.Errorf("Root cause: bufferStartOffset=%d should be 4 after InitializeOffsetFromExistingData(highestOffset=3)", + lb.bufferStartOffset) + } else { + t.Logf("✓ BUG #3 FIX VERIFIED: Reading old offset 0 correctly returns ResumeFromDiskError") + t.Logf(" This ensures Schema Registry reads correct data from disk instead of getting new messages") + } +} + +// TestLoopProcessLogDataWithOffset_DiskReadRetry tests that when a subscriber +// reads from disk before flush completes, it continues to retry disk reads +// and eventually finds the data after flush completes. +// This reproduces the Schema Registry timeout issue on first start. 
+func TestLoopProcessLogDataWithOffset_DiskReadRetry(t *testing.T) { + diskReadCallCount := 0 + diskReadMu := sync.Mutex{} + dataFlushedToDisk := false + var flushedData []*filer_pb.LogEntry + + // Create a readFromDiskFn that simulates the race condition + readFromDiskFn := func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) { + diskReadMu.Lock() + diskReadCallCount++ + callNum := diskReadCallCount + hasData := dataFlushedToDisk + diskReadMu.Unlock() + + t.Logf("DISK READ #%d: startOffset=%d, dataFlushedToDisk=%v", callNum, startPosition.Offset, hasData) + + if !hasData { + // Simulate: data not yet on disk (flush hasn't completed) + t.Logf(" → No data found (flush not completed yet)") + return startPosition, false, nil + } + + // Data is now on disk, process it + t.Logf(" → Found %d entries on disk", len(flushedData)) + for _, entry := range flushedData { + if entry.Offset >= startPosition.Offset { + isDone, err := eachLogEntryFn(entry) + if err != nil || isDone { + return NewMessagePositionFromOffset(entry.Offset + 1), isDone, err + } + } + } + return NewMessagePositionFromOffset(int64(len(flushedData))), false, nil + } + + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) { + t.Logf("FLUSH: minOffset=%d maxOffset=%d size=%d bytes", minOffset, maxOffset, len(buf)) + // Simulate writing to disk + diskReadMu.Lock() + dataFlushedToDisk = true + // Parse the buffer and add entries to flushedData + // For this test, we'll just create mock entries + flushedData = append(flushedData, &filer_pb.LogEntry{ + Key: []byte("key-0"), + Data: []byte("message-0"), + TsNs: time.Now().UnixNano(), + Offset: 0, + }) + diskReadMu.Unlock() + } + + logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, readFromDiskFn, nil) + defer logBuffer.ShutdownLogBuffer() + + // Simulate the race condition: + // 1. Subscriber starts reading from offset 0 + // 2. Data is not yet flushed + // 3. Loop calls readFromDiskFn → no data found + // 4. A bit later, data gets flushed + // 5. 
Loop should continue and call readFromDiskFn again + + receivedMessages := 0 + mu := sync.Mutex{} + maxIterations := 50 // Allow up to 50 iterations (500ms with 10ms sleep each) + iterationCount := 0 + + waitForDataFn := func() bool { + mu.Lock() + defer mu.Unlock() + iterationCount++ + // Stop after receiving message or max iterations + return receivedMessages == 0 && iterationCount < maxIterations + } + + eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) { + mu.Lock() + receivedMessages++ + mu.Unlock() + t.Logf("✉️ RECEIVED: offset=%d key=%s", offset, string(logEntry.Key)) + return true, nil // Stop after first message + } + + // Start the reader in a goroutine + var readerWg sync.WaitGroup + readerWg.Add(1) + go func() { + defer readerWg.Done() + startPosition := NewMessagePositionFromOffset(0) + _, isDone, err := logBuffer.LoopProcessLogDataWithOffset("test-subscriber", startPosition, 0, waitForDataFn, eachLogEntryFn) + t.Logf("📋 Reader finished: isDone=%v, err=%v", isDone, err) + }() + + // Wait a bit to let the first disk read happen (returns no data) + time.Sleep(50 * time.Millisecond) + + // Now add data and flush it + t.Logf("➕ Adding message to buffer...") + logBuffer.AddToBuffer(&mq_pb.DataMessage{ + Key: []byte("key-0"), + Value: []byte("message-0"), + TsNs: time.Now().UnixNano(), + }) + + // Force flush + t.Logf("Force flushing...") + logBuffer.ForceFlush() + + // Wait for reader to finish + readerWg.Wait() + + // Check results + diskReadMu.Lock() + finalDiskReadCount := diskReadCallCount + diskReadMu.Unlock() + + mu.Lock() + finalReceivedMessages := receivedMessages + finalIterations := iterationCount + mu.Unlock() + + t.Logf("\nRESULTS:") + t.Logf(" Disk reads: %d", finalDiskReadCount) + t.Logf(" Received messages: %d", finalReceivedMessages) + t.Logf(" Loop iterations: %d", finalIterations) + + if finalDiskReadCount < 2 { + t.Errorf("CRITICAL BUG REPRODUCED: Disk read was only called %d time(s)", finalDiskReadCount) + t.Errorf("Expected: Multiple disk reads as the loop continues after flush completes") + t.Errorf("This is why Schema Registry times out - it reads once before flush, never re-reads after flush") + } + + if finalReceivedMessages == 0 { + t.Errorf("SCHEMA REGISTRY TIMEOUT REPRODUCED: No messages received even after flush") + t.Errorf("The subscriber is stuck because disk reads are not retried") + } else { + t.Logf("✓ SUCCESS: Message received after %d disk read attempts", finalDiskReadCount) + } +} diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 0ebcc7cc9..77f03ddb8 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -18,19 +18,43 @@ var ( ) type MessagePosition struct { - time.Time // this is the timestamp of the message - BatchIndex int64 // this is only used when the timestamp is not enough to identify the next message, when the timestamp is in the previous batch. 
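+	// A position is interpreted one of two ways, selected by IsOffsetBased:
+	// offset-based positions carry a Kafka-style offset in Offset, while
+	// timestamp-based positions use Time and keep the legacy batch index in Offset.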
+ Time time.Time // timestamp of the message + Offset int64 // Kafka offset for offset-based positioning, or batch index for timestamp-based + IsOffsetBased bool // true if this position is offset-based, false if timestamp-based } -func NewMessagePosition(tsNs int64, batchIndex int64) MessagePosition { +func NewMessagePosition(tsNs int64, offset int64) MessagePosition { return MessagePosition{ - Time: time.Unix(0, tsNs).UTC(), - BatchIndex: batchIndex, + Time: time.Unix(0, tsNs).UTC(), + Offset: offset, + IsOffsetBased: false, // timestamp-based by default } } +// NewMessagePositionFromOffset creates a MessagePosition that represents a specific offset +func NewMessagePositionFromOffset(offset int64) MessagePosition { + return MessagePosition{ + Time: time.Time{}, // Zero time for offset-based positions + Offset: offset, + IsOffsetBased: true, + } +} + +// GetOffset extracts the offset from an offset-based MessagePosition +func (mp MessagePosition) GetOffset() int64 { + if !mp.IsOffsetBased { + return -1 // Not an offset-based position + } + return mp.Offset // Offset is stored directly +} + func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition MessagePosition, stopTsNs int64, waitForDataFn func() bool, eachLogDataFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) { + + // Register for instant notifications (<1ms latency) + notifyChan := logBuffer.RegisterSubscriber(readerName) + defer logBuffer.UnregisterSubscriber(readerName) + // loop through all messages var bytesBuf *bytes.Buffer var batchIndex int64 @@ -57,10 +81,10 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition if bytesBuf != nil { readSize = bytesBuf.Len() } - glog.V(4).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex) + glog.V(4).Infof("%s ReadFromBuffer at %v offset %d. 
Read bytes %v batchIndex %d", readerName, lastReadPosition, lastReadPosition.Offset, readSize, batchIndex) if bytesBuf == nil { if batchIndex >= 0 { - lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex) + lastReadPosition = NewMessagePosition(lastReadPosition.Time.UnixNano(), batchIndex) } if stopTsNs != 0 { isDone = true @@ -69,12 +93,23 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition lastTsNs := logBuffer.LastTsNs.Load() for lastTsNs == logBuffer.LastTsNs.Load() { - if waitForDataFn() { - continue - } else { + if !waitForDataFn() { isDone = true return } + // Wait for notification or timeout (instant wake-up when data arrives) + select { + case <-notifyChan: + // New data available, break and retry read + glog.V(3).Infof("%s: Woke up from notification (LoopProcessLogData)", readerName) + break + case <-time.After(10 * time.Millisecond): + // Timeout, check if timestamp changed + if lastTsNs != logBuffer.LastTsNs.Load() { + break + } + glog.V(4).Infof("%s: Notification timeout (LoopProcessLogData), polling", readerName) + } } if logBuffer.IsStopping() { isDone = true @@ -104,6 +139,18 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition pos += 4 + int(size) continue } + + // Handle offset-based filtering for offset-based start positions + if startPosition.IsOffsetBased { + startOffset := startPosition.GetOffset() + if logEntry.Offset < startOffset { + // Skip entries before the starting offset + pos += 4 + int(size) + batchSize++ + continue + } + } + if stopTsNs != 0 && logEntry.TsNs > stopTsNs { isDone = true // println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs) @@ -131,63 +178,163 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition } -// LoopProcessLogDataWithBatchIndex is similar to LoopProcessLogData but provides batchIndex to the callback -func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, startPosition MessagePosition, stopTsNs int64, - waitForDataFn func() bool, eachLogDataFn EachLogEntryWithBatchIndexFuncType) (lastReadPosition MessagePosition, isDone bool, err error) { +// LoopProcessLogDataWithOffset is similar to LoopProcessLogData but provides offset to the callback +func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, startPosition MessagePosition, stopTsNs int64, + waitForDataFn func() bool, eachLogDataFn EachLogEntryWithOffsetFuncType) (lastReadPosition MessagePosition, isDone bool, err error) { + glog.V(4).Infof("LoopProcessLogDataWithOffset started for %s, startPosition=%v", readerName, startPosition) + + // Register for instant notifications (<1ms latency) + notifyChan := logBuffer.RegisterSubscriber(readerName) + defer logBuffer.UnregisterSubscriber(readerName) + // loop through all messages var bytesBuf *bytes.Buffer - var batchIndex int64 + var offset int64 lastReadPosition = startPosition var entryCounter int64 defer func() { if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) } - // println("LoopProcessLogDataWithBatchIndex", readerName, "sent messages total", entryCounter) + // println("LoopProcessLogDataWithOffset", readerName, "sent messages total", entryCounter) }() for { + // Check stopTsNs at the beginning of each iteration + // This ensures we exit immediately if the stop time is in the past + if stopTsNs != 0 && time.Now().UnixNano() > stopTsNs { + isDone = true + return + } if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) } - bytesBuf, batchIndex, err = 
logBuffer.ReadFromBuffer(lastReadPosition) + bytesBuf, offset, err = logBuffer.ReadFromBuffer(lastReadPosition) + glog.V(4).Infof("ReadFromBuffer for %s returned bytesBuf=%v, offset=%d, err=%v", readerName, bytesBuf != nil, offset, err) if err == ResumeFromDiskError { - time.Sleep(1127 * time.Millisecond) - return lastReadPosition, isDone, ResumeFromDiskError + // Try to read from disk if readFromDiskFn is available + if logBuffer.ReadFromDiskFn != nil { + // Wrap eachLogDataFn to match the expected signature + diskReadFn := func(logEntry *filer_pb.LogEntry) (bool, error) { + return eachLogDataFn(logEntry, logEntry.Offset) + } + lastReadPosition, isDone, err = logBuffer.ReadFromDiskFn(lastReadPosition, stopTsNs, diskReadFn) + if err != nil { + return lastReadPosition, isDone, err + } + if isDone { + return lastReadPosition, isDone, nil + } + // Continue to next iteration after disk read + } + + // CRITICAL: Check if client is still connected after disk read + if !waitForDataFn() { + // Client disconnected - exit cleanly + glog.V(4).Infof("%s: Client disconnected after disk read", readerName) + return lastReadPosition, true, nil + } + + // Wait for notification or timeout (instant wake-up when data arrives) + select { + case <-notifyChan: + // New data available, retry immediately + glog.V(3).Infof("%s: Woke up from notification after disk read", readerName) + case <-time.After(10 * time.Millisecond): + // Timeout, retry anyway (fallback for edge cases) + glog.V(4).Infof("%s: Notification timeout, polling", readerName) + } + + // Continue to next iteration (don't return ResumeFromDiskError) + continue } readSize := 0 if bytesBuf != nil { readSize = bytesBuf.Len() } - glog.V(4).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex) + glog.V(4).Infof("%s ReadFromBuffer at %v posOffset %d. Read bytes %v bufferOffset %d", readerName, lastReadPosition, lastReadPosition.Offset, readSize, offset) if bytesBuf == nil { - if batchIndex >= 0 { - lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex) + // CRITICAL: Check if subscription is still active BEFORE waiting + // This prevents infinite loops when client has disconnected + if !waitForDataFn() { + glog.V(4).Infof("%s: waitForDataFn returned false, subscription ending", readerName) + return lastReadPosition, true, nil + } + + if offset >= 0 { + lastReadPosition = NewMessagePosition(lastReadPosition.Time.UnixNano(), offset) } if stopTsNs != 0 { isDone = true return } + + // CRITICAL FIX: If we're reading offset-based and there's no data in LogBuffer, + // return ResumeFromDiskError to let Subscribe try reading from disk again. + // This prevents infinite blocking when all data is on disk (e.g., after restart). 
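+			// The short wait below (notification channel or 10ms timeout) keeps this
+			// path from spinning between disk reads while no new data is arriving.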
+ if startPosition.IsOffsetBased { + glog.V(4).Infof("%s: No data in LogBuffer for offset-based read at %v, checking if client still connected", readerName, lastReadPosition) + // Check if client is still connected before busy-looping + if !waitForDataFn() { + glog.V(4).Infof("%s: Client disconnected, stopping offset-based read", readerName) + return lastReadPosition, true, nil + } + // Wait for notification or timeout (instant wake-up when data arrives) + select { + case <-notifyChan: + // New data available, retry immediately + glog.V(3).Infof("%s: Woke up from notification for offset-based read", readerName) + case <-time.After(10 * time.Millisecond): + // Timeout, retry anyway (fallback for edge cases) + glog.V(4).Infof("%s: Notification timeout for offset-based, polling", readerName) + } + return lastReadPosition, isDone, ResumeFromDiskError + } + lastTsNs := logBuffer.LastTsNs.Load() for lastTsNs == logBuffer.LastTsNs.Load() { - if waitForDataFn() { - continue - } else { - isDone = true - return + if !waitForDataFn() { + glog.V(4).Infof("%s: Client disconnected during timestamp wait", readerName) + return lastReadPosition, true, nil + } + // Wait for notification or timeout (instant wake-up when data arrives) + select { + case <-notifyChan: + // New data available, break and retry read + glog.V(3).Infof("%s: Woke up from notification (main loop)", readerName) + break + case <-time.After(10 * time.Millisecond): + // Timeout, check if timestamp changed + if lastTsNs != logBuffer.LastTsNs.Load() { + break + } + glog.V(4).Infof("%s: Notification timeout (main loop), polling", readerName) } } if logBuffer.IsStopping() { - isDone = true - return + glog.V(4).Infof("%s: LogBuffer is stopping", readerName) + return lastReadPosition, true, nil } continue } buf := bytesBuf.Bytes() // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadPosition, len(buf)) + glog.V(4).Infof("Processing buffer with %d bytes for %s", len(buf), readerName) + + // If buffer is empty, check if client is still connected before looping + if len(buf) == 0 { + glog.V(4).Infof("Empty buffer for %s, checking if client still connected", readerName) + if !waitForDataFn() { + glog.V(4).Infof("%s: Client disconnected on empty buffer", readerName) + return lastReadPosition, true, nil + } + // Sleep to avoid busy-wait on empty buffer + time.Sleep(10 * time.Millisecond) + continue + } batchSize := 0 @@ -196,7 +343,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, size := util.BytesToUint32(buf[pos : pos+4]) if pos+4+int(size) > len(buf) { err = ResumeError - glog.Errorf("LoopProcessLogDataWithBatchIndex: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf)) + glog.Errorf("LoopProcessLogDataWithOffset: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf)) return } entryData := buf[pos+4 : pos+4+int(size)] @@ -207,19 +354,39 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, pos += 4 + int(size) continue } + + glog.V(4).Infof("Unmarshaled log entry %d: TsNs=%d, Offset=%d, Key=%s", batchSize+1, logEntry.TsNs, logEntry.Offset, string(logEntry.Key)) + + // Handle offset-based filtering for offset-based start positions + if startPosition.IsOffsetBased { + startOffset := startPosition.GetOffset() + glog.V(4).Infof("Offset-based filtering: logEntry.Offset=%d, startOffset=%d", logEntry.Offset, startOffset) + if 
logEntry.Offset < startOffset { + // Skip entries before the starting offset + glog.V(4).Infof("Skipping entry due to offset filter") + pos += 4 + int(size) + batchSize++ + continue + } + } + if stopTsNs != 0 && logEntry.TsNs > stopTsNs { + glog.V(4).Infof("Stopping due to stopTsNs") isDone = true // println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs) return } - lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex) + // CRITICAL FIX: Use logEntry.Offset + 1 to move PAST the current entry + // This prevents infinite loops where we keep requesting the same offset + lastReadPosition = NewMessagePosition(logEntry.TsNs, logEntry.Offset+1) - if isDone, err = eachLogDataFn(logEntry, batchIndex); err != nil { - glog.Errorf("LoopProcessLogDataWithBatchIndex: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err) + glog.V(4).Infof("Calling eachLogDataFn for entry at offset %d, next position will be %d", logEntry.Offset, logEntry.Offset+1) + if isDone, err = eachLogDataFn(logEntry, logEntry.Offset); err != nil { + glog.Errorf("LoopProcessLogDataWithOffset: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err) return } if isDone { - glog.V(0).Infof("LoopProcessLogDataWithBatchIndex: %s process log entry %d", readerName, batchSize+1) + glog.V(0).Infof("LoopProcessLogDataWithOffset: %s process log entry %d", readerName, batchSize+1) return } diff --git a/weed/util/log_buffer/log_read_test.go b/weed/util/log_buffer/log_read_test.go new file mode 100644 index 000000000..f01e2912a --- /dev/null +++ b/weed/util/log_buffer/log_read_test.go @@ -0,0 +1,329 @@ +package log_buffer + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" +) + +// TestLoopProcessLogDataWithOffset_ClientDisconnect tests that the loop exits +// when the client disconnects (waitForDataFn returns false) +func TestLoopProcessLogDataWithOffset_ClientDisconnect(t *testing.T) { + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {} + logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil) + defer logBuffer.ShutdownLogBuffer() + + // Simulate client disconnect after 100ms + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + waitForDataFn := func() bool { + select { + case <-ctx.Done(): + return false // Client disconnected + default: + return true + } + } + + eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) { + return true, nil + } + + startPosition := NewMessagePositionFromOffset(0) + startTime := time.Now() + + // This should exit within 200ms (100ms timeout + some buffer) + _, isDone, _ := logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn) + + elapsed := time.Since(startTime) + + if !isDone { + t.Errorf("Expected isDone=true when client disconnects, got false") + } + + if elapsed > 500*time.Millisecond { + t.Errorf("Loop took too long to exit: %v (expected < 500ms)", elapsed) + } + + t.Logf("Loop exited cleanly in %v after client disconnect", elapsed) +} + +// TestLoopProcessLogDataWithOffset_EmptyBuffer tests that the loop doesn't +// busy-wait when the buffer is empty +func TestLoopProcessLogDataWithOffset_EmptyBuffer(t *testing.T) { + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {} + logBuffer := NewLogBuffer("test", 
1*time.Minute, flushFn, nil, nil) + defer logBuffer.ShutdownLogBuffer() + + callCount := 0 + maxCalls := 10 + mu := sync.Mutex{} + + waitForDataFn := func() bool { + mu.Lock() + defer mu.Unlock() + callCount++ + // Disconnect after maxCalls to prevent infinite loop + return callCount < maxCalls + } + + eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) { + return true, nil + } + + startPosition := NewMessagePositionFromOffset(0) + startTime := time.Now() + + _, isDone, _ := logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn) + + elapsed := time.Since(startTime) + + if !isDone { + t.Errorf("Expected isDone=true when waitForDataFn returns false, got false") + } + + // With 10ms sleep per iteration, 10 iterations should take ~100ms minimum + minExpectedTime := time.Duration(maxCalls-1) * 10 * time.Millisecond + if elapsed < minExpectedTime { + t.Errorf("Loop exited too quickly (%v), expected at least %v (suggests busy-waiting)", elapsed, minExpectedTime) + } + + // But shouldn't take more than 2x expected (allows for some overhead) + maxExpectedTime := time.Duration(maxCalls) * 30 * time.Millisecond + if elapsed > maxExpectedTime { + t.Errorf("Loop took too long: %v (expected < %v)", elapsed, maxExpectedTime) + } + + mu.Lock() + finalCallCount := callCount + mu.Unlock() + + if finalCallCount != maxCalls { + t.Errorf("Expected exactly %d calls to waitForDataFn, got %d", maxCalls, finalCallCount) + } + + t.Logf("Loop exited cleanly in %v after %d iterations (no busy-waiting detected)", elapsed, finalCallCount) +} + +// TestLoopProcessLogDataWithOffset_NoDataResumeFromDisk tests that the loop +// properly handles ResumeFromDiskError without busy-waiting +func TestLoopProcessLogDataWithOffset_NoDataResumeFromDisk(t *testing.T) { + readFromDiskFn := func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) { + // No data on disk + return startPosition, false, nil + } + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {} + logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, readFromDiskFn, nil) + defer logBuffer.ShutdownLogBuffer() + + callCount := 0 + maxCalls := 5 + mu := sync.Mutex{} + + waitForDataFn := func() bool { + mu.Lock() + defer mu.Unlock() + callCount++ + // Disconnect after maxCalls + return callCount < maxCalls + } + + eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) { + return true, nil + } + + startPosition := NewMessagePositionFromOffset(0) + startTime := time.Now() + + _, isDone, _ := logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn) + + elapsed := time.Since(startTime) + + if !isDone { + t.Errorf("Expected isDone=true when waitForDataFn returns false, got false") + } + + // Should take at least (maxCalls-1) * 10ms due to sleep in ResumeFromDiskError path + minExpectedTime := time.Duration(maxCalls-1) * 10 * time.Millisecond + if elapsed < minExpectedTime { + t.Errorf("Loop exited too quickly (%v), expected at least %v (suggests missing sleep)", elapsed, minExpectedTime) + } + + t.Logf("Loop exited cleanly in %v after %d iterations (proper sleep detected)", elapsed, callCount) +} + +// TestLoopProcessLogDataWithOffset_WithData tests normal operation with data +func TestLoopProcessLogDataWithOffset_WithData(t *testing.T) { + flushFn := func(logBuffer *LogBuffer, 
startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {} + logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil) + defer logBuffer.ShutdownLogBuffer() + + // Add some test data to the buffer + testMessages := []*mq_pb.DataMessage{ + {Key: []byte("key1"), Value: []byte("message1"), TsNs: 1}, + {Key: []byte("key2"), Value: []byte("message2"), TsNs: 2}, + {Key: []byte("key3"), Value: []byte("message3"), TsNs: 3}, + } + + for _, msg := range testMessages { + logBuffer.AddToBuffer(msg) + } + + receivedCount := 0 + mu := sync.Mutex{} + + // Disconnect after receiving at least 1 message to test that data processing works + waitForDataFn := func() bool { + mu.Lock() + defer mu.Unlock() + return receivedCount == 0 // Disconnect after first message + } + + eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) { + mu.Lock() + receivedCount++ + mu.Unlock() + return true, nil // Continue processing + } + + startPosition := NewMessagePositionFromOffset(0) + startTime := time.Now() + + _, isDone, _ := logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn) + + elapsed := time.Since(startTime) + + if !isDone { + t.Errorf("Expected isDone=true after client disconnect, got false") + } + + mu.Lock() + finalCount := receivedCount + mu.Unlock() + + if finalCount < 1 { + t.Errorf("Expected to receive at least 1 message, got %d", finalCount) + } + + // Should complete quickly since data is available + if elapsed > 1*time.Second { + t.Errorf("Processing took too long: %v (expected < 1s)", elapsed) + } + + t.Logf("Successfully processed %d message(s) in %v", finalCount, elapsed) +} + +// TestLoopProcessLogDataWithOffset_ConcurrentDisconnect tests that the loop +// handles concurrent client disconnects without panicking +func TestLoopProcessLogDataWithOffset_ConcurrentDisconnect(t *testing.T) { + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {} + logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil) + defer logBuffer.ShutdownLogBuffer() + + numClients := 10 + var wg sync.WaitGroup + + for i := 0; i < numClients; i++ { + wg.Add(1) + go func(clientID int) { + defer wg.Done() + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + waitForDataFn := func() bool { + select { + case <-ctx.Done(): + return false + default: + return true + } + } + + eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) { + return true, nil + } + + startPosition := NewMessagePositionFromOffset(0) + _, _, _ = logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn) + }(i) + } + + // Wait for all clients to finish with a timeout + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + t.Logf("All %d concurrent clients exited cleanly", numClients) + case <-time.After(5 * time.Second): + t.Errorf("Timeout waiting for concurrent clients to exit (possible deadlock or stuck loop)") + } +} + +// TestLoopProcessLogDataWithOffset_StopTime tests that the loop respects stopTsNs +func TestLoopProcessLogDataWithOffset_StopTime(t *testing.T) { + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {} + logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil) + defer logBuffer.ShutdownLogBuffer() + + callCount := 0 + waitForDataFn := func() bool { + 
callCount++ + // Prevent infinite loop in case of test failure + return callCount < 10 + } + + eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) { + t.Errorf("Should not process any entries when stopTsNs is in the past") + return false, nil + } + + startPosition := NewMessagePositionFromOffset(0) + stopTsNs := time.Now().Add(-1 * time.Hour).UnixNano() // Stop time in the past + + startTime := time.Now() + _, isDone, _ := logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, stopTsNs, waitForDataFn, eachLogEntryFn) + elapsed := time.Since(startTime) + + if !isDone { + t.Errorf("Expected isDone=true when stopTsNs is in the past, got false") + } + + if elapsed > 1*time.Second { + t.Errorf("Loop should exit quickly when stopTsNs is in the past, took %v", elapsed) + } + + t.Logf("Loop correctly exited for past stopTsNs in %v (waitForDataFn called %d times)", elapsed, callCount) +} + +// BenchmarkLoopProcessLogDataWithOffset_EmptyBuffer benchmarks the performance +// of the loop with an empty buffer to ensure no busy-waiting +func BenchmarkLoopProcessLogDataWithOffset_EmptyBuffer(b *testing.B) { + flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {} + logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil) + defer logBuffer.ShutdownLogBuffer() + + for i := 0; i < b.N; i++ { + callCount := 0 + waitForDataFn := func() bool { + callCount++ + return callCount < 3 // Exit after 3 calls + } + + eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) { + return true, nil + } + + startPosition := NewMessagePositionFromOffset(0) + logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn) + } +} diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go index c41b30fcc..397dab1d4 100644 --- a/weed/util/log_buffer/sealed_buffer.go +++ b/weed/util/log_buffer/sealed_buffer.go @@ -6,11 +6,12 @@ import ( ) type MemBuffer struct { - buf []byte - size int - startTime time.Time - stopTime time.Time - batchIndex int64 + buf []byte + size int + startTime time.Time + stopTime time.Time + startOffset int64 // First offset in this buffer + offset int64 // Last offset in this buffer (endOffset) } type SealedBuffers struct { @@ -30,7 +31,7 @@ func newSealedBuffers(size int) *SealedBuffers { return sbs } -func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, batchIndex int64) (newBuf []byte) { +func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, startOffset int64, endOffset int64) (newBuf []byte) { oldMemBuffer := sbs.buffers[0] size := len(sbs.buffers) for i := 0; i < size-1; i++ { @@ -38,13 +39,15 @@ func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, sbs.buffers[i].size = sbs.buffers[i+1].size sbs.buffers[i].startTime = sbs.buffers[i+1].startTime sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime - sbs.buffers[i].batchIndex = sbs.buffers[i+1].batchIndex + sbs.buffers[i].startOffset = sbs.buffers[i+1].startOffset + sbs.buffers[i].offset = sbs.buffers[i+1].offset } sbs.buffers[size-1].buf = buf sbs.buffers[size-1].size = pos sbs.buffers[size-1].startTime = startTime sbs.buffers[size-1].stopTime = stopTime - sbs.buffers[size-1].batchIndex = batchIndex + sbs.buffers[size-1].startOffset = startOffset + sbs.buffers[size-1].offset = endOffset return oldMemBuffer.buf } |
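For orientation, the following is a minimal sketch of how a caller outside this package might drive the offset-based read path introduced above. It relies only on signatures visible in this diff (NewLogBuffer, AddToBuffer, NewMessagePositionFromOffset, GetOffset, LoopProcessLogDataWithOffset); the reader name, the key/value payload, and the standalone main wrapper are illustrative assumptions, not part of the change.

package main

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

func main() {
	// No-op flush and no disk-read function: this sketch keeps everything in memory.
	flushFn := func(lb *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {}
	lb := log_buffer.NewLogBuffer("example", time.Minute, flushFn, nil, nil)
	defer lb.ShutdownLogBuffer()

	lb.AddToBuffer(&mq_pb.DataMessage{Key: []byte("k"), Value: []byte("v"), TsNs: time.Now().UnixNano()})

	// Offset-based start position: begin at offset 0.
	start := log_buffer.NewMessagePositionFromOffset(0)
	fmt.Println(start.GetOffset()) // 0; a timestamp-based position would report -1 here

	received := 0
	waitForDataFn := func() bool { return received == 0 } // keep waiting only until one entry arrives
	eachFn := func(entry *filer_pb.LogEntry, offset int64) (bool, error) {
		received++
		fmt.Printf("offset=%d key=%s value=%s\n", offset, entry.Key, entry.Data)
		return true, nil // done after the first entry
	}

	last, done, err := lb.LoopProcessLogDataWithOffset("example-reader", start, 0, waitForDataFn, eachFn)
	fmt.Printf("next position=%d done=%v err=%v\n", last.Offset, done, err)
}

When the requested offset is older than what the in-memory buffer holds, the loop either consults the readFromDiskFn passed to NewLogBuffer or surfaces ResumeFromDiskError to the caller, which is the behavior the tests above exercise.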
