Diffstat (limited to 'weed/util/log_buffer')
-rw-r--r--  weed/util/log_buffer/disk_buffer_cache.go             195
-rw-r--r--  weed/util/log_buffer/log_buffer.go                    501
-rw-r--r--  weed/util/log_buffer/log_buffer_queryability_test.go  238
-rw-r--r--  weed/util/log_buffer/log_buffer_test.go               485
-rw-r--r--  weed/util/log_buffer/log_read.go                      233
-rw-r--r--  weed/util/log_buffer/log_read_test.go                 329
-rw-r--r--  weed/util/log_buffer/sealed_buffer.go                  19
7 files changed, 1911 insertions, 89 deletions
diff --git a/weed/util/log_buffer/disk_buffer_cache.go b/weed/util/log_buffer/disk_buffer_cache.go
new file mode 100644
index 000000000..ceafa9329
--- /dev/null
+++ b/weed/util/log_buffer/disk_buffer_cache.go
@@ -0,0 +1,195 @@
+package log_buffer
+
+import (
+ "container/list"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// DiskBufferCache is a small LRU cache for recently-read historical data buffers
+// This reduces Filer load when multiple consumers are catching up on historical messages
+type DiskBufferCache struct {
+ maxSize int
+ ttl time.Duration
+ cache map[string]*cacheEntry
+ lruList *list.List
+ mu sync.RWMutex
+ hits int64
+ misses int64
+ evictions int64
+}
+
+type cacheEntry struct {
+ key string
+ data []byte
+ offset int64
+ timestamp time.Time
+ lruElement *list.Element
+ isNegative bool // true if this is a negative cache entry (data not found)
+}
+
+// NewDiskBufferCache creates a new cache with the specified size and TTL
+// Recommended size: 3-5 buffers (each ~8MB)
+// Recommended TTL: 30-60 seconds
+func NewDiskBufferCache(maxSize int, ttl time.Duration) *DiskBufferCache {
+ cache := &DiskBufferCache{
+ maxSize: maxSize,
+ ttl: ttl,
+ cache: make(map[string]*cacheEntry),
+ lruList: list.New(),
+ }
+
+ // Start background cleanup goroutine
+ go cache.cleanupLoop()
+
+ return cache
+}
+
+// Get retrieves a buffer from the cache
+// Returns (data, offset, found)
+// If found=true and data=nil, this is a negative cache entry (data doesn't exist)
+func (c *DiskBufferCache) Get(key string) ([]byte, int64, bool) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ entry, exists := c.cache[key]
+ if !exists {
+ c.misses++
+ return nil, 0, false
+ }
+
+ // Check if entry has expired
+ if time.Since(entry.timestamp) > c.ttl {
+ c.evict(entry)
+ c.misses++
+ return nil, 0, false
+ }
+
+ // Move to front of LRU list (most recently used)
+ c.lruList.MoveToFront(entry.lruElement)
+ c.hits++
+
+ if entry.isNegative {
+ glog.V(4).Infof("📦 CACHE HIT (NEGATIVE): key=%s - data not found (hits=%d misses=%d)",
+ key, c.hits, c.misses)
+ } else {
+ glog.V(4).Infof("📦 CACHE HIT: key=%s offset=%d size=%d (hits=%d misses=%d)",
+ key, entry.offset, len(entry.data), c.hits, c.misses)
+ }
+
+ return entry.data, entry.offset, true
+}
+
+// Put adds a buffer to the cache
+// If data is nil, this creates a negative cache entry (data doesn't exist)
+func (c *DiskBufferCache) Put(key string, data []byte, offset int64) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ isNegative := data == nil
+
+ // Check if entry already exists
+ if entry, exists := c.cache[key]; exists {
+ // Update existing entry
+ entry.data = data
+ entry.offset = offset
+ entry.timestamp = time.Now()
+ entry.isNegative = isNegative
+ c.lruList.MoveToFront(entry.lruElement)
+ if isNegative {
+ glog.V(4).Infof("📦 CACHE UPDATE (NEGATIVE): key=%s - data not found", key)
+ } else {
+ glog.V(4).Infof("📦 CACHE UPDATE: key=%s offset=%d size=%d", key, offset, len(data))
+ }
+ return
+ }
+
+ // Evict oldest entry if cache is full
+ if c.lruList.Len() >= c.maxSize {
+ oldest := c.lruList.Back()
+ if oldest != nil {
+ c.evict(oldest.Value.(*cacheEntry))
+ }
+ }
+
+ // Add new entry
+ entry := &cacheEntry{
+ key: key,
+ data: data,
+ offset: offset,
+ timestamp: time.Now(),
+ isNegative: isNegative,
+ }
+ entry.lruElement = c.lruList.PushFront(entry)
+ c.cache[key] = entry
+
+ if isNegative {
+ glog.V(4).Infof("📦 CACHE PUT (NEGATIVE): key=%s - data not found (cache_size=%d/%d)",
+ key, c.lruList.Len(), c.maxSize)
+ } else {
+ glog.V(4).Infof("📦 CACHE PUT: key=%s offset=%d size=%d (cache_size=%d/%d)",
+ key, offset, len(data), c.lruList.Len(), c.maxSize)
+ }
+}
+
+// evict removes an entry from the cache (must be called with lock held)
+func (c *DiskBufferCache) evict(entry *cacheEntry) {
+ delete(c.cache, entry.key)
+ c.lruList.Remove(entry.lruElement)
+ c.evictions++
+ glog.V(4).Infof("📦 CACHE EVICT: key=%s (evictions=%d)", entry.key, c.evictions)
+}
+
+// cleanupLoop periodically removes expired entries
+func (c *DiskBufferCache) cleanupLoop() {
+ ticker := time.NewTicker(c.ttl / 2)
+ defer ticker.Stop()
+
+ for range ticker.C {
+ c.cleanup()
+ }
+}
+
+// cleanup removes expired entries
+func (c *DiskBufferCache) cleanup() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ now := time.Now()
+ var toEvict []*cacheEntry
+
+ // Find expired entries
+ for _, entry := range c.cache {
+ if now.Sub(entry.timestamp) > c.ttl {
+ toEvict = append(toEvict, entry)
+ }
+ }
+
+ // Evict expired entries
+ for _, entry := range toEvict {
+ c.evict(entry)
+ }
+
+ if len(toEvict) > 0 {
+ glog.V(3).Infof("📦 CACHE CLEANUP: evicted %d expired entries", len(toEvict))
+ }
+}
+
+// Stats returns cache statistics
+func (c *DiskBufferCache) Stats() (hits, misses, evictions int64, size int) {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.hits, c.misses, c.evictions, c.lruList.Len()
+}
+
+// Clear removes all entries from the cache
+func (c *DiskBufferCache) Clear() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ c.cache = make(map[string]*cacheEntry)
+ c.lruList = list.New()
+ glog.V(2).Infof("📦 CACHE CLEARED")
+}
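
For reference, a minimal usage sketch of the DiskBufferCache API added above. It uses only the constructor and methods shown in this diff; the cache size, TTL, key format, and payloads are illustrative assumptions, not values mandated by the package:

package main

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

func main() {
	// 4 buffers, 45s TTL: within the ranges suggested by the doc comment.
	cache := log_buffer.NewDiskBufferCache(4, 45*time.Second)

	// Cache a historical buffer that was read from the Filer (key format is an assumption).
	cache.Put("topic-p0/offset-1024", []byte("buffer bytes read from filer"), 1024)

	// Record that nothing exists at this position (negative cache entry: nil data).
	cache.Put("topic-p0/offset-2048", nil, 2048)

	if data, offset, found := cache.Get("topic-p0/offset-1024"); found {
		fmt.Printf("hit: offset=%d size=%d\n", offset, len(data))
	}
	if data, _, found := cache.Get("topic-p0/offset-2048"); found && data == nil {
		fmt.Println("negative hit: data known to be absent, no Filer read needed")
	}

	hits, misses, evictions, size := cache.Stats()
	fmt.Printf("hits=%d misses=%d evictions=%d size=%d\n", hits, misses, evictions, size)
}
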
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 15ea062c6..aff8ec80b 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -2,6 +2,8 @@ package log_buffer
import (
"bytes"
+ "math"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -21,11 +23,14 @@ type dataToFlush struct {
startTime time.Time
stopTime time.Time
data *bytes.Buffer
+ minOffset int64
+ maxOffset int64
+ done chan struct{} // Signal when flush completes
}
type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error)
-type EachLogEntryWithBatchIndexFuncType func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error)
-type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte)
+type EachLogEntryWithOffsetFuncType func(logEntry *filer_pb.LogEntry, offset int64) (isDone bool, err error)
+type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64)
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
type LogBuffer struct {
@@ -33,7 +38,8 @@ type LogBuffer struct {
name string
prevBuffers *SealedBuffers
buf []byte
- batchIndex int64
+ offset int64 // Last offset in current buffer (endOffset)
+ bufferStartOffset int64 // First offset in current buffer
idx []int
pos int
startTime time.Time
@@ -44,10 +50,19 @@ type LogBuffer struct {
flushFn LogFlushFuncType
ReadFromDiskFn LogReadFromDiskFuncType
notifyFn func()
- isStopping *atomic.Bool
- isAllFlushed bool
- flushChan chan *dataToFlush
- LastTsNs atomic.Int64
+ // Per-subscriber notification channels for instant wake-up
+ subscribersMu sync.RWMutex
+ subscribers map[string]chan struct{} // subscriberID -> notification channel
+ isStopping *atomic.Bool
+ isAllFlushed bool
+ flushChan chan *dataToFlush
+ LastTsNs atomic.Int64
+ // Offset range tracking for Kafka integration
+ minOffset int64
+ maxOffset int64
+ hasOffsets bool
+ lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet)
+ lastFlushedTime atomic.Int64 // Latest timestamp that has been flushed to disk (0 = nothing flushed yet)
sync.RWMutex
}
@@ -62,19 +77,250 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
flushFn: flushFn,
ReadFromDiskFn: readFromDiskFn,
notifyFn: notifyFn,
+ subscribers: make(map[string]chan struct{}),
flushChan: make(chan *dataToFlush, 256),
isStopping: new(atomic.Bool),
- batchIndex: time.Now().UnixNano(), // Initialize with creation time for uniqueness across restarts
+ offset: 0, // Will be initialized from existing data if available
}
+ lb.lastFlushedOffset.Store(-1) // Nothing flushed to disk yet
go lb.loopFlush()
go lb.loopInterval()
return lb
}
+// RegisterSubscriber registers a subscriber for instant notifications when data is written
+// Returns a channel that will receive notifications (<1ms latency)
+func (logBuffer *LogBuffer) RegisterSubscriber(subscriberID string) chan struct{} {
+ logBuffer.subscribersMu.Lock()
+ defer logBuffer.subscribersMu.Unlock()
+
+ // Check if already registered
+ if existingChan, exists := logBuffer.subscribers[subscriberID]; exists {
+ glog.V(2).Infof("Subscriber %s already registered for %s, reusing channel", subscriberID, logBuffer.name)
+ return existingChan
+ }
+
+ // Create buffered channel (size 1) so notifications never block
+ notifyChan := make(chan struct{}, 1)
+ logBuffer.subscribers[subscriberID] = notifyChan
+ glog.V(1).Infof("Registered subscriber %s for %s (total: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers))
+ return notifyChan
+}
+
+// UnregisterSubscriber removes a subscriber and closes its notification channel
+func (logBuffer *LogBuffer) UnregisterSubscriber(subscriberID string) {
+ logBuffer.subscribersMu.Lock()
+ defer logBuffer.subscribersMu.Unlock()
+
+ if ch, exists := logBuffer.subscribers[subscriberID]; exists {
+ close(ch)
+ delete(logBuffer.subscribers, subscriberID)
+ glog.V(1).Infof("Unregistered subscriber %s from %s (remaining: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers))
+ }
+}
+
+// IsOffsetInMemory checks if the given offset is available in the in-memory buffer
+// Returns true if:
+// 1. Offset is newer than what's been flushed to disk (must be in memory)
+// 2. Offset is in current buffer or previous buffers (may be flushed but still in memory)
+// Returns false if offset is older than memory buffers (only on disk)
+func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool {
+ logBuffer.RLock()
+ defer logBuffer.RUnlock()
+
+ // Check if we're tracking offsets at all
+ if !logBuffer.hasOffsets {
+ return false // No offsets tracked yet
+ }
+
+ // OPTIMIZATION: If offset is newer than what's been flushed to disk,
+ // it MUST be in memory (not written to disk yet)
+ lastFlushed := logBuffer.lastFlushedOffset.Load()
+ if lastFlushed >= 0 && offset > lastFlushed {
+ glog.V(3).Infof("Offset %d is in memory (newer than lastFlushed=%d)", offset, lastFlushed)
+ return true
+ }
+
+ // Check if offset is in current buffer range AND buffer has data
+ // (data can be both on disk AND in memory during flush window)
+ if offset >= logBuffer.bufferStartOffset && offset <= logBuffer.offset {
+ // CRITICAL: Check if buffer actually has data (pos > 0)
+ // After flush, pos=0 but range is still valid - data is on disk, not in memory
+ if logBuffer.pos > 0 {
+ glog.V(3).Infof("Offset %d is in current buffer [%d-%d] with data", offset, logBuffer.bufferStartOffset, logBuffer.offset)
+ return true
+ }
+ // Buffer is empty (just flushed) - data is on disk
+ glog.V(3).Infof("Offset %d in range [%d-%d] but buffer empty (pos=0), data on disk", offset, logBuffer.bufferStartOffset, logBuffer.offset)
+ return false
+ }
+
+ // Check if offset is in previous buffers AND they have data
+ for _, buf := range logBuffer.prevBuffers.buffers {
+ if offset >= buf.startOffset && offset <= buf.offset {
+ // Check if prevBuffer actually has data
+ if buf.size > 0 {
+ glog.V(3).Infof("Offset %d is in previous buffer [%d-%d] with data", offset, buf.startOffset, buf.offset)
+ return true
+ }
+ // Buffer is empty (flushed) - data is on disk
+ glog.V(3).Infof("Offset %d in prevBuffer [%d-%d] but empty (size=0), data on disk", offset, buf.startOffset, buf.offset)
+ return false
+ }
+ }
+
+ // Offset is older than memory buffers - only available on disk
+ glog.V(3).Infof("Offset %d is NOT in memory (bufferStart=%d, lastFlushed=%d)", offset, logBuffer.bufferStartOffset, lastFlushed)
+ return false
+}
+
+// notifySubscribers sends notifications to all registered subscribers
+// Non-blocking: uses select with default to avoid blocking on full channels
+func (logBuffer *LogBuffer) notifySubscribers() {
+ logBuffer.subscribersMu.RLock()
+ defer logBuffer.subscribersMu.RUnlock()
+
+ if len(logBuffer.subscribers) == 0 {
+ return // No subscribers, skip notification
+ }
+
+ for subscriberID, notifyChan := range logBuffer.subscribers {
+ select {
+ case notifyChan <- struct{}{}:
+ // Notification sent successfully
+ glog.V(3).Infof("Notified subscriber %s for %s", subscriberID, logBuffer.name)
+ default:
+ // Channel full - subscriber hasn't consumed previous notification yet
+ // This is OK because one notification is sufficient to wake the subscriber
+ glog.V(3).Infof("Subscriber %s notification channel full (OK - already notified)", subscriberID)
+ }
+ }
+}
+
+// InitializeOffsetFromExistingData initializes the offset counter from existing data on disk
+// This should be called after LogBuffer creation to ensure offset continuity on restart
+func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn func() (int64, error)) error {
+ if getHighestOffsetFn == nil {
+ return nil // No initialization function provided
+ }
+
+ highestOffset, err := getHighestOffsetFn()
+ if err != nil {
+ glog.V(0).Infof("Failed to get highest offset for %s: %v, starting from 0", logBuffer.name, err)
+ return nil // Continue with offset 0 if we can't read existing data
+ }
+
+ if highestOffset >= 0 {
+ // Set the next offset to be one after the highest existing offset
+ nextOffset := highestOffset + 1
+ logBuffer.offset = nextOffset
+ // CRITICAL FIX: bufferStartOffset should match offset after initialization
+ // This ensures that reads for old offsets (0...highestOffset) will trigger disk reads
+ // New data written after this will start at nextOffset
+ logBuffer.bufferStartOffset = nextOffset
+ // CRITICAL: Track that data [0...highestOffset] is on disk
+ logBuffer.lastFlushedOffset.Store(highestOffset)
+ // Set lastFlushedTime to current time (we know data up to highestOffset is on disk)
+ logBuffer.lastFlushedTime.Store(time.Now().UnixNano())
+ glog.V(0).Infof("Initialized LogBuffer %s offset to %d (highest existing: %d), buffer starts at %d, lastFlushedOffset=%d, lastFlushedTime=%v",
+ logBuffer.name, nextOffset, highestOffset, nextOffset, highestOffset, time.Now())
+ } else {
+ logBuffer.bufferStartOffset = 0 // Start from offset 0
+ // No data on disk yet
+ glog.V(0).Infof("No existing data found for %s, starting from offset 0, lastFlushedOffset=-1, lastFlushedTime=0", logBuffer.name)
+ }
+
+ return nil
+}
+
func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) {
logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs)
}
+// AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information
+func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) {
+ logEntryData, _ := proto.Marshal(logEntry)
+
+ var toFlush *dataToFlush
+ logBuffer.Lock()
+ defer func() {
+ logBuffer.Unlock()
+ if toFlush != nil {
+ logBuffer.flushChan <- toFlush
+ }
+ if logBuffer.notifyFn != nil {
+ logBuffer.notifyFn()
+ }
+ // Notify all registered subscribers instantly (<1ms latency)
+ logBuffer.notifySubscribers()
+ }()
+
+ processingTsNs := logEntry.TsNs
+ ts := time.Unix(0, processingTsNs)
+
+ // Handle timestamp collision inside lock (rare case)
+ if logBuffer.LastTsNs.Load() >= processingTsNs {
+ processingTsNs = logBuffer.LastTsNs.Add(1)
+ ts = time.Unix(0, processingTsNs)
+ // Re-marshal with corrected timestamp
+ logEntry.TsNs = processingTsNs
+ logEntryData, _ = proto.Marshal(logEntry)
+ } else {
+ logBuffer.LastTsNs.Store(processingTsNs)
+ }
+
+ size := len(logEntryData)
+
+ if logBuffer.pos == 0 {
+ logBuffer.startTime = ts
+ // Reset offset tracking for new buffer
+ logBuffer.hasOffsets = false
+ }
+
+ // Track offset ranges for Kafka integration
+ // CRITICAL FIX: Use >= 0 to include offset 0 (first message in a topic)
+ if logEntry.Offset >= 0 {
+ if !logBuffer.hasOffsets {
+ logBuffer.minOffset = logEntry.Offset
+ logBuffer.maxOffset = logEntry.Offset
+ logBuffer.hasOffsets = true
+ } else {
+ if logEntry.Offset < logBuffer.minOffset {
+ logBuffer.minOffset = logEntry.Offset
+ }
+ if logEntry.Offset > logBuffer.maxOffset {
+ logBuffer.maxOffset = logEntry.Offset
+ }
+ }
+ }
+
+ if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
+ toFlush = logBuffer.copyToFlush()
+ logBuffer.startTime = ts
+ if len(logBuffer.buf) < size+4 {
+ // Validate size to prevent integer overflow in computation BEFORE allocation
+ const maxBufferSize = 1 << 30 // 1 GiB practical limit
+ // Ensure 2*size + 4 won't overflow int and stays within practical bounds
+ if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 {
+ glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size)
+ return
+ }
+ // Safe to compute now that we've validated size is in valid range
+ newSize := 2*size + 4
+ logBuffer.buf = make([]byte, newSize)
+ }
+ }
+ logBuffer.stopTime = ts
+
+ logBuffer.idx = append(logBuffer.idx, logBuffer.pos)
+ util.Uint32toBytes(logBuffer.sizeBuf, uint32(size))
+ copy(logBuffer.buf[logBuffer.pos:logBuffer.pos+4], logBuffer.sizeBuf)
+ copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
+ logBuffer.pos += size + 4
+
+ logBuffer.offset++
+}
+
func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) {
// PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock
@@ -105,6 +351,8 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
if logBuffer.notifyFn != nil {
logBuffer.notifyFn()
}
+ // Notify all registered subscribers instantly (<1ms latency)
+ logBuffer.notifySubscribers()
}()
// Handle timestamp collision inside lock (rare case)
@@ -125,11 +373,20 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
}
if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
- // glog.V(0).Infof("%s copyToFlush1 batch:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.batchIndex, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos)
+ // glog.V(0).Infof("%s copyToFlush1 offset:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.offset, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos)
toFlush = logBuffer.copyToFlush()
logBuffer.startTime = ts
if len(logBuffer.buf) < size+4 {
- logBuffer.buf = make([]byte, 2*size+4)
+ // Validate size to prevent integer overflow in computation BEFORE allocation
+ const maxBufferSize = 1 << 30 // 1 GiB practical limit
+ // Ensure 2*size + 4 won't overflow int and stays within practical bounds
+ if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 {
+ glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size)
+ return
+ }
+ // Safe to compute now that we've validated size is in valid range
+ newSize := 2*size + 4
+ logBuffer.buf = make([]byte, newSize)
}
}
logBuffer.stopTime = ts
@@ -140,14 +397,44 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
logBuffer.pos += size + 4
- // fmt.Printf("partitionKey %v entry size %d total %d count %d\n", string(partitionKey), size, m.pos, len(m.idx))
-
}
func (logBuffer *LogBuffer) IsStopping() bool {
return logBuffer.isStopping.Load()
}
+// ForceFlush immediately flushes the current buffer content and WAITS for completion
+// This is useful for critical topics that need immediate persistence
+// CRITICAL: This function is now SYNCHRONOUS - it blocks until the flush completes
+func (logBuffer *LogBuffer) ForceFlush() {
+ if logBuffer.isStopping.Load() {
+ return // Don't flush if we're shutting down
+ }
+
+ logBuffer.Lock()
+ toFlush := logBuffer.copyToFlushWithCallback()
+ logBuffer.Unlock()
+
+ if toFlush != nil {
+ // Send to flush channel (with reasonable timeout)
+ select {
+ case logBuffer.flushChan <- toFlush:
+ // Successfully queued for flush - now WAIT for it to complete
+ select {
+ case <-toFlush.done:
+ // Flush completed successfully
+ glog.V(1).Infof("ForceFlush completed for %s", logBuffer.name)
+ case <-time.After(5 * time.Second):
+ // Timeout waiting for flush - this shouldn't happen
+ glog.Warningf("ForceFlush timed out waiting for completion on %s", logBuffer.name)
+ }
+ case <-time.After(2 * time.Second):
+ // If flush channel is still blocked after 2s, something is wrong
+ glog.Warningf("ForceFlush channel timeout for %s - flush channel busy for 2s", logBuffer.name)
+ }
+ }
+}
+
// ShutdownLogBuffer flushes the buffer and stops the log buffer
func (logBuffer *LogBuffer) ShutdownLogBuffer() {
isAlreadyStopped := logBuffer.isStopping.Swap(true)
@@ -168,10 +455,24 @@ func (logBuffer *LogBuffer) loopFlush() {
for d := range logBuffer.flushChan {
if d != nil {
// glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes()))
- logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes())
+ logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset)
d.releaseMemory()
// local logbuffer is different from aggregate logbuffer here
logBuffer.lastFlushDataTime = d.stopTime
+
+ // CRITICAL: Track what's been flushed to disk for both offset-based and time-based reads
+ // CRITICAL FIX: Use >= 0 to include offset 0 (first message in a topic)
+ if d.maxOffset >= 0 {
+ logBuffer.lastFlushedOffset.Store(d.maxOffset)
+ }
+ if !d.stopTime.IsZero() {
+ logBuffer.lastFlushedTime.Store(d.stopTime.UnixNano())
+ }
+
+ // Signal completion if there's a callback channel
+ if d.done != nil {
+ close(d.done)
+ }
}
}
logBuffer.isAllFlushed = true
@@ -183,6 +484,7 @@ func (logBuffer *LogBuffer) loopInterval() {
if logBuffer.IsStopping() {
return
}
+
logBuffer.Lock()
toFlush := logBuffer.copyToFlush()
logBuffer.Unlock()
@@ -196,27 +498,48 @@ func (logBuffer *LogBuffer) loopInterval() {
}
func (logBuffer *LogBuffer) copyToFlush() *dataToFlush {
+ return logBuffer.copyToFlushInternal(false)
+}
+
+func (logBuffer *LogBuffer) copyToFlushWithCallback() *dataToFlush {
+ return logBuffer.copyToFlushInternal(true)
+}
+
+func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush {
if logBuffer.pos > 0 {
- // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
var d *dataToFlush
if logBuffer.flushFn != nil {
d = &dataToFlush{
startTime: logBuffer.startTime,
stopTime: logBuffer.stopTime,
data: copiedBytes(logBuffer.buf[:logBuffer.pos]),
+ minOffset: logBuffer.minOffset,
+ maxOffset: logBuffer.maxOffset,
+ }
+ // Add callback channel for synchronous ForceFlush
+ if withCallback {
+ d.done = make(chan struct{})
}
// glog.V(4).Infof("%s flushing [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime)
} else {
// glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime)
logBuffer.lastFlushDataTime = logBuffer.stopTime
}
- logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.batchIndex)
+ // CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1
+ lastOffsetInBuffer := logBuffer.offset - 1
+ logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.bufferStartOffset, lastOffsetInBuffer)
logBuffer.startTime = time.Unix(0, 0)
logBuffer.stopTime = time.Unix(0, 0)
logBuffer.pos = 0
logBuffer.idx = logBuffer.idx[:0]
- logBuffer.batchIndex++
+ // DON'T increment offset - it's already pointing to the next offset!
+ // logBuffer.offset++ // REMOVED - this was causing offset gaps!
+ logBuffer.bufferStartOffset = logBuffer.offset // Next buffer starts at current offset (which is already the next one)
+ // Reset offset tracking
+ logBuffer.hasOffsets = false
+ logBuffer.minOffset = 0
+ logBuffer.maxOffset = 0
return d
}
return nil
@@ -227,8 +550,8 @@ func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
}
func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition {
return MessagePosition{
- Time: logBuffer.startTime,
- BatchIndex: logBuffer.batchIndex,
+ Time: logBuffer.startTime,
+ Offset: logBuffer.offset,
}
}
@@ -241,6 +564,93 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
logBuffer.RLock()
defer logBuffer.RUnlock()
+ isOffsetBased := lastReadPosition.IsOffsetBased
+
+ // CRITICAL FIX: For offset-based subscriptions, use offset comparisons, not time comparisons!
+ if isOffsetBased {
+ requestedOffset := lastReadPosition.Offset
+
+ // DEBUG: Log buffer state for _schemas topic
+ if strings.Contains(logBuffer.name, "_schemas") {
+ glog.Infof("[SCHEMAS ReadFromBuffer] requested=%d bufferStart=%d bufferEnd=%d pos=%d lastFlushed=%d",
+ requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos, logBuffer.lastFlushedOffset.Load())
+ }
+
+ // Check if the requested offset is in the current buffer range
+ if requestedOffset >= logBuffer.bufferStartOffset && requestedOffset <= logBuffer.offset {
+ // If current buffer is empty (pos=0), check if data is on disk or not yet written
+ if logBuffer.pos == 0 {
+ // CRITICAL FIX: If buffer is empty but offset range covers the request,
+ // it means data was in memory and has been flushed/moved out.
+ // The bufferStartOffset advancing to cover this offset proves data existed.
+ //
+ // Three cases:
+ // 1. requestedOffset < logBuffer.offset: Data was here, now flushed
+ // 2. requestedOffset == logBuffer.offset && bufferStartOffset > 0: Buffer advanced, data flushed
+ // 3. requestedOffset == logBuffer.offset && bufferStartOffset == 0: Initial state - try disk first!
+ //
+ // Cases 1 & 2: try disk read
+ // Case 3: try disk read (historical data might exist)
+ if requestedOffset < logBuffer.offset {
+ // Data was in the buffer range but buffer is now empty = flushed to disk
+ if strings.Contains(logBuffer.name, "_schemas") {
+ glog.Infof("[SCHEMAS ReadFromBuffer] Returning ResumeFromDiskError: empty buffer, offset %d was flushed (bufferStart=%d, offset=%d)",
+ requestedOffset, logBuffer.bufferStartOffset, logBuffer.offset)
+ }
+ return nil, -2, ResumeFromDiskError
+ }
+ // requestedOffset == logBuffer.offset: Current position
+ // CRITICAL: For subscribers starting from offset 0, try disk read first
+ // (historical data might exist from previous runs)
+ if requestedOffset == 0 && logBuffer.bufferStartOffset == 0 && logBuffer.offset == 0 {
+ // Initial state: try disk read before waiting for new data
+ if strings.Contains(logBuffer.name, "_schemas") {
+ glog.Infof("[SCHEMAS ReadFromBuffer] Initial state, trying disk read for offset 0")
+ }
+ return nil, -2, ResumeFromDiskError
+ }
+ // Otherwise, wait for new data to arrive
+ if strings.Contains(logBuffer.name, "_schemas") {
+ glog.Infof("[SCHEMAS ReadFromBuffer] Returning nil: waiting for offset %d to arrive", requestedOffset)
+ }
+ return nil, logBuffer.offset, nil
+ }
+ if strings.Contains(logBuffer.name, "_schemas") {
+ glog.Infof("[SCHEMAS ReadFromBuffer] Returning %d bytes from buffer", logBuffer.pos)
+ }
+ return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
+ }
+
+ // Check previous buffers for the requested offset
+ for _, buf := range logBuffer.prevBuffers.buffers {
+ if requestedOffset >= buf.startOffset && requestedOffset <= buf.offset {
+ // If prevBuffer is empty, it means the data was flushed to disk
+ // (prevBuffers are created when buffer is flushed)
+ if buf.size == 0 {
+ // Empty prevBuffer covering this offset means data was flushed
+ return nil, -2, ResumeFromDiskError
+ }
+ return copiedBytes(buf.buf[:buf.size]), buf.offset, nil
+ }
+ }
+
+ // Offset not found in any buffer
+ if requestedOffset < logBuffer.bufferStartOffset {
+ // Data not in current buffers - must be on disk (flushed or never existed)
+ // Return ResumeFromDiskError to trigger disk read
+ return nil, -2, ResumeFromDiskError
+ }
+
+ if requestedOffset > logBuffer.offset {
+ // Future data, not available yet
+ return nil, logBuffer.offset, nil
+ }
+
+ // Offset not found - return nil
+ return nil, logBuffer.offset, nil
+ }
+
+ // TIMESTAMP-BASED READ (original logic)
// Read from disk and memory
// 1. read from disk, last time is = td
// 2. in memory, the earliest time = tm
@@ -254,52 +664,55 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
var tsBatchIndex int64
if !logBuffer.startTime.IsZero() {
tsMemory = logBuffer.startTime
- tsBatchIndex = logBuffer.batchIndex
+ tsBatchIndex = logBuffer.offset
}
for _, prevBuf := range logBuffer.prevBuffers.buffers {
if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) {
tsMemory = prevBuf.startTime
- tsBatchIndex = prevBuf.batchIndex
+ tsBatchIndex = prevBuf.offset
}
}
if tsMemory.IsZero() { // case 2.2
- // println("2.2 no data")
return nil, -2, nil
- } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex+1 < tsBatchIndex { // case 2.3
- if !logBuffer.lastFlushDataTime.IsZero() {
- glog.V(0).Infof("resume with last flush time: %v", logBuffer.lastFlushDataTime)
+ } else if lastReadPosition.Time.Before(tsMemory) && lastReadPosition.Offset+1 < tsBatchIndex { // case 2.3
+ // Special case: If requested time is zero (Unix epoch), treat as "start from beginning"
+ // This handles queries that want to read all data without knowing the exact start time
+ if lastReadPosition.Time.IsZero() || lastReadPosition.Time.Unix() == 0 {
+ // Start from the beginning of memory
+ // Fall through to case 2.1 to read from earliest buffer
+ } else {
+ // Data not in memory buffers - read from disk
+ glog.V(0).Infof("resume from disk: requested time %v < earliest memory time %v",
+ lastReadPosition.Time, tsMemory)
return nil, -2, ResumeFromDiskError
}
}
// the following is case 2.1
- if lastReadPosition.Equal(logBuffer.stopTime) {
- return nil, logBuffer.batchIndex, nil
+ if lastReadPosition.Time.Equal(logBuffer.stopTime) {
+ return nil, logBuffer.offset, nil
}
- if lastReadPosition.After(logBuffer.stopTime) {
+ if lastReadPosition.Time.After(logBuffer.stopTime) {
// glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime)
- return nil, logBuffer.batchIndex, nil
+ return nil, logBuffer.offset, nil
}
- if lastReadPosition.Before(logBuffer.startTime) {
- // println("checking ", lastReadPosition.UnixNano())
+ if lastReadPosition.Time.Before(logBuffer.startTime) {
for _, buf := range logBuffer.prevBuffers.buffers {
if buf.startTime.After(lastReadPosition.Time) {
// glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime)
- // println("return the", i, "th in memory", buf.startTime.UnixNano())
- return copiedBytes(buf.buf[:buf.size]), buf.batchIndex, nil
+ return copiedBytes(buf.buf[:buf.size]), buf.offset, nil
}
if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) {
pos := buf.locateByTs(lastReadPosition.Time)
- // fmt.Printf("locate buffer[%d] pos %d\n", i, pos)
- return copiedBytes(buf.buf[pos:buf.size]), buf.batchIndex, nil
+ return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil
}
}
// glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition)
- return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex, nil
+ return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
}
- lastTs := lastReadPosition.UnixNano()
+ lastTs := lastReadPosition.Time.UnixNano()
l, h := 0, len(logBuffer.idx)-1
/*
@@ -311,9 +724,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
if entry == nil {
entry = event.EventNotification.NewEntry
}
- fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name)
}
- fmt.Printf("l=%d, h=%d\n", l, h)
*/
for l <= h {
@@ -328,16 +739,14 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
_, prevT = readTs(logBuffer.buf, logBuffer.idx[mid-1])
}
if prevT <= lastTs {
- // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos)
- return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.batchIndex, nil
+ return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.offset, nil
}
h = mid
}
- // fmt.Printf("l=%d, h=%d\n", l, h)
}
- // FIXME: this could be that the buffer has been flushed already
- println("Not sure why no data", lastReadPosition.BatchIndex, tsBatchIndex)
+ // Binary search didn't find the timestamp - data may have been flushed to disk already
+ // Returning -2 signals to caller that data is not available in memory
return nil, -2, nil
}
@@ -352,11 +761,11 @@ func (logBuffer *LogBuffer) GetName() string {
return logBuffer.name
}
-// GetBatchIndex returns the current batch index for metadata tracking
-func (logBuffer *LogBuffer) GetBatchIndex() int64 {
+// GetOffset returns the current offset for metadata tracking
+func (logBuffer *LogBuffer) GetOffset() int64 {
logBuffer.RLock()
defer logBuffer.RUnlock()
- return logBuffer.batchIndex
+ return logBuffer.offset
}
var bufferPool = sync.Pool{
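
For reference, a sketch of how a consumer loop could combine the new per-subscriber notification channels with offset-based reads. RegisterSubscriber, UnregisterSubscriber, ReadFromBuffer, ReadFromDiskFn, NewMessagePositionFromOffset, and ResumeFromDiskError all appear in this change; the subscriber ID, stop channel, and control flow are illustrative assumptions rather than the broker's actual read loop (see log_read.go for that):

package example

import (
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

// consumeFromOffset is a hypothetical helper: read what is buffered in memory,
// fall back to disk when ReadFromBuffer says the data has been flushed, and
// otherwise block on the per-subscriber notification channel until new writes arrive.
func consumeFromOffset(lb *log_buffer.LogBuffer, startOffset int64, stop <-chan struct{}) error {
	notify := lb.RegisterSubscriber("example-consumer")
	defer lb.UnregisterSubscriber("example-consumer")

	pos := log_buffer.NewMessagePositionFromOffset(startOffset)
	for {
		buf, nextOffset, err := lb.ReadFromBuffer(pos)
		switch {
		case err == log_buffer.ResumeFromDiskError:
			// Requested offset is older than the in-memory buffers: read it from disk.
			newPos, _, diskErr := lb.ReadFromDiskFn(pos, 0, func(entry *filer_pb.LogEntry) (bool, error) {
				// process entry.Key / entry.Data / entry.Offset here
				return false, nil
			})
			if diskErr != nil {
				return diskErr
			}
			pos = newPos
		case err != nil:
			return err
		case buf != nil:
			// Decode the length-prefixed LogEntry records in buf.Bytes(), then advance.
			pos = log_buffer.NewMessagePositionFromOffset(nextOffset)
		default:
			// No new data yet: wait for the write path to notify this subscriber, or stop.
			select {
			case <-notify:
			case <-stop:
				return nil
			}
		}
	}
}
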
diff --git a/weed/util/log_buffer/log_buffer_queryability_test.go b/weed/util/log_buffer/log_buffer_queryability_test.go
new file mode 100644
index 000000000..6e372d2b3
--- /dev/null
+++ b/weed/util/log_buffer/log_buffer_queryability_test.go
@@ -0,0 +1,238 @@
+package log_buffer
+
+import (
+ "bytes"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "google.golang.org/protobuf/proto"
+)
+
+// TestBufferQueryability tests that data written to the buffer can be immediately queried
+func TestBufferQueryability(t *testing.T) {
+ // Create a log buffer with a long flush interval to prevent premature flushing
+ logBuffer := NewLogBuffer("test-buffer", 10*time.Minute,
+ func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+ // Mock flush function - do nothing to keep data in memory
+ },
+ func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) {
+ // Mock read from disk function
+ return startPosition, false, nil
+ },
+ func() {
+ // Mock notify function
+ })
+
+ // Test data similar to schema registry messages
+ testKey := []byte(`{"keytype":"SCHEMA","subject":"test-topic-value","version":1,"magic":1}`)
+ testValue := []byte(`{"subject":"test-topic-value","version":1,"id":1,"schemaType":"AVRO","schema":"\"string\"","deleted":false}`)
+
+ // Create a LogEntry with offset (simulating the schema registry scenario)
+ logEntry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ PartitionKeyHash: 12345,
+ Data: testValue,
+ Key: testKey,
+ Offset: 1,
+ }
+
+ // Add the entry to the buffer
+ logBuffer.AddLogEntryToBuffer(logEntry)
+
+ // Verify the buffer has data
+ if logBuffer.pos == 0 {
+ t.Fatal("Buffer should have data after adding entry")
+ }
+
+ // Test immediate queryability - read from buffer starting from beginning
+ startPosition := NewMessagePosition(0, 0) // Start from beginning
+ bufferCopy, batchIndex, err := logBuffer.ReadFromBuffer(startPosition)
+
+ if err != nil {
+ t.Fatalf("ReadFromBuffer failed: %v", err)
+ }
+
+ if bufferCopy == nil {
+ t.Fatal("ReadFromBuffer returned nil buffer - data should be queryable immediately")
+ }
+
+ if batchIndex != 1 {
+ t.Errorf("Expected batchIndex=1, got %d", batchIndex)
+ }
+
+ // Verify we can read the data back
+ buf := bufferCopy.Bytes()
+ if len(buf) == 0 {
+ t.Fatal("Buffer copy is empty")
+ }
+
+ // Parse the first entry from the buffer
+ if len(buf) < 4 {
+ t.Fatal("Buffer too small to contain entry size")
+ }
+
+ size := util.BytesToUint32(buf[0:4])
+ if len(buf) < 4+int(size) {
+ t.Fatalf("Buffer too small to contain entry data: need %d, have %d", 4+int(size), len(buf))
+ }
+
+ entryData := buf[4 : 4+int(size)]
+
+ // Unmarshal and verify the entry
+ retrievedEntry := &filer_pb.LogEntry{}
+ if err := proto.Unmarshal(entryData, retrievedEntry); err != nil {
+ t.Fatalf("Failed to unmarshal retrieved entry: %v", err)
+ }
+
+ // Verify the data matches
+ if !bytes.Equal(retrievedEntry.Key, testKey) {
+ t.Errorf("Key mismatch: expected %s, got %s", string(testKey), string(retrievedEntry.Key))
+ }
+
+ if !bytes.Equal(retrievedEntry.Data, testValue) {
+ t.Errorf("Value mismatch: expected %s, got %s", string(testValue), string(retrievedEntry.Data))
+ }
+
+ if retrievedEntry.Offset != 1 {
+ t.Errorf("Offset mismatch: expected 1, got %d", retrievedEntry.Offset)
+ }
+
+ t.Logf("Buffer queryability test passed - data is immediately readable")
+}
+
+// TestMultipleEntriesQueryability tests querying multiple entries from buffer
+func TestMultipleEntriesQueryability(t *testing.T) {
+ logBuffer := NewLogBuffer("test-multi-buffer", 10*time.Minute,
+ func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+ // Mock flush function
+ },
+ func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) {
+ return startPosition, false, nil
+ },
+ func() {})
+
+ // Add multiple entries
+ for i := 1; i <= 3; i++ {
+ logEntry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano() + int64(i*1000), // Ensure different timestamps
+ PartitionKeyHash: int32(i),
+ Data: []byte("test-data-" + string(rune('0'+i))),
+ Key: []byte("test-key-" + string(rune('0'+i))),
+ Offset: int64(i),
+ }
+ logBuffer.AddLogEntryToBuffer(logEntry)
+ }
+
+ // Read all entries
+ startPosition := NewMessagePosition(0, 0)
+ bufferCopy, batchIndex, err := logBuffer.ReadFromBuffer(startPosition)
+
+ if err != nil {
+ t.Fatalf("ReadFromBuffer failed: %v", err)
+ }
+
+ if bufferCopy == nil {
+ t.Fatal("ReadFromBuffer returned nil buffer")
+ }
+
+ if batchIndex != 3 {
+ t.Errorf("Expected batchIndex=3, got %d", batchIndex)
+ }
+
+ // Count entries in buffer
+ buf := bufferCopy.Bytes()
+ entryCount := 0
+ pos := 0
+
+ for pos+4 < len(buf) {
+ size := util.BytesToUint32(buf[pos : pos+4])
+ if pos+4+int(size) > len(buf) {
+ break
+ }
+
+ entryData := buf[pos+4 : pos+4+int(size)]
+ entry := &filer_pb.LogEntry{}
+ if err := proto.Unmarshal(entryData, entry); err != nil {
+ t.Fatalf("Failed to unmarshal entry %d: %v", entryCount+1, err)
+ }
+
+ entryCount++
+ pos += 4 + int(size)
+
+ t.Logf("Entry %d: Key=%s, Data=%s, Offset=%d", entryCount, string(entry.Key), string(entry.Data), entry.Offset)
+ }
+
+ if entryCount != 3 {
+ t.Errorf("Expected 3 entries, found %d", entryCount)
+ }
+
+ t.Logf("Multiple entries queryability test passed - found %d entries", entryCount)
+}
+
+// TestSchemaRegistryScenario tests the specific scenario that was failing
+func TestSchemaRegistryScenario(t *testing.T) {
+ logBuffer := NewLogBuffer("_schemas", 10*time.Minute,
+ func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+ // Mock flush function - simulate what happens in real scenario
+ t.Logf("FLUSH: startTime=%v, stopTime=%v, bufSize=%d, minOffset=%d, maxOffset=%d",
+ startTime, stopTime, len(buf), minOffset, maxOffset)
+ },
+ func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) {
+ return startPosition, false, nil
+ },
+ func() {})
+
+ // Simulate schema registry message
+ schemaKey := []byte(`{"keytype":"SCHEMA","subject":"test-schema-value","version":1,"magic":1}`)
+ schemaValue := []byte(`{"subject":"test-schema-value","version":1,"id":12,"schemaType":"AVRO","schema":"\"string\"","deleted":false}`)
+
+ logEntry := &filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ PartitionKeyHash: 12345,
+ Data: schemaValue,
+ Key: schemaKey,
+ Offset: 0, // First message
+ }
+
+ // Add to buffer
+ logBuffer.AddLogEntryToBuffer(logEntry)
+
+ // Simulate the SQL query scenario - read from offset 0
+ startPosition := NewMessagePosition(0, 0)
+ bufferCopy, _, err := logBuffer.ReadFromBuffer(startPosition)
+
+ if err != nil {
+ t.Fatalf("Schema registry scenario failed: %v", err)
+ }
+
+ if bufferCopy == nil {
+ t.Fatal("Schema registry scenario: ReadFromBuffer returned nil - this is the bug!")
+ }
+
+ // Verify schema data is readable
+ buf := bufferCopy.Bytes()
+ if len(buf) < 4 {
+ t.Fatal("Buffer too small")
+ }
+
+ size := util.BytesToUint32(buf[0:4])
+ entryData := buf[4 : 4+int(size)]
+
+ retrievedEntry := &filer_pb.LogEntry{}
+ if err := proto.Unmarshal(entryData, retrievedEntry); err != nil {
+ t.Fatalf("Failed to unmarshal schema entry: %v", err)
+ }
+
+ // Verify schema value is preserved
+ if !bytes.Equal(retrievedEntry.Data, schemaValue) {
+ t.Errorf("Schema value lost! Expected: %s, Got: %s", string(schemaValue), string(retrievedEntry.Data))
+ }
+
+ if len(retrievedEntry.Data) != len(schemaValue) {
+ t.Errorf("Schema value length mismatch! Expected: %d, Got: %d", len(schemaValue), len(retrievedEntry.Data))
+ }
+
+ t.Logf("Schema registry scenario test passed - schema value preserved: %d bytes", len(retrievedEntry.Data))
+}
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index a4947a611..7b851de06 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -3,18 +3,19 @@ package log_buffer
import (
"crypto/rand"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"io"
"sync"
"testing"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
func TestNewLogBufferFirstBuffer(t *testing.T) {
flushInterval := time.Second
- lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte) {
+ lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf))
}, nil, func() {
})
@@ -63,3 +64,483 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
t.Errorf("expect %d messages, but got %d", messageCount, receivedMessageCount)
}
}
+
+// TestReadFromBuffer_OldOffsetReturnsResumeFromDiskError tests that requesting an old offset
+// that has been flushed to disk properly returns ResumeFromDiskError instead of hanging forever.
+// This reproduces the bug where Schema Registry couldn't read the _schemas topic.
+func TestReadFromBuffer_OldOffsetReturnsResumeFromDiskError(t *testing.T) {
+ tests := []struct {
+ name string
+ bufferStartOffset int64
+ currentOffset int64
+ requestedOffset int64
+ hasData bool
+ expectError error
+ description string
+ }{
+ {
+ name: "Request offset 0 when buffer starts at 4 (Schema Registry bug scenario)",
+ bufferStartOffset: 4,
+ currentOffset: 10,
+ requestedOffset: 0,
+ hasData: true,
+ expectError: ResumeFromDiskError,
+ description: "When Schema Registry tries to read from offset 0, but data has been flushed to disk",
+ },
+ {
+ name: "Request offset before buffer start with empty buffer",
+ bufferStartOffset: 10,
+ currentOffset: 10,
+ requestedOffset: 5,
+ hasData: false,
+ expectError: ResumeFromDiskError,
+ description: "Old offset with no data in memory should trigger disk read",
+ },
+ {
+ name: "Request offset before buffer start with data",
+ bufferStartOffset: 100,
+ currentOffset: 150,
+ requestedOffset: 50,
+ hasData: true,
+ expectError: ResumeFromDiskError,
+ description: "Old offset with current data in memory should still trigger disk read",
+ },
+ {
+ name: "Request current offset (no disk read needed)",
+ bufferStartOffset: 4,
+ currentOffset: 10,
+ requestedOffset: 10,
+ hasData: true,
+ expectError: nil,
+ description: "Current offset should return data from memory without error",
+ },
+ {
+ name: "Request offset within buffer range",
+ bufferStartOffset: 4,
+ currentOffset: 10,
+ requestedOffset: 7,
+ hasData: true,
+ expectError: nil,
+ description: "Offset within buffer range should return data without error",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Create a LogBuffer with minimal configuration
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+
+ // Simulate data that has been flushed to disk by setting bufferStartOffset
+ lb.bufferStartOffset = tt.bufferStartOffset
+ lb.offset = tt.currentOffset
+
+ // CRITICAL: Mark this as an offset-based buffer
+ lb.hasOffsets = true
+
+ // Add some data to the buffer if needed (at current offset position)
+ if tt.hasData {
+ testData := []byte("test message")
+ // Use AddLogEntryToBuffer to preserve offset information
+ lb.AddLogEntryToBuffer(&filer_pb.LogEntry{
+ TsNs: time.Now().UnixNano(),
+ Key: []byte("key"),
+ Data: testData,
+ Offset: tt.currentOffset, // Add data at current offset
+ })
+ }
+
+ // Create an offset-based position for the requested offset
+ requestPosition := NewMessagePositionFromOffset(tt.requestedOffset)
+
+ // Try to read from the buffer
+ buf, batchIdx, err := lb.ReadFromBuffer(requestPosition)
+
+ // Verify the error matches expectations
+ if tt.expectError != nil {
+ if err != tt.expectError {
+ t.Errorf("%s\nExpected error: %v\nGot error: %v\nbuf=%v, batchIdx=%d",
+ tt.description, tt.expectError, err, buf != nil, batchIdx)
+ } else {
+ t.Logf("✓ %s: correctly returned %v", tt.description, err)
+ }
+ } else {
+ if err != nil {
+ t.Errorf("%s\nExpected no error but got: %v\nbuf=%v, batchIdx=%d",
+ tt.description, err, buf != nil, batchIdx)
+ } else {
+ t.Logf("✓ %s: correctly returned data without error", tt.description)
+ }
+ }
+ })
+ }
+}
+
+// TestReadFromBuffer_OldOffsetWithNoPrevBuffers specifically tests the bug fix
+// where requesting an old offset would return nil instead of ResumeFromDiskError
+func TestReadFromBuffer_OldOffsetWithNoPrevBuffers(t *testing.T) {
+ // This is the exact scenario that caused the Schema Registry to hang:
+ // 1. Data was published to _schemas topic (offsets 0, 1, 2, 3)
+ // 2. Data was flushed to disk
+ // 3. LogBuffer's bufferStartOffset was updated to 4
+ // 4. Schema Registry tried to read from offset 0
+ // 5. ReadFromBuffer would return (nil, offset, nil) instead of ResumeFromDiskError
+ // 6. The subscriber would wait forever for data that would never come from memory
+
+ lb := NewLogBuffer("_schemas", time.Hour, nil, nil, func() {})
+
+ // Simulate the state after data has been flushed to disk:
+ // - bufferStartOffset = 10 (data 0-9 has been flushed)
+ // - offset = 15 (next offset to assign, current buffer has 10-14)
+ // - pos = 100 (some data in current buffer)
+ // Set prevBuffers to have non-overlapping ranges to avoid the safety check at line 420-428
+ lb.bufferStartOffset = 10
+ lb.offset = 15
+ lb.pos = 100
+
+ // Modify prevBuffers to have non-zero offset ranges that DON'T include the requested offset
+ // This bypasses the safety check and exposes the real bug
+ for i := range lb.prevBuffers.buffers {
+ lb.prevBuffers.buffers[i].startOffset = 20 + int64(i)*10 // 20, 30, 40, etc.
+ lb.prevBuffers.buffers[i].offset = 25 + int64(i)*10 // 25, 35, 45, etc.
+ lb.prevBuffers.buffers[i].size = 0 // Empty (flushed)
+ }
+
+ // Schema Registry requests offset 5 (which is before bufferStartOffset=10)
+ requestPosition := NewMessagePositionFromOffset(5)
+
+ // Before the fix, this would return (nil, offset, nil) causing an infinite wait
+ // After the fix, this should return ResumeFromDiskError
+ buf, batchIdx, err := lb.ReadFromBuffer(requestPosition)
+
+ t.Logf("DEBUG: ReadFromBuffer returned: buf=%v, batchIdx=%d, err=%v", buf != nil, batchIdx, err)
+ t.Logf("DEBUG: Buffer state: bufferStartOffset=%d, offset=%d, pos=%d",
+ lb.bufferStartOffset, lb.offset, lb.pos)
+ t.Logf("DEBUG: Requested offset 5, prevBuffers[0] range: [%d-%d]",
+ lb.prevBuffers.buffers[0].startOffset, lb.prevBuffers.buffers[0].offset)
+
+ if err != ResumeFromDiskError {
+ t.Errorf("CRITICAL BUG REPRODUCED: Expected ResumeFromDiskError but got err=%v, buf=%v, batchIdx=%d\n"+
+ "This causes Schema Registry to hang indefinitely waiting for data that's on disk!",
+ err, buf != nil, batchIdx)
+ t.Errorf("The buggy code falls through without returning ResumeFromDiskError!")
+ } else {
+ t.Logf("✓ BUG FIX VERIFIED: Correctly returns ResumeFromDiskError when requesting old offset 5")
+ t.Logf(" This allows the subscriber to read from disk instead of waiting forever")
+ }
+}
+
+// TestReadFromBuffer_EmptyBufferAtCurrentOffset tests Bug #2
+// where an empty buffer at the current offset would return empty data instead of ResumeFromDiskError
+func TestReadFromBuffer_EmptyBufferAtCurrentOffset(t *testing.T) {
+ lb := NewLogBuffer("_schemas", time.Hour, nil, nil, func() {})
+
+ // Simulate buffer state where data 0-3 was published and flushed, but buffer NOT advanced yet:
+ // - bufferStartOffset = 0 (buffer hasn't been advanced after flush)
+ // - offset = 4 (next offset to assign - data 0-3 exists)
+ // - pos = 0 (buffer is empty after flush)
+ // This happens in the window between flush and buffer advancement
+ lb.bufferStartOffset = 0
+ lb.offset = 4
+ lb.pos = 0
+
+ // Schema Registry requests offset 0 (which appears to be in range [0, 4])
+ requestPosition := NewMessagePositionFromOffset(0)
+
+ // BUG: Without fix, this returns empty buffer instead of checking disk
+ // FIX: Should return ResumeFromDiskError because buffer is empty (pos=0) despite valid range
+ buf, batchIdx, err := lb.ReadFromBuffer(requestPosition)
+
+ t.Logf("DEBUG: ReadFromBuffer returned: buf=%v, batchIdx=%d, err=%v", buf != nil, batchIdx, err)
+ t.Logf("DEBUG: Buffer state: bufferStartOffset=%d, offset=%d, pos=%d",
+ lb.bufferStartOffset, lb.offset, lb.pos)
+
+ if err != ResumeFromDiskError {
+ if buf == nil || len(buf.Bytes()) == 0 {
+ t.Errorf("CRITICAL BUG #2 REPRODUCED: Empty buffer should return ResumeFromDiskError, got err=%v, buf=%v\n"+
+ "Without the fix, Schema Registry gets empty data instead of reading from disk!",
+ err, buf != nil)
+ }
+ } else {
+ t.Logf("✓ BUG #2 FIX VERIFIED: Empty buffer correctly returns ResumeFromDiskError to check disk")
+ }
+}
+
+// TestReadFromBuffer_OffsetRanges tests various offset range scenarios
+func TestReadFromBuffer_OffsetRanges(t *testing.T) {
+ lb := NewLogBuffer("test", time.Hour, nil, nil, func() {})
+
+ // Setup: buffer contains offsets 10-20
+ lb.bufferStartOffset = 10
+ lb.offset = 20
+ lb.pos = 100 // some data in buffer
+
+ testCases := []struct {
+ name string
+ requestedOffset int64
+ expectedError error
+ description string
+ }{
+ {
+ name: "Before buffer start",
+ requestedOffset: 5,
+ expectedError: ResumeFromDiskError,
+ description: "Offset 5 < bufferStartOffset 10 → read from disk",
+ },
+ {
+ name: "At buffer start",
+ requestedOffset: 10,
+ expectedError: nil,
+ description: "Offset 10 == bufferStartOffset 10 → read from buffer",
+ },
+ {
+ name: "Within buffer range",
+ requestedOffset: 15,
+ expectedError: nil,
+ description: "Offset 15 is within [10, 20] → read from buffer",
+ },
+ {
+ name: "At buffer end",
+ requestedOffset: 20,
+ expectedError: nil,
+ description: "Offset 20 == offset 20 → read from buffer",
+ },
+ {
+ name: "After buffer end",
+ requestedOffset: 25,
+ expectedError: nil,
+ description: "Offset 25 > offset 20 → future data, return nil without error",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ requestPosition := NewMessagePositionFromOffset(tc.requestedOffset)
+ _, _, err := lb.ReadFromBuffer(requestPosition)
+
+ if tc.expectedError != nil {
+ if err != tc.expectedError {
+ t.Errorf("%s\nExpected error: %v, got: %v", tc.description, tc.expectedError, err)
+ } else {
+ t.Logf("✓ %s", tc.description)
+ }
+ } else {
+ // For nil expectedError, we accept either nil or no error condition
+ // (future offsets return nil without error)
+ if err != nil && err != ResumeFromDiskError {
+ t.Errorf("%s\nExpected no ResumeFromDiskError, got: %v", tc.description, err)
+ } else {
+ t.Logf("✓ %s", tc.description)
+ }
+ }
+ })
+ }
+}
+
+// TestReadFromBuffer_InitializedFromDisk tests Bug #3
+// where bufferStartOffset was incorrectly set to 0 after InitializeOffsetFromExistingData,
+// causing reads for old offsets to return new data instead of triggering a disk read.
+func TestReadFromBuffer_InitializedFromDisk(t *testing.T) {
+ // This reproduces the real Schema Registry bug scenario:
+ // 1. Broker restarts, finds 4 messages on disk (offsets 0-3)
+ // 2. InitializeOffsetFromExistingData sets offset=4
+ // - BUG: bufferStartOffset=0 (wrong!)
+ // - FIX: bufferStartOffset=4 (correct!)
+ // 3. First new message is written (offset 4)
+ // 4. Schema Registry reads offset 0
+ // 5. With FIX: requestedOffset=0 < bufferStartOffset=4 → ResumeFromDiskError (correct!)
+ // 6. Without FIX: requestedOffset=0 in range [0, 5] → returns wrong data (bug!)
+
+ lb := NewLogBuffer("_schemas", time.Hour, nil, nil, func() {})
+
+ // Use the actual InitializeOffsetFromExistingData to test the fix
+ err := lb.InitializeOffsetFromExistingData(func() (int64, error) {
+ return 3, nil // Simulate 4 messages on disk (offsets 0-3, highest=3)
+ })
+ if err != nil {
+ t.Fatalf("InitializeOffsetFromExistingData failed: %v", err)
+ }
+
+ t.Logf("After InitializeOffsetFromExistingData(highestOffset=3):")
+ t.Logf(" offset=%d (should be 4), bufferStartOffset=%d (FIX: should be 4, not 0)",
+ lb.offset, lb.bufferStartOffset)
+
+ // Now write a new message at offset 4
+ lb.AddToBuffer(&mq_pb.DataMessage{
+ Key: []byte("new-key"),
+ Value: []byte("new-message-at-offset-4"),
+ TsNs: time.Now().UnixNano(),
+ })
+ // After AddToBuffer: offset=5, pos>0
+
+ // Schema Registry tries to read offset 0 (should be on disk)
+ requestPosition := NewMessagePositionFromOffset(0)
+
+ buf, batchIdx, err := lb.ReadFromBuffer(requestPosition)
+
+ t.Logf("After writing new message:")
+ t.Logf(" bufferStartOffset=%d, offset=%d, pos=%d", lb.bufferStartOffset, lb.offset, lb.pos)
+ t.Logf(" Requested offset 0, got: buf=%v, batchIdx=%d, err=%v", buf != nil, batchIdx, err)
+
+ // EXPECTED BEHAVIOR (with fix):
+ // bufferStartOffset=4 after initialization, so requestedOffset=0 < bufferStartOffset=4
+ // → returns ResumeFromDiskError
+
+ // BUGGY BEHAVIOR (without fix):
+ // bufferStartOffset=0 after initialization, so requestedOffset=0 is in range [0, 5]
+ // → returns the NEW message (offset 4) instead of reading from disk!
+
+ if err != ResumeFromDiskError {
+ t.Errorf("CRITICAL BUG #3 REPRODUCED: Reading offset 0 after initialization from disk should return ResumeFromDiskError\n"+
+ "Instead got: err=%v, buf=%v, batchIdx=%d\n"+
+ "This means Schema Registry would receive WRONG data (offset 4) when requesting offset 0!",
+ err, buf != nil, batchIdx)
+ t.Errorf("Root cause: bufferStartOffset=%d should be 4 after InitializeOffsetFromExistingData(highestOffset=3)",
+ lb.bufferStartOffset)
+ } else {
+ t.Logf("✓ BUG #3 FIX VERIFIED: Reading old offset 0 correctly returns ResumeFromDiskError")
+ t.Logf(" This ensures Schema Registry reads correct data from disk instead of getting new messages")
+ }
+}
+
+// TestLoopProcessLogDataWithOffset_DiskReadRetry tests that when a subscriber
+// reads from disk before flush completes, it continues to retry disk reads
+// and eventually finds the data after flush completes.
+// This reproduces the Schema Registry timeout issue on first start.
+func TestLoopProcessLogDataWithOffset_DiskReadRetry(t *testing.T) {
+ diskReadCallCount := 0
+ diskReadMu := sync.Mutex{}
+ dataFlushedToDisk := false
+ var flushedData []*filer_pb.LogEntry
+
+ // Create a readFromDiskFn that simulates the race condition
+ readFromDiskFn := func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (MessagePosition, bool, error) {
+ diskReadMu.Lock()
+ diskReadCallCount++
+ callNum := diskReadCallCount
+ hasData := dataFlushedToDisk
+ diskReadMu.Unlock()
+
+ t.Logf("DISK READ #%d: startOffset=%d, dataFlushedToDisk=%v", callNum, startPosition.Offset, hasData)
+
+ if !hasData {
+ // Simulate: data not yet on disk (flush hasn't completed)
+ t.Logf(" → No data found (flush not completed yet)")
+ return startPosition, false, nil
+ }
+
+ // Data is now on disk, process it
+ t.Logf(" → Found %d entries on disk", len(flushedData))
+ for _, entry := range flushedData {
+ if entry.Offset >= startPosition.Offset {
+ isDone, err := eachLogEntryFn(entry)
+ if err != nil || isDone {
+ return NewMessagePositionFromOffset(entry.Offset + 1), isDone, err
+ }
+ }
+ }
+ return NewMessagePositionFromOffset(int64(len(flushedData))), false, nil
+ }
+
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {
+ t.Logf("FLUSH: minOffset=%d maxOffset=%d size=%d bytes", minOffset, maxOffset, len(buf))
+ // Simulate writing to disk
+ diskReadMu.Lock()
+ dataFlushedToDisk = true
+ // A real flush would parse buf into log entries; for this test we simply
+ // record a mock entry so the next disk read can return it
+ flushedData = append(flushedData, &filer_pb.LogEntry{
+ Key: []byte("key-0"),
+ Data: []byte("message-0"),
+ TsNs: time.Now().UnixNano(),
+ Offset: 0,
+ })
+ diskReadMu.Unlock()
+ }
+
+ logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, readFromDiskFn, nil)
+ defer logBuffer.ShutdownLogBuffer()
+
+ // Simulate the race condition:
+ // 1. Subscriber starts reading from offset 0
+ // 2. Data is not yet flushed
+ // 3. Loop calls readFromDiskFn → no data found
+ // 4. A bit later, data gets flushed
+ // 5. Loop should continue and call readFromDiskFn again
+
+ receivedMessages := 0
+ mu := sync.Mutex{}
+ maxIterations := 50 // Allow up to 50 iterations (500ms with 10ms sleep each)
+ iterationCount := 0
+
+ waitForDataFn := func() bool {
+ mu.Lock()
+ defer mu.Unlock()
+ iterationCount++
+ // Stop after receiving message or max iterations
+ return receivedMessages == 0 && iterationCount < maxIterations
+ }
+
+ eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
+ mu.Lock()
+ receivedMessages++
+ mu.Unlock()
+ t.Logf("✉️ RECEIVED: offset=%d key=%s", offset, string(logEntry.Key))
+ return true, nil // Stop after first message
+ }
+
+ // Start the reader in a goroutine
+ var readerWg sync.WaitGroup
+ readerWg.Add(1)
+ go func() {
+ defer readerWg.Done()
+ startPosition := NewMessagePositionFromOffset(0)
+ _, isDone, err := logBuffer.LoopProcessLogDataWithOffset("test-subscriber", startPosition, 0, waitForDataFn, eachLogEntryFn)
+ t.Logf("📋 Reader finished: isDone=%v, err=%v", isDone, err)
+ }()
+
+ // Wait a bit to let the first disk read happen (returns no data)
+ time.Sleep(50 * time.Millisecond)
+
+ // Now add data and flush it
+ t.Logf("➕ Adding message to buffer...")
+ logBuffer.AddToBuffer(&mq_pb.DataMessage{
+ Key: []byte("key-0"),
+ Value: []byte("message-0"),
+ TsNs: time.Now().UnixNano(),
+ })
+
+ // Force flush
+ t.Logf("Force flushing...")
+ logBuffer.ForceFlush()
+
+ // Wait for reader to finish
+ readerWg.Wait()
+
+ // Check results
+ diskReadMu.Lock()
+ finalDiskReadCount := diskReadCallCount
+ diskReadMu.Unlock()
+
+ mu.Lock()
+ finalReceivedMessages := receivedMessages
+ finalIterations := iterationCount
+ mu.Unlock()
+
+ t.Logf("\nRESULTS:")
+ t.Logf(" Disk reads: %d", finalDiskReadCount)
+ t.Logf(" Received messages: %d", finalReceivedMessages)
+ t.Logf(" Loop iterations: %d", finalIterations)
+
+ if finalDiskReadCount < 2 {
+ t.Errorf("CRITICAL BUG REPRODUCED: Disk read was only called %d time(s)", finalDiskReadCount)
+ t.Errorf("Expected: Multiple disk reads as the loop continues after flush completes")
+ t.Errorf("This is why Schema Registry times out - it reads once before flush, never re-reads after flush")
+ }
+
+ if finalReceivedMessages == 0 {
+ t.Errorf("SCHEMA REGISTRY TIMEOUT REPRODUCED: No messages received even after flush")
+ t.Errorf("The subscriber is stuck because disk reads are not retried")
+ } else {
+ t.Logf("✓ SUCCESS: Message received after %d disk read attempts", finalDiskReadCount)
+ }
+}
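
For readers of this change, the catch-up pattern that the retry test above exercises can be sketched as a minimal subscriber. This is illustrative only; the package and helper names are assumptions, but LoopProcessLogDataWithOffset, NewMessagePositionFromOffset, ResumeFromDiskError, and the callback signature mirror the code in this diff.

package log_buffer_example // hypothetical package, not part of this change

import (
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

// catchUpFromOffset drives LoopProcessLogDataWithOffset from offset 0 and relies on
// the loop's internal disk-read retry (the behavior asserted by the test above).
// Sketch only; just the log_buffer calls mirror the diff.
func catchUpFromOffset(lb *log_buffer.LogBuffer, connected func() bool,
	handle func(entry *filer_pb.LogEntry, offset int64) (bool, error)) error {

	// Offset-based start position (zero time, Offset=0).
	start := log_buffer.NewMessagePositionFromOffset(0)

	// connected doubles as waitForDataFn: returning false ends the loop cleanly.
	_, _, err := lb.LoopProcessLogDataWithOffset("catch-up-reader", start, 0, connected, handle)
	if err == log_buffer.ResumeFromDiskError {
		// Only surfaces when no ReadFromDiskFn is configured; with one wired in,
		// the loop performs the disk read itself and keeps going.
		return nil
	}
	return err
}
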
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 0ebcc7cc9..77f03ddb8 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -18,19 +18,43 @@ var (
)
type MessagePosition struct {
- time.Time // this is the timestamp of the message
- BatchIndex int64 // this is only used when the timestamp is not enough to identify the next message, when the timestamp is in the previous batch.
+ Time time.Time // timestamp of the message
+ Offset int64 // Kafka offset for offset-based positioning, or batch index for timestamp-based
+ IsOffsetBased bool // true if this position is offset-based, false if timestamp-based
}
-func NewMessagePosition(tsNs int64, batchIndex int64) MessagePosition {
+func NewMessagePosition(tsNs int64, offset int64) MessagePosition {
return MessagePosition{
- Time: time.Unix(0, tsNs).UTC(),
- BatchIndex: batchIndex,
+ Time: time.Unix(0, tsNs).UTC(),
+ Offset: offset,
+ IsOffsetBased: false, // timestamp-based by default
}
}
+// NewMessagePositionFromOffset creates a MessagePosition that represents a specific offset
+func NewMessagePositionFromOffset(offset int64) MessagePosition {
+ return MessagePosition{
+ Time: time.Time{}, // Zero time for offset-based positions
+ Offset: offset,
+ IsOffsetBased: true,
+ }
+}
+
+// GetOffset extracts the offset from an offset-based MessagePosition
+func (mp MessagePosition) GetOffset() int64 {
+ if !mp.IsOffsetBased {
+ return -1 // Not an offset-based position
+ }
+ return mp.Offset // Offset is stored directly
+}
+
func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition MessagePosition, stopTsNs int64,
waitForDataFn func() bool, eachLogDataFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
+
+ // Register for instant notifications (<1ms latency)
+ notifyChan := logBuffer.RegisterSubscriber(readerName)
+ defer logBuffer.UnregisterSubscriber(readerName)
+
// loop through all messages
var bytesBuf *bytes.Buffer
var batchIndex int64
@@ -57,10 +81,10 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
if bytesBuf != nil {
readSize = bytesBuf.Len()
}
- glog.V(4).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex)
+ glog.V(4).Infof("%s ReadFromBuffer at %v offset %d. Read bytes %v batchIndex %d", readerName, lastReadPosition, lastReadPosition.Offset, readSize, batchIndex)
if bytesBuf == nil {
if batchIndex >= 0 {
- lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex)
+ lastReadPosition = NewMessagePosition(lastReadPosition.Time.UnixNano(), batchIndex)
}
if stopTsNs != 0 {
isDone = true
@@ -69,12 +93,23 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
lastTsNs := logBuffer.LastTsNs.Load()
for lastTsNs == logBuffer.LastTsNs.Load() {
- if waitForDataFn() {
- continue
- } else {
+ if !waitForDataFn() {
isDone = true
return
}
+ // Wait for notification or timeout (instant wake-up when data arrives)
+ select {
+ case <-notifyChan:
+ // New data available, break and retry read
+ glog.V(3).Infof("%s: Woke up from notification (LoopProcessLogData)", readerName)
+ break
+ case <-time.After(10 * time.Millisecond):
+ // Timeout, check if timestamp changed
+ if lastTsNs != logBuffer.LastTsNs.Load() {
+ break
+ }
+ glog.V(4).Infof("%s: Notification timeout (LoopProcessLogData), polling", readerName)
+ }
}
if logBuffer.IsStopping() {
isDone = true
@@ -104,6 +139,18 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
pos += 4 + int(size)
continue
}
+
+ // Handle offset-based filtering for offset-based start positions
+ if startPosition.IsOffsetBased {
+ startOffset := startPosition.GetOffset()
+ if logEntry.Offset < startOffset {
+ // Skip entries before the starting offset
+ pos += 4 + int(size)
+ batchSize++
+ continue
+ }
+ }
+
if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
isDone = true
// println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
@@ -131,63 +178,163 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
}
-// LoopProcessLogDataWithBatchIndex is similar to LoopProcessLogData but provides batchIndex to the callback
-func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, startPosition MessagePosition, stopTsNs int64,
- waitForDataFn func() bool, eachLogDataFn EachLogEntryWithBatchIndexFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
+// LoopProcessLogDataWithOffset is similar to LoopProcessLogData but provides offset to the callback
+func (logBuffer *LogBuffer) LoopProcessLogDataWithOffset(readerName string, startPosition MessagePosition, stopTsNs int64,
+ waitForDataFn func() bool, eachLogDataFn EachLogEntryWithOffsetFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
+ glog.V(4).Infof("LoopProcessLogDataWithOffset started for %s, startPosition=%v", readerName, startPosition)
+
+ // Register for instant notifications (<1ms latency)
+ notifyChan := logBuffer.RegisterSubscriber(readerName)
+ defer logBuffer.UnregisterSubscriber(readerName)
+
// loop through all messages
var bytesBuf *bytes.Buffer
- var batchIndex int64
+ var offset int64
lastReadPosition = startPosition
var entryCounter int64
defer func() {
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
}
- // println("LoopProcessLogDataWithBatchIndex", readerName, "sent messages total", entryCounter)
+ // println("LoopProcessLogDataWithOffset", readerName, "sent messages total", entryCounter)
}()
for {
+ // Check stopTsNs at the beginning of each iteration
+ // This ensures we exit immediately if the stop time is in the past
+ if stopTsNs != 0 && time.Now().UnixNano() > stopTsNs {
+ isDone = true
+ return
+ }
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
}
- bytesBuf, batchIndex, err = logBuffer.ReadFromBuffer(lastReadPosition)
+ bytesBuf, offset, err = logBuffer.ReadFromBuffer(lastReadPosition)
+ glog.V(4).Infof("ReadFromBuffer for %s returned bytesBuf=%v, offset=%d, err=%v", readerName, bytesBuf != nil, offset, err)
if err == ResumeFromDiskError {
- time.Sleep(1127 * time.Millisecond)
- return lastReadPosition, isDone, ResumeFromDiskError
+ // Try to read from disk if readFromDiskFn is available
+ if logBuffer.ReadFromDiskFn != nil {
+ // Wrap eachLogDataFn to match the expected signature
+ diskReadFn := func(logEntry *filer_pb.LogEntry) (bool, error) {
+ return eachLogDataFn(logEntry, logEntry.Offset)
+ }
+ lastReadPosition, isDone, err = logBuffer.ReadFromDiskFn(lastReadPosition, stopTsNs, diskReadFn)
+ if err != nil {
+ return lastReadPosition, isDone, err
+ }
+ if isDone {
+ return lastReadPosition, isDone, nil
+ }
+ // Continue to next iteration after disk read
+ }
+
+ // CRITICAL: Check if client is still connected after disk read
+ if !waitForDataFn() {
+ // Client disconnected - exit cleanly
+ glog.V(4).Infof("%s: Client disconnected after disk read", readerName)
+ return lastReadPosition, true, nil
+ }
+
+ // Wait for notification or timeout (instant wake-up when data arrives)
+ select {
+ case <-notifyChan:
+ // New data available, retry immediately
+ glog.V(3).Infof("%s: Woke up from notification after disk read", readerName)
+ case <-time.After(10 * time.Millisecond):
+ // Timeout, retry anyway (fallback for edge cases)
+ glog.V(4).Infof("%s: Notification timeout, polling", readerName)
+ }
+
+ // Continue to next iteration (don't return ResumeFromDiskError)
+ continue
}
readSize := 0
if bytesBuf != nil {
readSize = bytesBuf.Len()
}
- glog.V(4).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex)
+ glog.V(4).Infof("%s ReadFromBuffer at %v posOffset %d. Read bytes %v bufferOffset %d", readerName, lastReadPosition, lastReadPosition.Offset, readSize, offset)
if bytesBuf == nil {
- if batchIndex >= 0 {
- lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex)
+ // CRITICAL: Check if subscription is still active BEFORE waiting
+ // This prevents infinite loops when client has disconnected
+ if !waitForDataFn() {
+ glog.V(4).Infof("%s: waitForDataFn returned false, subscription ending", readerName)
+ return lastReadPosition, true, nil
+ }
+
+ if offset >= 0 {
+ lastReadPosition = NewMessagePosition(lastReadPosition.Time.UnixNano(), offset)
}
if stopTsNs != 0 {
isDone = true
return
}
+
+ // CRITICAL FIX: If we're reading offset-based and there's no data in LogBuffer,
+ // return ResumeFromDiskError to let Subscribe try reading from disk again.
+ // This prevents infinite blocking when all data is on disk (e.g., after restart).
+ if startPosition.IsOffsetBased {
+ glog.V(4).Infof("%s: No data in LogBuffer for offset-based read at %v, checking if client still connected", readerName, lastReadPosition)
+ // Check if client is still connected before busy-looping
+ if !waitForDataFn() {
+ glog.V(4).Infof("%s: Client disconnected, stopping offset-based read", readerName)
+ return lastReadPosition, true, nil
+ }
+ // Wait for notification or timeout (instant wake-up when data arrives)
+ select {
+ case <-notifyChan:
+ // New data available, retry immediately
+ glog.V(3).Infof("%s: Woke up from notification for offset-based read", readerName)
+ case <-time.After(10 * time.Millisecond):
+ // Timeout, retry anyway (fallback for edge cases)
+ glog.V(4).Infof("%s: Notification timeout for offset-based, polling", readerName)
+ }
+ return lastReadPosition, isDone, ResumeFromDiskError
+ }
+
lastTsNs := logBuffer.LastTsNs.Load()
for lastTsNs == logBuffer.LastTsNs.Load() {
- if waitForDataFn() {
- continue
- } else {
- isDone = true
- return
+ if !waitForDataFn() {
+ glog.V(4).Infof("%s: Client disconnected during timestamp wait", readerName)
+ return lastReadPosition, true, nil
+ }
+ // Wait for notification or timeout (instant wake-up when data arrives)
+ select {
+ case <-notifyChan:
+ // New data available, break and retry read
+ glog.V(3).Infof("%s: Woke up from notification (main loop)", readerName)
+ break
+ case <-time.After(10 * time.Millisecond):
+ // Timeout, check if timestamp changed
+ if lastTsNs != logBuffer.LastTsNs.Load() {
+ break
+ }
+ glog.V(4).Infof("%s: Notification timeout (main loop), polling", readerName)
}
}
if logBuffer.IsStopping() {
- isDone = true
- return
+ glog.V(4).Infof("%s: LogBuffer is stopping", readerName)
+ return lastReadPosition, true, nil
}
continue
}
buf := bytesBuf.Bytes()
// fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadPosition, len(buf))
+ glog.V(4).Infof("Processing buffer with %d bytes for %s", len(buf), readerName)
+
+ // If buffer is empty, check if client is still connected before looping
+ if len(buf) == 0 {
+ glog.V(4).Infof("Empty buffer for %s, checking if client still connected", readerName)
+ if !waitForDataFn() {
+ glog.V(4).Infof("%s: Client disconnected on empty buffer", readerName)
+ return lastReadPosition, true, nil
+ }
+ // Sleep to avoid busy-wait on empty buffer
+ time.Sleep(10 * time.Millisecond)
+ continue
+ }
batchSize := 0
@@ -196,7 +343,7 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string,
size := util.BytesToUint32(buf[pos : pos+4])
if pos+4+int(size) > len(buf) {
err = ResumeError
- glog.Errorf("LoopProcessLogDataWithBatchIndex: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf))
+ glog.Errorf("LoopProcessLogDataWithOffset: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf))
return
}
entryData := buf[pos+4 : pos+4+int(size)]
@@ -207,19 +354,39 @@ func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string,
pos += 4 + int(size)
continue
}
+
+ glog.V(4).Infof("Unmarshaled log entry %d: TsNs=%d, Offset=%d, Key=%s", batchSize+1, logEntry.TsNs, logEntry.Offset, string(logEntry.Key))
+
+ // Handle offset-based filtering for offset-based start positions
+ if startPosition.IsOffsetBased {
+ startOffset := startPosition.GetOffset()
+ glog.V(4).Infof("Offset-based filtering: logEntry.Offset=%d, startOffset=%d", logEntry.Offset, startOffset)
+ if logEntry.Offset < startOffset {
+ // Skip entries before the starting offset
+ glog.V(4).Infof("Skipping entry due to offset filter")
+ pos += 4 + int(size)
+ batchSize++
+ continue
+ }
+ }
+
if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
+ glog.V(4).Infof("Stopping due to stopTsNs")
isDone = true
// println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
return
}
- lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex)
+ // CRITICAL FIX: Use logEntry.Offset + 1 to move PAST the current entry
+ // This prevents infinite loops where we keep requesting the same offset
+ lastReadPosition = NewMessagePosition(logEntry.TsNs, logEntry.Offset+1)
- if isDone, err = eachLogDataFn(logEntry, batchIndex); err != nil {
- glog.Errorf("LoopProcessLogDataWithBatchIndex: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err)
+ glog.V(4).Infof("Calling eachLogDataFn for entry at offset %d, next position will be %d", logEntry.Offset, logEntry.Offset+1)
+ if isDone, err = eachLogDataFn(logEntry, logEntry.Offset); err != nil {
+ glog.Errorf("LoopProcessLogDataWithOffset: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err)
return
}
if isDone {
- glog.V(0).Infof("LoopProcessLogDataWithBatchIndex: %s process log entry %d", readerName, batchSize+1)
+ glog.V(0).Infof("LoopProcessLogDataWithOffset: %s process log entry %d", readerName, batchSize+1)
return
}
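
The main behavioral change in this file replaces the fixed 1127ms sleep with a subscriber notification channel plus a short fallback poll. A minimal sketch of that wait step follows, assuming it sits inside the log_buffer package so the time import and RegisterSubscriber/UnregisterSubscriber resolve; the helper name and the reuse of the 10ms constant are illustrative.

// waitForSignalOrPoll blocks until a producer signals new data or a short poll
// interval elapses. In the loops above the registration happens once per call and
// the select runs inside the loop; it is pulled out here only for readability.
func waitForSignalOrPoll(lb *LogBuffer, readerName string) {
	notifyChan := lb.RegisterSubscriber(readerName)
	defer lb.UnregisterSubscriber(readerName)

	select {
	case <-notifyChan:
		// Instant wake-up: a write to the buffer notified this subscriber.
	case <-time.After(10 * time.Millisecond):
		// Fallback poll so a missed or dropped notification cannot stall the reader.
	}
}
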
diff --git a/weed/util/log_buffer/log_read_test.go b/weed/util/log_buffer/log_read_test.go
new file mode 100644
index 000000000..f01e2912a
--- /dev/null
+++ b/weed/util/log_buffer/log_read_test.go
@@ -0,0 +1,329 @@
+package log_buffer
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+)
+
+// TestLoopProcessLogDataWithOffset_ClientDisconnect tests that the loop exits
+// when the client disconnects (waitForDataFn returns false)
+func TestLoopProcessLogDataWithOffset_ClientDisconnect(t *testing.T) {
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {}
+ logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil)
+ defer logBuffer.ShutdownLogBuffer()
+
+ // Simulate client disconnect after 100ms
+ ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+ defer cancel()
+
+ waitForDataFn := func() bool {
+ select {
+ case <-ctx.Done():
+ return false // Client disconnected
+ default:
+ return true
+ }
+ }
+
+ eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
+ return true, nil
+ }
+
+ startPosition := NewMessagePositionFromOffset(0)
+ startTime := time.Now()
+
+ // This should exit shortly after the 100ms context timeout; the assertion below allows up to 500ms
+ _, isDone, _ := logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn)
+
+ elapsed := time.Since(startTime)
+
+ if !isDone {
+ t.Errorf("Expected isDone=true when client disconnects, got false")
+ }
+
+ if elapsed > 500*time.Millisecond {
+ t.Errorf("Loop took too long to exit: %v (expected < 500ms)", elapsed)
+ }
+
+ t.Logf("Loop exited cleanly in %v after client disconnect", elapsed)
+}
+
+// TestLoopProcessLogDataWithOffset_EmptyBuffer tests that the loop doesn't
+// busy-wait when the buffer is empty
+func TestLoopProcessLogDataWithOffset_EmptyBuffer(t *testing.T) {
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {}
+ logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil)
+ defer logBuffer.ShutdownLogBuffer()
+
+ callCount := 0
+ maxCalls := 10
+ mu := sync.Mutex{}
+
+ waitForDataFn := func() bool {
+ mu.Lock()
+ defer mu.Unlock()
+ callCount++
+ // Disconnect after maxCalls to prevent infinite loop
+ return callCount < maxCalls
+ }
+
+ eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
+ return true, nil
+ }
+
+ startPosition := NewMessagePositionFromOffset(0)
+ startTime := time.Now()
+
+ _, isDone, _ := logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn)
+
+ elapsed := time.Since(startTime)
+
+ if !isDone {
+ t.Errorf("Expected isDone=true when waitForDataFn returns false, got false")
+ }
+
+ // With a 10ms sleep per iteration, maxCalls-1 waits should take at least ~90ms
+ minExpectedTime := time.Duration(maxCalls-1) * 10 * time.Millisecond
+ if elapsed < minExpectedTime {
+ t.Errorf("Loop exited too quickly (%v), expected at least %v (suggests busy-waiting)", elapsed, minExpectedTime)
+ }
+
+ // But shouldn't take more than ~3x the expected time (allows for scheduling overhead)
+ maxExpectedTime := time.Duration(maxCalls) * 30 * time.Millisecond
+ if elapsed > maxExpectedTime {
+ t.Errorf("Loop took too long: %v (expected < %v)", elapsed, maxExpectedTime)
+ }
+
+ mu.Lock()
+ finalCallCount := callCount
+ mu.Unlock()
+
+ if finalCallCount != maxCalls {
+ t.Errorf("Expected exactly %d calls to waitForDataFn, got %d", maxCalls, finalCallCount)
+ }
+
+ t.Logf("Loop exited cleanly in %v after %d iterations (no busy-waiting detected)", elapsed, finalCallCount)
+}
+
+// TestLoopProcessLogDataWithOffset_NoDataResumeFromDisk tests that the loop
+// properly handles ResumeFromDiskError without busy-waiting
+func TestLoopProcessLogDataWithOffset_NoDataResumeFromDisk(t *testing.T) {
+ readFromDiskFn := func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
+ // No data on disk
+ return startPosition, false, nil
+ }
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {}
+ logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, readFromDiskFn, nil)
+ defer logBuffer.ShutdownLogBuffer()
+
+ callCount := 0
+ maxCalls := 5
+ mu := sync.Mutex{}
+
+ waitForDataFn := func() bool {
+ mu.Lock()
+ defer mu.Unlock()
+ callCount++
+ // Disconnect after maxCalls
+ return callCount < maxCalls
+ }
+
+ eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
+ return true, nil
+ }
+
+ startPosition := NewMessagePositionFromOffset(0)
+ startTime := time.Now()
+
+ _, isDone, _ := logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn)
+
+ elapsed := time.Since(startTime)
+
+ if !isDone {
+ t.Errorf("Expected isDone=true when waitForDataFn returns false, got false")
+ }
+
+ // Should take at least (maxCalls-1) * 10ms due to sleep in ResumeFromDiskError path
+ minExpectedTime := time.Duration(maxCalls-1) * 10 * time.Millisecond
+ if elapsed < minExpectedTime {
+ t.Errorf("Loop exited too quickly (%v), expected at least %v (suggests missing sleep)", elapsed, minExpectedTime)
+ }
+
+ t.Logf("Loop exited cleanly in %v after %d iterations (proper sleep detected)", elapsed, callCount)
+}
+
+// TestLoopProcessLogDataWithOffset_WithData tests normal operation with data
+func TestLoopProcessLogDataWithOffset_WithData(t *testing.T) {
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {}
+ logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil)
+ defer logBuffer.ShutdownLogBuffer()
+
+ // Add some test data to the buffer
+ testMessages := []*mq_pb.DataMessage{
+ {Key: []byte("key1"), Value: []byte("message1"), TsNs: 1},
+ {Key: []byte("key2"), Value: []byte("message2"), TsNs: 2},
+ {Key: []byte("key3"), Value: []byte("message3"), TsNs: 3},
+ }
+
+ for _, msg := range testMessages {
+ logBuffer.AddToBuffer(msg)
+ }
+
+ receivedCount := 0
+ mu := sync.Mutex{}
+
+ // Disconnect after receiving at least 1 message to test that data processing works
+ waitForDataFn := func() bool {
+ mu.Lock()
+ defer mu.Unlock()
+ return receivedCount == 0 // Disconnect after first message
+ }
+
+ eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
+ mu.Lock()
+ receivedCount++
+ mu.Unlock()
+ return true, nil // Continue processing
+ }
+
+ startPosition := NewMessagePositionFromOffset(0)
+ startTime := time.Now()
+
+ _, isDone, _ := logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn)
+
+ elapsed := time.Since(startTime)
+
+ if !isDone {
+ t.Errorf("Expected isDone=true after client disconnect, got false")
+ }
+
+ mu.Lock()
+ finalCount := receivedCount
+ mu.Unlock()
+
+ if finalCount < 1 {
+ t.Errorf("Expected to receive at least 1 message, got %d", finalCount)
+ }
+
+ // Should complete quickly since data is available
+ if elapsed > 1*time.Second {
+ t.Errorf("Processing took too long: %v (expected < 1s)", elapsed)
+ }
+
+ t.Logf("Successfully processed %d message(s) in %v", finalCount, elapsed)
+}
+
+// TestLoopProcessLogDataWithOffset_ConcurrentDisconnect tests that the loop
+// handles concurrent client disconnects without panicking
+func TestLoopProcessLogDataWithOffset_ConcurrentDisconnect(t *testing.T) {
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {}
+ logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil)
+ defer logBuffer.ShutdownLogBuffer()
+
+ numClients := 10
+ var wg sync.WaitGroup
+
+ for i := 0; i < numClients; i++ {
+ wg.Add(1)
+ go func(clientID int) {
+ defer wg.Done()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+ defer cancel()
+
+ waitForDataFn := func() bool {
+ select {
+ case <-ctx.Done():
+ return false
+ default:
+ return true
+ }
+ }
+
+ eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
+ return true, nil
+ }
+
+ startPosition := NewMessagePositionFromOffset(0)
+ _, _, _ = logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn)
+ }(i)
+ }
+
+ // Wait for all clients to finish with a timeout
+ done := make(chan struct{})
+ go func() {
+ wg.Wait()
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ t.Logf("All %d concurrent clients exited cleanly", numClients)
+ case <-time.After(5 * time.Second):
+ t.Errorf("Timeout waiting for concurrent clients to exit (possible deadlock or stuck loop)")
+ }
+}
+
+// TestLoopProcessLogDataWithOffset_StopTime tests that the loop respects stopTsNs
+func TestLoopProcessLogDataWithOffset_StopTime(t *testing.T) {
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {}
+ logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil)
+ defer logBuffer.ShutdownLogBuffer()
+
+ callCount := 0
+ waitForDataFn := func() bool {
+ callCount++
+ // Prevent infinite loop in case of test failure
+ return callCount < 10
+ }
+
+ eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
+ t.Errorf("Should not process any entries when stopTsNs is in the past")
+ return false, nil
+ }
+
+ startPosition := NewMessagePositionFromOffset(0)
+ stopTsNs := time.Now().Add(-1 * time.Hour).UnixNano() // Stop time in the past
+
+ startTime := time.Now()
+ _, isDone, _ := logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, stopTsNs, waitForDataFn, eachLogEntryFn)
+ elapsed := time.Since(startTime)
+
+ if !isDone {
+ t.Errorf("Expected isDone=true when stopTsNs is in the past, got false")
+ }
+
+ if elapsed > 1*time.Second {
+ t.Errorf("Loop should exit quickly when stopTsNs is in the past, took %v", elapsed)
+ }
+
+ t.Logf("Loop correctly exited for past stopTsNs in %v (waitForDataFn called %d times)", elapsed, callCount)
+}
+
+// BenchmarkLoopProcessLogDataWithOffset_EmptyBuffer benchmarks the performance
+// of the loop with an empty buffer to ensure no busy-waiting
+func BenchmarkLoopProcessLogDataWithOffset_EmptyBuffer(b *testing.B) {
+ flushFn := func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64) {}
+ logBuffer := NewLogBuffer("test", 1*time.Minute, flushFn, nil, nil)
+ defer logBuffer.ShutdownLogBuffer()
+
+ for i := 0; i < b.N; i++ {
+ callCount := 0
+ waitForDataFn := func() bool {
+ callCount++
+ return callCount < 3 // Exit after 3 calls
+ }
+
+ eachLogEntryFn := func(logEntry *filer_pb.LogEntry, offset int64) (bool, error) {
+ return true, nil
+ }
+
+ startPosition := NewMessagePositionFromOffset(0)
+ logBuffer.LoopProcessLogDataWithOffset("test-client", startPosition, 0, waitForDataFn, eachLogEntryFn)
+ }
+}
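
These tests build their start positions with NewMessagePositionFromOffset; the distinction between the two position kinds introduced in log_read.go can be summarized with a small, purely illustrative example, assumed to live in the log_buffer package so the constructors resolve.

// describePositionKinds contrasts the two MessagePosition flavors.
// Offset-based: zero Time, Offset carries the requested offset, GetOffset returns it.
// Timestamp-based: Time carries the timestamp, Offset doubles as a batch index,
// and GetOffset returns -1 to signal "not offset-based".
func describePositionKinds() {
	fromOffset := NewMessagePositionFromOffset(42)
	_ = fromOffset.IsOffsetBased // true
	_ = fromOffset.GetOffset()   // 42

	fromTime := NewMessagePosition(time.Now().UnixNano(), 7)
	_ = fromTime.IsOffsetBased // false: Offset is treated as a batch index
	_ = fromTime.GetOffset()   // -1: not an offset-based position
}
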
diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go
index c41b30fcc..397dab1d4 100644
--- a/weed/util/log_buffer/sealed_buffer.go
+++ b/weed/util/log_buffer/sealed_buffer.go
@@ -6,11 +6,12 @@ import (
)
type MemBuffer struct {
- buf []byte
- size int
- startTime time.Time
- stopTime time.Time
- batchIndex int64
+ buf []byte
+ size int
+ startTime time.Time
+ stopTime time.Time
+ startOffset int64 // First offset in this buffer
+ offset int64 // Last offset in this buffer (endOffset)
}
type SealedBuffers struct {
@@ -30,7 +31,7 @@ func newSealedBuffers(size int) *SealedBuffers {
return sbs
}
-func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, batchIndex int64) (newBuf []byte) {
+func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, startOffset int64, endOffset int64) (newBuf []byte) {
oldMemBuffer := sbs.buffers[0]
size := len(sbs.buffers)
for i := 0; i < size-1; i++ {
@@ -38,13 +39,15 @@ func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte,
sbs.buffers[i].size = sbs.buffers[i+1].size
sbs.buffers[i].startTime = sbs.buffers[i+1].startTime
sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime
- sbs.buffers[i].batchIndex = sbs.buffers[i+1].batchIndex
+ sbs.buffers[i].startOffset = sbs.buffers[i+1].startOffset
+ sbs.buffers[i].offset = sbs.buffers[i+1].offset
}
sbs.buffers[size-1].buf = buf
sbs.buffers[size-1].size = pos
sbs.buffers[size-1].startTime = startTime
sbs.buffers[size-1].stopTime = stopTime
- sbs.buffers[size-1].batchIndex = batchIndex
+ sbs.buffers[size-1].startOffset = startOffset
+ sbs.buffers[size-1].offset = endOffset
return oldMemBuffer.buf
}
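
The startOffset/offset pair now recorded on each sealed MemBuffer is what lets readers decide whether a requested offset is still held in memory. The lookup below is a sketch of that idea, not code from this change, and is assumed to live inside the log_buffer package so the unexported fields resolve.

// findSealedBuffer returns the sealed buffer whose offset range covers the wanted
// offset, or nil when the data has aged out of memory (the caller then takes the
// disk / ResumeFromDiskError path). Illustrative only.
func findSealedBuffer(sbs *SealedBuffers, wanted int64) *MemBuffer {
	for i := range sbs.buffers {
		mb := &sbs.buffers[i]
		if mb.size == 0 {
			continue // empty slot, nothing sealed here yet
		}
		if wanted >= mb.startOffset && wanted <= mb.offset {
			return mb // this sealed buffer covers the requested offset
		}
	}
	return nil
}
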