aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-08-14 08:16:00 -0700
committerchrislu <chris.lu@gmail.com>2025-08-14 20:38:03 -0700
commit80db6f4d79cc534b7ab8c8c0c708bd62de579563 (patch)
tree9a7e81c6e100ee6f1dcf99f3b3df1a4a49e91685
parent18a22177b97e59558959223495ac11ed45cf654a (diff)
downloadseaweedfs-80db6f4d79cc534b7ab8c8c0c708bd62de579563.tar.xz
seaweedfs-80db6f4d79cc534b7ab8c8c0c708bd62de579563.zip
reduce lock scope to improve log buffer performance
-rw-r--r--weed/util/log_buffer/log_buffer.go42
1 files changed, 24 insertions, 18 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index fb1f8dc2f..8683dfffc 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -75,6 +75,24 @@ func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) {
func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) {
+ // PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock
+ var ts time.Time
+ if processingTsNs == 0 {
+ ts = time.Now()
+ processingTsNs = ts.UnixNano()
+ } else {
+ ts = time.Unix(0, processingTsNs)
+ }
+
+ logEntry := &filer_pb.LogEntry{
+ TsNs: processingTsNs, // Will be updated if needed
+ PartitionKeyHash: util.HashToInt32(partitionKey),
+ Data: data,
+ Key: partitionKey,
+ }
+
+ logEntryData, _ := proto.Marshal(logEntry)
+
var toFlush *dataToFlush
logBuffer.Lock()
defer func() {
@@ -87,28 +105,16 @@ func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processin
}
}()
- // need to put the timestamp inside the lock
- var ts time.Time
- if processingTsNs == 0 {
- ts = time.Now()
- processingTsNs = ts.UnixNano()
- } else {
- ts = time.Unix(0, processingTsNs)
- }
+ // Handle timestamp collision inside lock (rare case)
if logBuffer.LastTsNs.Load() >= processingTsNs {
- // this is unlikely to happen, but just in case
processingTsNs = logBuffer.LastTsNs.Add(1)
ts = time.Unix(0, processingTsNs)
+ // Re-marshal with corrected timestamp
+ logEntry.TsNs = processingTsNs
+ logEntryData, _ = proto.Marshal(logEntry)
+ } else {
+ logBuffer.LastTsNs.Store(processingTsNs)
}
- logBuffer.LastTsNs.Store(processingTsNs)
- logEntry := &filer_pb.LogEntry{
- TsNs: processingTsNs,
- PartitionKeyHash: util.HashToInt32(partitionKey),
- Data: data,
- Key: partitionKey,
- }
-
- logEntryData, _ := proto.Marshal(logEntry)
size := len(logEntryData)