From 6bf3eb69cb9abd02e1d63ecdee485d198a3cab9a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 10 May 2020 03:48:35 -0700 Subject: async chan write read, no write for closed chan --- weed/util/log_buffer/log_buffer.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 6ba7f3737..67c44dc57 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -98,13 +98,14 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { } func (m *LogBuffer) Shutdown() { + m.Lock() + defer m.Unlock() + if m.isStopping { return } m.isStopping = true - m.Lock() toFlush := m.copyToFlush() - m.Unlock() m.flushChan <- toFlush close(m.flushChan) } @@ -123,10 +124,14 @@ func (m *LogBuffer) loopInterval() { for !m.isStopping { time.Sleep(m.flushInterval) m.Lock() + if m.isStopping { + m.Unlock() + return + } // println("loop interval") toFlush := m.copyToFlush() - m.Unlock() m.flushChan <- toFlush + m.Unlock() } } -- cgit v1.2.3 From 4b7fa31468eb1b8e0af53543a7e760c517faf227 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 11 May 2020 01:53:54 -0700 Subject: ensure montonically increasing tsNs --- weed/util/log_buffer/log_buffer.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 67c44dc57..b02c45b52 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -34,6 +34,7 @@ type LogBuffer struct { notifyFn func() isStopping bool flushChan chan *dataToFlush + lastTsNs int64 sync.RWMutex } @@ -64,8 +65,15 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { // need to put the timestamp inside the lock ts := time.Now() + tsNs := ts.UnixNano() + if m.lastTsNs >= tsNs { + // this is unlikely to happen, but just in case + tsNs = m.lastTsNs + 1 + ts = time.Unix(0, tsNs) + } + m.lastTsNs = tsNs logEntry := &filer_pb.LogEntry{ - TsNs: ts.UnixNano(), + TsNs: tsNs, PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, } -- cgit v1.2.3