diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-08-29 17:37:19 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-08-29 17:37:19 -0700 |
| commit | b69cb74c033376d6738ef1537593c2349196bdb6 (patch) | |
| tree | a894c06765bd8a4f078b042626d3d82cb2857ada /weed/util/log_buffer | |
| parent | 063c9ddac5277f4c20489129223f2c49ce51ad3b (diff) | |
| download | seaweedfs-b69cb74c033376d6738ef1537593c2349196bdb6.tar.xz seaweedfs-b69cb74c033376d6738ef1537593c2349196bdb6.zip | |
read meta logs by timestamp
pass in event ts when moving logs
meta aggregator reads in memory logs only
Diffstat (limited to 'weed/util/log_buffer')
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 21 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer_test.go | 2 |
2 files changed, 14 insertions, 9 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index d066014d1..e4310b5c5 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -53,7 +53,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime return lb } -func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { +func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { m.Lock() defer func() { @@ -64,16 +64,21 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { }() // need to put the timestamp inside the lock - ts := time.Now() - tsNs := ts.UnixNano() - if m.lastTsNs >= tsNs { + var ts time.Time + if eventTsNs == 0 { + ts = time.Now() + eventTsNs = ts.UnixNano() + } else { + ts = time.Unix(0, eventTsNs) + } + if m.lastTsNs >= eventTsNs { // this is unlikely to happen, but just in case - tsNs = m.lastTsNs + 1 - ts = time.Unix(0, tsNs) + eventTsNs = m.lastTsNs + 1 + ts = time.Unix(0, eventTsNs) } - m.lastTsNs = tsNs + m.lastTsNs = eventTsNs logEntry := &filer_pb.LogEntry{ - TsNs: tsNs, + TsNs: eventTsNs, PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, } diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index f9ccc95c2..3d77afb18 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -23,7 +23,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { var buf = make([]byte, messageSize) for i := 0; i < messageCount; i++ { rand.Read(buf) - lb.AddToBuffer(nil, buf) + lb.AddToBuffer(nil, buf, 0) } receivedmessageCount := 0 |
