diff options
| author | chrislu <chris.lu@gmail.com> | 2025-08-14 08:16:00 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2025-08-14 20:38:03 -0700 |
| commit | 80db6f4d79cc534b7ab8c8c0c708bd62de579563 (patch) | |
| tree | 9a7e81c6e100ee6f1dcf99f3b3df1a4a49e91685 | |
| parent | 18a22177b97e59558959223495ac11ed45cf654a (diff) | |
| download | seaweedfs-80db6f4d79cc534b7ab8c8c0c708bd62de579563.tar.xz seaweedfs-80db6f4d79cc534b7ab8c8c0c708bd62de579563.zip | |
reduce lock scope to improve log buffer performance
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 42 |
1 files changed, 24 insertions, 18 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index fb1f8dc2f..8683dfffc 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -75,6 +75,24 @@ func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) { func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) { + // PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock + var ts time.Time + if processingTsNs == 0 { + ts = time.Now() + processingTsNs = ts.UnixNano() + } else { + ts = time.Unix(0, processingTsNs) + } + + logEntry := &filer_pb.LogEntry{ + TsNs: processingTsNs, // Will be updated if needed + PartitionKeyHash: util.HashToInt32(partitionKey), + Data: data, + Key: partitionKey, + } + + logEntryData, _ := proto.Marshal(logEntry) + var toFlush *dataToFlush logBuffer.Lock() defer func() { @@ -87,28 +105,16 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin } }() - // need to put the timestamp inside the lock - var ts time.Time - if processingTsNs == 0 { - ts = time.Now() - processingTsNs = ts.UnixNano() - } else { - ts = time.Unix(0, processingTsNs) - } + // Handle timestamp collision inside lock (rare case) if logBuffer.LastTsNs.Load() >= processingTsNs { - // this is unlikely to happen, but just in case processingTsNs = logBuffer.LastTsNs.Add(1) ts = time.Unix(0, processingTsNs) + // Re-marshal with corrected timestamp + logEntry.TsNs = processingTsNs + logEntryData, _ = proto.Marshal(logEntry) + } else { + logBuffer.LastTsNs.Store(processingTsNs) } - logBuffer.LastTsNs.Store(processingTsNs) - logEntry := &filer_pb.LogEntry{ - TsNs: processingTsNs, - PartitionKeyHash: util.HashToInt32(partitionKey), - Data: data, - Key: partitionKey, - } - - logEntryData, _ := proto.Marshal(logEntry) size := len(logEntryData) |
