Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 167 |
1 file changed, 82 insertions(+), 85 deletions(-)
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 715dbdd30..22e69cc60 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -19,6 +19,12 @@ import ( const BufferSize = 8 * 1024 * 1024 const PreviousBufferCount = 32 +// Errors that can be returned by log buffer operations +var ( + // ErrBufferCorrupted indicates the log buffer contains corrupted data + ErrBufferCorrupted = fmt.Errorf("log buffer is corrupted") +) + type dataToFlush struct { startTime time.Time stopTime time.Time @@ -117,14 +123,12 @@ func (logBuffer *LogBuffer) RegisterSubscriber(subscriberID string) chan struct{ // Check if already registered if existingChan, exists := logBuffer.subscribers[subscriberID]; exists { - glog.V(2).Infof("Subscriber %s already registered for %s, reusing channel", subscriberID, logBuffer.name) return existingChan } // Create buffered channel (size 1) so notifications never block notifyChan := make(chan struct{}, 1) logBuffer.subscribers[subscriberID] = notifyChan - glog.V(1).Infof("Registered subscriber %s for %s (total: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers)) return notifyChan } @@ -136,7 +140,6 @@ func (logBuffer *LogBuffer) UnregisterSubscriber(subscriberID string) { if ch, exists := logBuffer.subscribers[subscriberID]; exists { close(ch) delete(logBuffer.subscribers, subscriberID) - glog.V(1).Infof("Unregistered subscriber %s from %s (remaining: %d)", subscriberID, logBuffer.name, len(logBuffer.subscribers)) } } @@ -158,7 +161,6 @@ func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool { // it MUST be in memory (not written to disk yet) lastFlushed := logBuffer.lastFlushedOffset.Load() if lastFlushed >= 0 && offset > lastFlushed { - glog.V(3).Infof("Offset %d is in memory (newer than lastFlushed=%d)", offset, lastFlushed) return true } @@ -168,11 +170,9 @@ func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool { // CRITICAL: Check if buffer actually has data (pos > 0) // After flush, pos=0 but range is still valid - data is on disk, not in memory if logBuffer.pos > 0 { - glog.V(3).Infof("Offset %d is in current buffer [%d-%d] with data", offset, logBuffer.bufferStartOffset, logBuffer.offset) return true } // Buffer is empty (just flushed) - data is on disk - glog.V(3).Infof("Offset %d in range [%d-%d] but buffer empty (pos=0), data on disk", offset, logBuffer.bufferStartOffset, logBuffer.offset) return false } @@ -181,17 +181,14 @@ func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool { if offset >= buf.startOffset && offset <= buf.offset { // Check if prevBuffer actually has data if buf.size > 0 { - glog.V(3).Infof("Offset %d is in previous buffer [%d-%d] with data", offset, buf.startOffset, buf.offset) return true } // Buffer is empty (flushed) - data is on disk - glog.V(3).Infof("Offset %d in prevBuffer [%d-%d] but empty (size=0), data on disk", offset, buf.startOffset, buf.offset) return false } } // Offset is older than memory buffers - only available on disk - glog.V(3).Infof("Offset %d is NOT in memory (bufferStart=%d, lastFlushed=%d)", offset, logBuffer.bufferStartOffset, lastFlushed) return false } @@ -205,15 +202,13 @@ func (logBuffer *LogBuffer) notifySubscribers() { return // No subscribers, skip notification } - for subscriberID, notifyChan := range logBuffer.subscribers { + for _, notifyChan := range logBuffer.subscribers { select { case notifyChan <- struct{}{}: // Notification sent successfully - glog.V(3).Infof("Notified subscriber %s for %s", 
subscriberID, logBuffer.name) default: // Channel full - subscriber hasn't consumed previous notification yet // This is OK because one notification is sufficient to wake the subscriber - glog.V(3).Infof("Subscriber %s notification channel full (OK - already notified)", subscriberID) } } } @@ -227,7 +222,6 @@ func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn highestOffset, err := getHighestOffsetFn() if err != nil { - glog.V(0).Infof("Failed to get highest offset for %s: %v, starting from 0", logBuffer.name, err) return nil // Continue with offset 0 if we can't read existing data } @@ -243,37 +237,36 @@ func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn logBuffer.lastFlushedOffset.Store(highestOffset) // Set lastFlushedTime to current time (we know data up to highestOffset is on disk) logBuffer.lastFlushTsNs.Store(time.Now().UnixNano()) - glog.V(0).Infof("Initialized LogBuffer %s offset to %d (highest existing: %d), buffer starts at %d, lastFlushedOffset=%d, lastFlushedTime=%v", - logBuffer.name, nextOffset, highestOffset, nextOffset, highestOffset, time.Now()) } else { logBuffer.bufferStartOffset = 0 // Start from offset 0 // No data on disk yet - glog.V(0).Infof("No existing data found for %s, starting from offset 0, lastFlushedOffset=-1, lastFlushedTime=0", logBuffer.name) } return nil } -func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) { - logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs) +func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) error { + return logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs) } // AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information -func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { - logEntryData, _ := proto.Marshal(logEntry) - +func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) error { var toFlush *dataToFlush + var marshalErr error logBuffer.Lock() defer func() { logBuffer.Unlock() if toFlush != nil { logBuffer.flushChan <- toFlush } - if logBuffer.notifyFn != nil { - logBuffer.notifyFn() + // Only notify if there was no error + if marshalErr == nil { + if logBuffer.notifyFn != nil { + logBuffer.notifyFn() + } + // Notify all registered subscribers instantly (<1ms latency) + logBuffer.notifySubscribers() } - // Notify all registered subscribers instantly (<1ms latency) - logBuffer.notifySubscribers() }() processingTsNs := logEntry.TsNs @@ -285,11 +278,16 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { ts = time.Unix(0, processingTsNs) // Re-marshal with corrected timestamp logEntry.TsNs = processingTsNs - logEntryData, _ = proto.Marshal(logEntry) } else { logBuffer.LastTsNs.Store(processingTsNs) } + logEntryData, err := proto.Marshal(logEntry) + if err != nil { + marshalErr = fmt.Errorf("failed to marshal LogEntry: %w", err) + glog.Errorf("%v", marshalErr) + return marshalErr + } size := len(logEntryData) if logBuffer.pos == 0 { @@ -323,8 +321,9 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { const maxBufferSize = 1 << 30 // 1 GiB practical limit // Ensure 2*size + 4 won't overflow int and stays within practical bounds if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 { - glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size) - return + marshalErr = fmt.Errorf("message size %d exceeds maximum allowed size", size) + 
glog.Errorf("%v", marshalErr) + return marshalErr } // Safe to compute now that we've validated size is in valid range newSize := 2*size + 4 @@ -340,9 +339,10 @@ func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) { logBuffer.pos += size + 4 logBuffer.offset++ + return nil } -func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) { +func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) error { // PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock var ts time.Time @@ -360,20 +360,22 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin Key: partitionKey, } - logEntryData, _ := proto.Marshal(logEntry) - var toFlush *dataToFlush + var marshalErr error logBuffer.Lock() defer func() { logBuffer.Unlock() if toFlush != nil { logBuffer.flushChan <- toFlush } - if logBuffer.notifyFn != nil { - logBuffer.notifyFn() + // Only notify if there was no error + if marshalErr == nil { + if logBuffer.notifyFn != nil { + logBuffer.notifyFn() + } + // Notify all registered subscribers instantly (<1ms latency) + logBuffer.notifySubscribers() } - // Notify all registered subscribers instantly (<1ms latency) - logBuffer.notifySubscribers() }() // Handle timestamp collision inside lock (rare case) @@ -390,20 +392,13 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin // Note: This also enables AddToBuffer to work correctly with Kafka-style offset-based reads logEntry.Offset = logBuffer.offset - // DEBUG: Log data being added to buffer for GitHub Actions debugging - dataPreview := "" - if len(data) > 0 { - if len(data) <= 50 { - dataPreview = string(data) - } else { - dataPreview = fmt.Sprintf("%s...(total %d bytes)", string(data[:50]), len(data)) - } - } - glog.V(2).Infof("[LOG_BUFFER_ADD] buffer=%s offset=%d dataLen=%d dataPreview=%q", - logBuffer.name, logBuffer.offset, len(data), dataPreview) - // Marshal with correct timestamp and offset - logEntryData, _ = proto.Marshal(logEntry) + logEntryData, err := proto.Marshal(logEntry) + if err != nil { + marshalErr = fmt.Errorf("failed to marshal LogEntry: %w", err) + glog.Errorf("%v", marshalErr) + return marshalErr + } size := len(logEntryData) @@ -429,7 +424,6 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin } if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 { - // glog.V(0).Infof("%s copyToFlush1 offset:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.offset, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos) toFlush = logBuffer.copyToFlush() logBuffer.startTime = ts if len(logBuffer.buf) < size+4 { @@ -437,8 +431,9 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin const maxBufferSize = 1 << 30 // 1 GiB practical limit // Ensure 2*size + 4 won't overflow int and stays within practical bounds if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 { - glog.Errorf("Buffer size out of valid range: %d bytes, skipping", size) - return + marshalErr = fmt.Errorf("message size %d exceeds maximum allowed size", size) + glog.Errorf("%v", marshalErr) + return marshalErr } // Safe to compute now that we've validated size is in valid range newSize := 2*size + 4 @@ -454,6 +449,7 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin logBuffer.pos += size + 4 
logBuffer.offset++ + return nil } func (logBuffer *LogBuffer) IsStopping() bool { @@ -480,14 +476,11 @@ func (logBuffer *LogBuffer) ForceFlush() { select { case <-toFlush.done: // Flush completed successfully - glog.V(1).Infof("ForceFlush completed for %s", logBuffer.name) case <-time.After(5 * time.Second): // Timeout waiting for flush - this shouldn't happen - glog.Warningf("ForceFlush timed out waiting for completion on %s", logBuffer.name) } case <-time.After(2 * time.Second): // If flush channel is still blocked after 2s, something is wrong - glog.Warningf("ForceFlush channel timeout for %s - flush channel busy for 2s", logBuffer.name) } } } @@ -511,7 +504,6 @@ func (logBuffer *LogBuffer) IsAllFlushed() bool { func (logBuffer *LogBuffer) loopFlush() { for d := range logBuffer.flushChan { if d != nil { - // glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes())) logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset) d.releaseMemory() // local logbuffer is different from aggregate logbuffer here @@ -546,10 +538,7 @@ func (logBuffer *LogBuffer) loopInterval() { toFlush := logBuffer.copyToFlush() logBuffer.Unlock() if toFlush != nil { - glog.V(4).Infof("%s flush [%v, %v] size %d", logBuffer.name, toFlush.startTime, toFlush.stopTime, len(toFlush.data.Bytes())) logBuffer.flushChan <- toFlush - } else { - // glog.V(0).Infof("%s no flush", m.name) } } } @@ -578,9 +567,7 @@ func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush if withCallback { d.done = make(chan struct{}) } - // glog.V(4).Infof("%s flushing [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) } else { - // glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) logBuffer.lastFlushDataTime = logBuffer.stopTime } // CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1 @@ -647,8 +634,6 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu defer logBuffer.RUnlock() isOffsetBased := lastReadPosition.IsOffsetBased - glog.V(2).Infof("[ReadFromBuffer] %s: isOffsetBased=%v, position=%+v, bufferStartOffset=%d, offset=%d, pos=%d", - logBuffer.name, isOffsetBased, lastReadPosition, logBuffer.bufferStartOffset, logBuffer.offset, logBuffer.pos) // For offset-based subscriptions, use offset comparisons, not time comparisons! 
if isOffsetBased { @@ -729,11 +714,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu if !logBuffer.startTime.IsZero() { tsMemory = logBuffer.startTime } - glog.V(2).Infof("[ReadFromBuffer] %s: checking prevBuffers, count=%d, currentStartTime=%v", - logBuffer.name, len(logBuffer.prevBuffers.buffers), logBuffer.startTime) - for i, prevBuf := range logBuffer.prevBuffers.buffers { - glog.V(2).Infof("[ReadFromBuffer] %s: prevBuf[%d]: startTime=%v stopTime=%v size=%d startOffset=%d endOffset=%d", - logBuffer.name, i, prevBuf.startTime, prevBuf.stopTime, prevBuf.size, prevBuf.startOffset, prevBuf.offset) + for _, prevBuf := range logBuffer.prevBuffers.buffers { if !prevBuf.startTime.IsZero() { // If tsMemory is zero, assign directly; otherwise compare if tsMemory.IsZero() || prevBuf.startTime.Before(tsMemory) { @@ -754,19 +735,12 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu // Fall through to case 2.1 to read from earliest buffer } else if lastReadPosition.Offset <= 0 && lastReadPosition.Time.Before(tsMemory) { // Treat first read with sentinel/zero offset as inclusive of earliest in-memory data - glog.V(4).Infof("first read (offset=%d) at time %v before earliest memory %v, reading from memory", - lastReadPosition.Offset, lastReadPosition.Time, tsMemory) } else { // Data not in memory buffers - read from disk - glog.V(0).Infof("[ReadFromBuffer] %s resume from disk: requested time %v < earliest memory time %v", - logBuffer.name, lastReadPosition.Time, tsMemory) return nil, -2, ResumeFromDiskError } } - glog.V(2).Infof("[ReadFromBuffer] %s: time-based read continuing, tsMemory=%v, lastReadPos=%v", - logBuffer.name, tsMemory, lastReadPosition.Time) - // the following is case 2.1 if lastReadPosition.Time.Equal(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() { @@ -776,14 +750,12 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu } } if lastReadPosition.Time.After(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() { - // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime) return nil, logBuffer.offset, nil } // Also check prevBuffers when current buffer is empty (startTime is zero) if lastReadPosition.Time.Before(logBuffer.startTime) || logBuffer.startTime.IsZero() { for _, buf := range logBuffer.prevBuffers.buffers { if buf.startTime.After(lastReadPosition.Time) { - // glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime) return copiedBytes(buf.buf[:buf.size]), buf.offset, nil } if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) { @@ -791,14 +763,17 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu if lastReadPosition.Offset <= 0 { searchTime = searchTime.Add(-time.Nanosecond) } - pos := buf.locateByTs(searchTime) - glog.V(2).Infof("[ReadFromBuffer] %s: found data in prevBuffer at pos %d, bufSize=%d", logBuffer.name, pos, buf.size) + pos, err := buf.locateByTs(searchTime) + if err != nil { + // Buffer corruption detected - return error wrapped with ErrBufferCorrupted + glog.Errorf("ReadFromBuffer: buffer corruption in prevBuffer: %v", err) + return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err) + } return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil } } // If current buffer is not empty, return it if logBuffer.pos > 0 { - // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition) return 
copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil } // Buffer is empty and no data in prevBuffers - wait for new data @@ -830,13 +805,23 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu for l <= h { mid := (l + h) / 2 pos := logBuffer.idx[mid] - _, t := readTs(logBuffer.buf, pos) + _, t, err := readTs(logBuffer.buf, pos) + if err != nil { + // Buffer corruption detected in binary search + glog.Errorf("ReadFromBuffer: buffer corruption at idx[%d] pos %d: %v", mid, pos, err) + return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err) + } if t <= searchTs { l = mid + 1 } else if searchTs < t { var prevT int64 if mid > 0 { - _, prevT = readTs(logBuffer.buf, logBuffer.idx[mid-1]) + _, prevT, err = readTs(logBuffer.buf, logBuffer.idx[mid-1]) + if err != nil { + // Buffer corruption detected in binary search (previous entry) + glog.Errorf("ReadFromBuffer: buffer corruption at idx[%d] pos %d: %v", mid-1, logBuffer.idx[mid-1], err) + return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err) + } } if prevT <= searchTs { return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.offset, nil @@ -881,16 +866,28 @@ func copiedBytes(buf []byte) (copied *bytes.Buffer) { return } -func readTs(buf []byte, pos int) (size int, ts int64) { +func readTs(buf []byte, pos int) (size int, ts int64, err error) { + // Bounds check for size field (overflow-safe) + if pos < 0 || pos > len(buf)-4 { + return 0, 0, fmt.Errorf("corrupted log buffer: cannot read size at pos %d, buffer length %d", pos, len(buf)) + } size = int(util.BytesToUint32(buf[pos : pos+4])) + + // Bounds check for entry data (overflow-safe, protects against negative size) + if size < 0 || size > len(buf)-pos-4 { + return 0, 0, fmt.Errorf("corrupted log buffer: entry size %d at pos %d exceeds buffer length %d", size, pos, len(buf)) + } + entryData := buf[pos+4 : pos+4+size] logEntry := &filer_pb.LogEntry{} - err := proto.Unmarshal(entryData, logEntry) + err = proto.Unmarshal(entryData, logEntry) if err != nil { - glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err) + // Return error instead of failing fast + // This allows caller to handle corruption gracefully + return 0, 0, fmt.Errorf("corrupted log buffer: failed to unmarshal LogEntry at pos %d, size %d: %w", pos, size, err) } - return size, logEntry.TsNs + return size, logEntry.TsNs, nil } |
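The append path now surfaces failures instead of swallowing them: AddToBuffer and AddDataToBuffer return an error when proto.Marshal fails or a message exceeds the size guard, and notifications are skipped on error. A minimal caller sketch, assuming the usual seaweedfs import paths and a hypothetical produce helper:

```go
// Hypothetical producer-side sketch: callers should now check the error that
// AddToBuffer returns instead of assuming the entry was buffered.
package example

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

func produce(lb *log_buffer.LogBuffer, key, value []byte, tsNs int64) error {
	msg := &mq_pb.DataMessage{Key: key, Value: value, TsNs: tsNs}
	if err := lb.AddToBuffer(msg); err != nil {
		// Marshal failures and oversized messages now surface here; previously
		// they were only logged inside the buffer and silently dropped.
		return fmt.Errorf("append to log buffer: %w", err)
	}
	return nil
}
```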
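ReadFromBuffer wraps corruption detected by readTs (and by locateByTs in previous buffers) with the new ErrBufferCorrupted sentinel via %w, so callers can use errors.Is rather than crashing on the old glog.Fatalf. A hedged sketch of a reader distinguishing the outcomes; the readOnce helper and its logging are illustrative:

```go
// Hypothetical reader-side sketch: distinguish corruption from the existing
// resume-from-disk signal and from ordinary read errors.
package example

import (
	"errors"
	"log"

	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

func readOnce(lb *log_buffer.LogBuffer, pos log_buffer.MessagePosition) {
	buf, nextOffset, err := lb.ReadFromBuffer(pos)
	switch {
	case err == nil:
		if buf != nil {
			// process buf.Bytes() ...
		}
	case errors.Is(err, log_buffer.ErrBufferCorrupted):
		// Corruption no longer kills the process; the caller decides whether
		// to alert, skip ahead, or fall back to reading from disk.
		log.Printf("corrupted in-memory buffer near offset %d: %v", nextOffset, err)
	case errors.Is(err, log_buffer.ResumeFromDiskError):
		// Requested position is older than what is held in memory; read from disk.
	default:
		log.Printf("read error: %v", err)
	}
}
```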
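RegisterSubscriber hands back a buffered channel of size 1, which is why notifySubscribers can use a non-blocking send: one pending signal is enough to wake a reader, and extra notifications are safely dropped. A sketch of the subscriber side, with an illustrative ID and timeout:

```go
// Hypothetical subscriber-side sketch: wait for append notifications, then
// drain the buffer; multiple appends may collapse into one signal.
package example

import (
	"time"

	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

func waitForData(lb *log_buffer.LogBuffer) {
	notify := lb.RegisterSubscriber("reader-1")
	defer lb.UnregisterSubscriber("reader-1")

	for {
		select {
		case <-notify:
			// New data was appended; call ReadFromBuffer until it returns
			// nothing, since one signal may cover several appends.
		case <-time.After(5 * time.Second):
			// Periodic wake-up as a safety net (illustrative timeout).
			return
		}
	}
}
```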
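The rewritten readTs validates the 4-byte size prefix and the entry length with overflow-safe comparisons before slicing. A standalone sketch of the same check on a generic length-prefixed buffer (using encoding/binary here; the real code uses util.BytesToUint32):

```go
// Standalone sketch of an overflow-safe bounds check for a length-prefixed
// entry: both comparisons are written against len(buf) minus a constant so
// they cannot overflow, and a negative or oversized size is rejected before
// the slice expression runs.
package example

import (
	"encoding/binary"
	"fmt"
)

func readEntry(buf []byte, pos int) (data []byte, next int, err error) {
	// Size field: need 4 readable bytes at pos.
	if pos < 0 || pos > len(buf)-4 {
		return nil, 0, fmt.Errorf("corrupted buffer: cannot read size at pos %d, length %d", pos, len(buf))
	}
	size := int(binary.BigEndian.Uint32(buf[pos : pos+4]))

	// Entry body: size bytes must fit after the 4-byte prefix.
	if size < 0 || size > len(buf)-pos-4 {
		return nil, 0, fmt.Errorf("corrupted buffer: entry size %d at pos %d exceeds length %d", size, pos, len(buf))
	}
	return buf[pos+4 : pos+4+size], pos + 4 + size, nil
}
```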
