aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_buffer.go
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2020-05-17 17:39:16 -0700
committerGitHub <noreply@github.com>2020-05-17 17:39:16 -0700
commite0e31e67a809d00c99edaa299531c7ce4d4750dc (patch)
tree0f890277ef14c748faed4fecb7f8b8d4edeb9849 /weed/util/log_buffer/log_buffer.go
parentb4e02ec525a6ec87b26686202307896faf3296a7 (diff)
parent081ee6fe349b519da8ea54cf3cdc17d2b15c5a71 (diff)
downloadseaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.tar.xz
seaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.zip
Merge pull request #1318 from chrislusf/msg_channel
Add messaging, add channel
Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
-rw-r--r--weed/util/log_buffer/log_buffer.go21
1 files changed, 17 insertions, 4 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 6ba7f3737..b02c45b52 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -34,6 +34,7 @@ type LogBuffer struct {
notifyFn func()
isStopping bool
flushChan chan *dataToFlush
+ lastTsNs int64
sync.RWMutex
}
@@ -64,8 +65,15 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
// need to put the timestamp inside the lock
ts := time.Now()
+ tsNs := ts.UnixNano()
+ if m.lastTsNs >= tsNs {
+ // this is unlikely to happen, but just in case
+ tsNs = m.lastTsNs + 1
+ ts = time.Unix(0, tsNs)
+ }
+ m.lastTsNs = tsNs
logEntry := &filer_pb.LogEntry{
- TsNs: ts.UnixNano(),
+ TsNs: tsNs,
PartitionKeyHash: util.HashToInt32(partitionKey),
Data: data,
}
@@ -98,13 +106,14 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
}
func (m *LogBuffer) Shutdown() {
+ m.Lock()
+ defer m.Unlock()
+
if m.isStopping {
return
}
m.isStopping = true
- m.Lock()
toFlush := m.copyToFlush()
- m.Unlock()
m.flushChan <- toFlush
close(m.flushChan)
}
@@ -123,10 +132,14 @@ func (m *LogBuffer) loopInterval() {
for !m.isStopping {
time.Sleep(m.flushInterval)
m.Lock()
+ if m.isStopping {
+ m.Unlock()
+ return
+ }
// println("loop interval")
toFlush := m.copyToFlush()
- m.Unlock()
m.flushChan <- toFlush
+ m.Unlock()
}
}