aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util/log_buffer')
-rw-r--r--weed/util/log_buffer/log_buffer.go21
-rw-r--r--weed/util/log_buffer/log_read.go8
-rw-r--r--weed/util/log_buffer/sealed_buffer.go2
3 files changed, 23 insertions, 8 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 6ba7f3737..b02c45b52 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -34,6 +34,7 @@ type LogBuffer struct {
notifyFn func()
isStopping bool
flushChan chan *dataToFlush
+ lastTsNs int64
sync.RWMutex
}
@@ -64,8 +65,15 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
// need to put the timestamp inside the lock
ts := time.Now()
+ tsNs := ts.UnixNano()
+ if m.lastTsNs >= tsNs {
+ // this is unlikely to happen, but just in case
+ tsNs = m.lastTsNs + 1
+ ts = time.Unix(0, tsNs)
+ }
+ m.lastTsNs = tsNs
logEntry := &filer_pb.LogEntry{
- TsNs: ts.UnixNano(),
+ TsNs: tsNs,
PartitionKeyHash: util.HashToInt32(partitionKey),
Data: data,
}
@@ -98,13 +106,14 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
}
func (m *LogBuffer) Shutdown() {
+ m.Lock()
+ defer m.Unlock()
+
if m.isStopping {
return
}
m.isStopping = true
- m.Lock()
toFlush := m.copyToFlush()
- m.Unlock()
m.flushChan <- toFlush
close(m.flushChan)
}
@@ -123,10 +132,14 @@ func (m *LogBuffer) loopInterval() {
for !m.isStopping {
time.Sleep(m.flushInterval)
m.Lock()
+ if m.isStopping {
+ m.Unlock()
+ return
+ }
// println("loop interval")
toFlush := m.copyToFlush()
- m.Unlock()
m.flushChan <- toFlush
+ m.Unlock()
}
}
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 6339d9d77..2b73a8064 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -12,8 +12,9 @@ import (
)
func (logBuffer *LogBuffer) LoopProcessLogData(
- startTreadTime time.Time, waitForDataFn func() bool,
- eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (processed int64, err error) {
+ startTreadTime time.Time,
+ waitForDataFn func() bool,
+ eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) {
// loop through all messages
var bytesBuf *bytes.Buffer
lastReadTime := startTreadTime
@@ -29,6 +30,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
logBuffer.ReleaseMeory(bytesBuf)
}
bytesBuf = logBuffer.ReadFromBuffer(lastReadTime)
+ // fmt.Printf("ReadFromBuffer by %v\n", lastReadTime)
if bytesBuf == nil {
if waitForDataFn() {
continue
@@ -38,6 +40,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
}
buf := bytesBuf.Bytes()
+ // fmt.Printf("ReadFromBuffer by %v size %d\n", lastReadTime, len(buf))
batchSize := 0
var startReadTime time.Time
@@ -66,7 +69,6 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
pos += 4 + int(size)
batchSize++
- processed++
}
// fmt.Printf("sent message ts[%d,%d] size %d\n", startReadTime.UnixNano(), lastReadTime.UnixNano(), batchSize)
diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go
index cade45dd1..d133cf8d3 100644
--- a/weed/util/log_buffer/sealed_buffer.go
+++ b/weed/util/log_buffer/sealed_buffer.go
@@ -59,4 +59,4 @@ func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) {
func (mb *MemBuffer) String() string {
return fmt.Sprintf("[%v,%v] bytes:%d", mb.startTime, mb.stopTime, mb.size)
-} \ No newline at end of file
+}