aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_buffer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
-rw-r--r--weed/util/log_buffer/log_buffer.go167
1 files changed, 82 insertions, 85 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 715dbdd30..22e69cc60 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -19,6 +19,12 @@ import (
const BufferSize = 8 * 1024 * 1024
const PreviousBufferCount = 32
+// Errors that can be returned by log buffer operations
+var (
+ // ErrBufferCorrupted indicates the log buffer contains corrupted data
+ ErrBufferCorrupted = fmt.Errorf("log buffer is corrupted")
+)
+
type dataToFlush struct {
startTime time.Time
stopTime time.Time
@@ -117,14 +123,12 @@ func (logBuffer *LogBuffer) RegisterSubscriber(subscriberID string) chan struct{
// Check if already registered
if existingChan, exists := logBuffer.subscribers[subscriberID]; exists {
- glog.V(2).Infof("Subscriber %s already registered for %s, reusing channel", subscriberID, logBuffer.name)
return existingChan
}
// Create buffered channel (size 1) so notifications never block
notifyChan := make(chan struct{}, 1)
logBuffer.subscribers[subscriberID] = notifyChan
- glog.V(1).Infof("Registered subscriber %s for %s (total: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers))
return notifyChan
}
@@ -136,7 +140,6 @@ func (logBuffer *LogBuffer) UnregisterSubscriber(subscriberID string) {
if ch, exists := logBuffer.subscribers[subscriberID]; exists {
close(ch)
delete(logBuffer.subscribers, subscriberID)
- glog.V(1).Infof("Unregistered subscriber %s from %s (remaining: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers))
}
}
@@ -158,7 +161,6 @@ func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool {
// it MUST be in memory (not written to disk yet)
lastFlushed := logBuffer.lastFlushedOffset.Load()
if lastFlushed >= 0 && offset > lastFlushed {
- glog.V(3).Infof("Offset %d is in memory (newer than lastFlushed=%d)", offset, lastFlushed)
return true
}
@@ -168,11 +170,9 @@ func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool {
// CRITICAL: Check if buffer actually has data (pos > 0)
// After flush, pos=0 but range is still valid - data is on disk, not in memory
if logBuffer.pos > 0 {
- glog.V(3).Infof("Offset %d is in current buffer [%d-%d] with data", offset, logBuffer.bufferStartOffset, logBuffer.offset)
return true
}
// Buffer is empty (just flushed) - data is on disk
- glog.V(3).Infof("Offset %d in range [%d-%d] but buffer empty (pos=0), data on disk", offset, logBuffer.bufferStartOffset, logBuffer.offset)
return false
}
@@ -181,17 +181,14 @@ func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool {
if offset >= buf.startOffset && offset <= buf.offset {
// Check if prevBuffer actually has data
if buf.size > 0 {
- glog.V(3).Infof("Offset %d is in previous buffer [%d-%d] with data", offset, buf.startOffset, buf.offset)
return true
}
// Buffer is empty (flushed) - data is on disk
- glog.V(3).Infof("Offset %d in prevBuffer [%d-%d] but empty (size=0), data on disk", offset, buf.startOffset, buf.offset)
return false
}
}
// Offset is older than memory buffers - only available on disk
- glog.V(3).Infof("Offset %d is NOT in memory (bufferStart=%d, lastFlushed=%d)", offset, logBuffer.bufferStartOffset, lastFlushed)
return false
}
@@ -205,15 +202,13 @@ func (logBuffer *LogBuffer) notifySubscribers() {
return // No subscribers, skip notification
}
- for subscriberID, notifyChan := range logBuffer.subscribers {
+ for _, notifyChan := range logBuffer.subscribers {
select {
case notifyChan <- struct{}{}:
// Notification sent successfully
- glog.V(3).Infof("Notified subscriber %s for %s", subscriberID, logBuffer.name)
default:
// Channel full - subscriber hasn't consumed previous notification yet
// This is OK because one notification is sufficient to wake the subscriber
- glog.V(3).Infof("Subscriber %s notification channel full (OK - already notified)", subscriberID)
}
}
}
@@ -227,7 +222,6 @@ func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn
highestOffset, err := getHighestOffsetFn()
if err != nil {
- glog.V(0).Infof("Failed to get highest offset for %s: %v, starting from 0", logBuffer.name, err)
return nil // Continue with offset 0 if we can't read existing data
}
@@ -243,37 +237,36 @@ func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn
logBuffer.lastFlushedOffset.Store(highestOffset)
// Set lastFlushedTime to current time (we know data up to highestOffset is on disk)
logBuffer.lastFlushTsNs.Store(time.Now().UnixNano())
- glog.V(0).Infof("Initialized LogBuffer %s offset to %d (highest existing: %d), buffer starts at %d, lastFlushedOffset=%d, lastFlushedTime=%v",
- logBuffer.name, nextOffset, highestOffset, nextOffset, highestOffset, time.Now())
} else {
logBuffer.bufferStartOffset = 0 // Start from offset 0
// No data on disk yet
- glog.V(0).Infof("No existing data found for %s, starting from offset 0, lastFlushedOffset=-1, lastFlushedTime=0", logBuffer.name)
}
return nil
}
-func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) {
- logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs)
+func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) error {
+ return logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs)
}
// AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information
-func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) {
- logEntryData, _ := proto.Marshal(logEntry)
-
+func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) error {
var toFlush *dataToFlush
+ var marshalErr error
logBuffer.Lock()
defer func() {
logBuffer.Unlock()
if toFlush != nil {
logBuffer.flushChan <- toFlush
}
- if logBuffer.notifyFn != nil {
- logBuffer.notifyFn()
+ // Only notify if there was no error
+ if marshalErr == nil {
+ if logBuffer.notifyFn != nil {
+ logBuffer.notifyFn()
+ }
+ // Notify all registered subscribers instantly (<1ms latency)
+ logBuffer.notifySubscribers()
}
- // Notify all registered subscribers instantly (<1ms latency)
- logBuffer.notifySubscribers()
}()
processingTsNs := logEntry.TsNs
@@ -285,11 +278,16 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) {
ts = time.Unix(0, processingTsNs)
// Re-marshal with corrected timestamp
logEntry.TsNs = processingTsNs
- logEntryData, _ = proto.Marshal(logEntry)
} else {
logBuffer.LastTsNs.Store(processingTsNs)
}
+ logEntryData, err := proto.Marshal(logEntry)
+ if err != nil {
+ marshalErr = fmt.Errorf("failed to marshal LogEntry: %w", err)
+ glog.Errorf("%v", marshalErr)
+ return marshalErr
+ }
size := len(logEntryData)
if logBuffer.pos == 0 {
@@ -323,8 +321,9 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) {
const maxBufferSize = 1 << 30 // 1 GiB practical limit
// Ensure 2*size + 4 won't overflow int and stays within practical bounds
if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 {
- glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size)
- return
+ marshalErr = fmt.Errorf("message size %d exceeds maximum allowed size", size)
+ glog.Errorf("%v", marshalErr)
+ return marshalErr
}
// Safe to compute now that we've validated size is in valid range
newSize := 2*size + 4
@@ -340,9 +339,10 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) {
logBuffer.pos += size + 4
logBuffer.offset++
+ return nil
}
-func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) {
+func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) error {
// PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock
var ts time.Time
@@ -360,20 +360,22 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
Key: partitionKey,
}
- logEntryData, _ := proto.Marshal(logEntry)
-
var toFlush *dataToFlush
+ var marshalErr error
logBuffer.Lock()
defer func() {
logBuffer.Unlock()
if toFlush != nil {
logBuffer.flushChan <- toFlush
}
- if logBuffer.notifyFn != nil {
- logBuffer.notifyFn()
+ // Only notify if there was no error
+ if marshalErr == nil {
+ if logBuffer.notifyFn != nil {
+ logBuffer.notifyFn()
+ }
+ // Notify all registered subscribers instantly (<1ms latency)
+ logBuffer.notifySubscribers()
}
- // Notify all registered subscribers instantly (<1ms latency)
- logBuffer.notifySubscribers()
}()
// Handle timestamp collision inside lock (rare case)
@@ -390,20 +392,13 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
// Note: This also enables AddToBuffer to work correctly with Kafka-style offset-based reads
logEntry.Offset = logBuffer.offset
- // DEBUG: Log data being added to buffer for GitHub Actions debugging
- dataPreview := ""
- if len(data) > 0 {
- if len(data) <= 50 {
- dataPreview = string(data)
- } else {
- dataPreview = fmt.Sprintf("%s...(total %d bytes)", string(data[:50]), len(data))
- }
- }
- glog.V(2).Infof("[LOG_BUFFER_ADD] buffer=%s offset=%d dataLen=%d dataPreview=%q",
- logBuffer.name, logBuffer.offset, len(data), dataPreview)
-
// Marshal with correct timestamp and offset
- logEntryData, _ = proto.Marshal(logEntry)
+ logEntryData, err := proto.Marshal(logEntry)
+ if err != nil {
+ marshalErr = fmt.Errorf("failed to marshal LogEntry: %w", err)
+ glog.Errorf("%v", marshalErr)
+ return marshalErr
+ }
size := len(logEntryData)
@@ -429,7 +424,6 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
}
if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
- // glog.V(0).Infof("%s copyToFlush1 offset:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.offset, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos)
toFlush = logBuffer.copyToFlush()
logBuffer.startTime = ts
if len(logBuffer.buf) < size+4 {
@@ -437,8 +431,9 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
const maxBufferSize = 1 << 30 // 1 GiB practical limit
// Ensure 2*size + 4 won't overflow int and stays within practical bounds
if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 {
- glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size)
- return
+ marshalErr = fmt.Errorf("message size %d exceeds maximum allowed size", size)
+ glog.Errorf("%v", marshalErr)
+ return marshalErr
}
// Safe to compute now that we've validated size is in valid range
newSize := 2*size + 4
@@ -454,6 +449,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
logBuffer.pos += size + 4
logBuffer.offset++
+ return nil
}
func (logBuffer *LogBuffer) IsStopping() bool {
@@ -480,14 +476,11 @@ func (logBuffer *LogBuffer) ForceFlush() {
select {
case <-toFlush.done:
// Flush completed successfully
- glog.V(1).Infof("ForceFlush completed for %s", logBuffer.name)
case <-time.After(5 * time.Second):
// Timeout waiting for flush - this shouldn't happen
- glog.Warningf("ForceFlush timed out waiting for completion on %s", logBuffer.name)
}
case <-time.After(2 * time.Second):
// If flush channel is still blocked after 2s, something is wrong
- glog.Warningf("ForceFlush channel timeout for %s - flush channel busy for 2s", logBuffer.name)
}
}
}
@@ -511,7 +504,6 @@ func (logBuffer *LogBuffer) IsAllFlushed() bool {
func (logBuffer *LogBuffer) loopFlush() {
for d := range logBuffer.flushChan {
if d != nil {
- // glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes()))
logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset)
d.releaseMemory()
// local logbuffer is different from aggregate logbuffer here
@@ -546,10 +538,7 @@ func (logBuffer *LogBuffer) loopInterval() {
toFlush := logBuffer.copyToFlush()
logBuffer.Unlock()
if toFlush != nil {
- glog.V(4).Infof("%s flush [%v, %v] size %d", logBuffer.name, toFlush.startTime, toFlush.stopTime, len(toFlush.data.Bytes()))
logBuffer.flushChan <- toFlush
- } else {
- // glog.V(0).Infof("%s no flush", m.name)
}
}
}
@@ -578,9 +567,7 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush
if withCallback {
d.done = make(chan struct{})
}
- // glog.V(4).Infof("%s flushing [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime)
} else {
- // glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime)
logBuffer.lastFlushDataTime = logBuffer.stopTime
}
// CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1
@@ -647,8 +634,6 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
defer logBuffer.RUnlock()
isOffsetBased := lastReadPosition.IsOffsetBased
- glog.V(2).Infof("[ReadFromBuffer] %s: isOffsetBased=%v, position=%+v, bufferStartOffset=%d, offset=%d, pos=%d",
- logBuffer.name, isOffsetBased, lastReadPosition, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos)
// For offset-based subscriptions, use offset comparisons, not time comparisons!
if isOffsetBased {
@@ -729,11 +714,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
if !logBuffer.startTime.IsZero() {
tsMemory = logBuffer.startTime
}
- glog.V(2).Infof("[ReadFromBuffer] %s: checking prevBuffers, count=%d, currentStartTime=%v",
- logBuffer.name, len(logBuffer.prevBuffers.buffers), logBuffer.startTime)
- for i, prevBuf := range logBuffer.prevBuffers.buffers {
- glog.V(2).Infof("[ReadFromBuffer] %s: prevBuf[%d]: startTime=%v stopTime=%v size=%d startOffset=%d endOffset=%d",
- logBuffer.name, i, prevBuf.startTime, prevBuf.stopTime, prevBuf.size, prevBuf.startOffset, prevBuf.offset)
+ for _, prevBuf := range logBuffer.prevBuffers.buffers {
if !prevBuf.startTime.IsZero() {
// If tsMemory is zero, assign directly; otherwise compare
if tsMemory.IsZero() || prevBuf.startTime.Before(tsMemory) {
@@ -754,19 +735,12 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
// Fall through to case 2.1 to read from earliest buffer
} else if lastReadPosition.Offset <= 0 && lastReadPosition.Time.Before(tsMemory) {
// Treat first read with sentinel/zero offset as inclusive of earliest in-memory data
- glog.V(4).Infof("first read (offset=%d) at time %v before earliest memory %v, reading from memory",
- lastReadPosition.Offset, lastReadPosition.Time, tsMemory)
} else {
// Data not in memory buffers - read from disk
- glog.V(0).Infof("[ReadFromBuffer] %s resume from disk: requested time %v < earliest memory time %v",
- logBuffer.name, lastReadPosition.Time, tsMemory)
return nil, -2, ResumeFromDiskError
}
}
- glog.V(2).Infof("[ReadFromBuffer] %s: time-based read continuing, tsMemory=%v, lastReadPos=%v",
- logBuffer.name, tsMemory, lastReadPosition.Time)
-
// the following is case 2.1
if lastReadPosition.Time.Equal(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() {
@@ -776,14 +750,12 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
}
}
if lastReadPosition.Time.After(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() {
- // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime)
return nil, logBuffer.offset, nil
}
// Also check prevBuffers when current buffer is empty (startTime is zero)
if lastReadPosition.Time.Before(logBuffer.startTime) || logBuffer.startTime.IsZero() {
for _, buf := range logBuffer.prevBuffers.buffers {
if buf.startTime.After(lastReadPosition.Time) {
- // glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime)
return copiedBytes(buf.buf[:buf.size]), buf.offset, nil
}
if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) {
@@ -791,14 +763,17 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
if lastReadPosition.Offset <= 0 {
searchTime = searchTime.Add(-time.Nanosecond)
}
- pos := buf.locateByTs(searchTime)
- glog.V(2).Infof("[ReadFromBuffer] %s: found data in prevBuffer at pos %d, bufSize=%d", logBuffer.name, pos, buf.size)
+ pos, err := buf.locateByTs(searchTime)
+ if err != nil {
+ // Buffer corruption detected - return error wrapped with ErrBufferCorrupted
+ glog.Errorf("ReadFromBuffer: buffer corruption in prevBuffer: %v", err)
+ return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err)
+ }
return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil
}
}
// If current buffer is not empty, return it
if logBuffer.pos > 0 {
- // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition)
return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
}
// Buffer is empty and no data in prevBuffers - wait for new data
@@ -830,13 +805,23 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
for l <= h {
mid := (l + h) / 2
pos := logBuffer.idx[mid]
- _, t := readTs(logBuffer.buf, pos)
+ _, t, err := readTs(logBuffer.buf, pos)
+ if err != nil {
+ // Buffer corruption detected in binary search
+ glog.Errorf("ReadFromBuffer: buffer corruption at idx[%d] pos %d: %v", mid, pos, err)
+ return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err)
+ }
if t <= searchTs {
l = mid + 1
} else if searchTs < t {
var prevT int64
if mid > 0 {
- _, prevT = readTs(logBuffer.buf, logBuffer.idx[mid-1])
+ _, prevT, err = readTs(logBuffer.buf, logBuffer.idx[mid-1])
+ if err != nil {
+ // Buffer corruption detected in binary search (previous entry)
+ glog.Errorf("ReadFromBuffer: buffer corruption at idx[%d] pos %d: %v", mid-1, logBuffer.idx[mid-1], err)
+ return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err)
+ }
}
if prevT <= searchTs {
return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.offset, nil
@@ -881,16 +866,28 @@ func copiedBytes(buf []byte) (copied *bytes.Buffer) {
return
}
-func readTs(buf []byte, pos int) (size int, ts int64) {
+func readTs(buf []byte, pos int) (size int, ts int64, err error) {
+ // Bounds check for size field (overflow-safe)
+ if pos < 0 || pos > len(buf)-4 {
+ return 0, 0, fmt.Errorf("corrupted log buffer: cannot read size at pos %d, buffer length %d", pos, len(buf))
+ }
size = int(util.BytesToUint32(buf[pos : pos+4]))
+
+ // Bounds check for entry data (overflow-safe, protects against negative size)
+ if size < 0 || size > len(buf)-pos-4 {
+ return 0, 0, fmt.Errorf("corrupted log buffer: entry size %d at pos %d exceeds buffer length %d", size, pos, len(buf))
+ }
+
entryData := buf[pos+4 : pos+4+size]
logEntry := &filer_pb.LogEntry{}
- err := proto.Unmarshal(entryData, logEntry)
+ err = proto.Unmarshal(entryData, logEntry)
if err != nil {
- glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err)
+ // Return error instead of failing fast
+ // This allows caller to handle corruption gracefully
+ return 0, 0, fmt.Errorf("corrupted log buffer: failed to unmarshal LogEntry at pos %d, size %d: %w", pos, size, err)
}
- return size, logEntry.TsNs
+ return size, logEntry.TsNs, nil
}