aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer
diff options
context:
space:
mode:
authorhasagi <30975629+LIBA-S@users.noreply.github.com>2020-09-22 21:38:38 +0800
committerGitHub <noreply@github.com>2020-09-22 21:38:38 +0800
commitd7bf2390e2bf4ac55132878faa68119b3558e8e4 (patch)
tree48ede45893c2130d3e039f7fe4af8440835eb02d /weed/util/log_buffer
parent37e964d4bd60a9dd792a9cc24f05eaa05d3766f2 (diff)
parentec5b9f1e91a8609d0e70bf9d26dc0840774153c4 (diff)
downloadseaweedfs-d7bf2390e2bf4ac55132878faa68119b3558e8e4.tar.xz
seaweedfs-d7bf2390e2bf4ac55132878faa68119b3558e8e4.zip
Merge pull request #1 from chrislusf/master
catch up
Diffstat (limited to 'weed/util/log_buffer')
-rw-r--r--weed/util/log_buffer/log_buffer.go23
-rw-r--r--weed/util/log_buffer/log_buffer_test.go2
-rw-r--r--weed/util/log_buffer/log_read.go20
3 files changed, 29 insertions, 16 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index cb9565fb2..e4310b5c5 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -53,7 +53,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime
return lb
}
-func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
+func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
m.Lock()
defer func() {
@@ -64,16 +64,21 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
}()
// need to put the timestamp inside the lock
- ts := time.Now()
- tsNs := ts.UnixNano()
- if m.lastTsNs >= tsNs {
+ var ts time.Time
+ if eventTsNs == 0 {
+ ts = time.Now()
+ eventTsNs = ts.UnixNano()
+ } else {
+ ts = time.Unix(0, eventTsNs)
+ }
+ if m.lastTsNs >= eventTsNs {
// this is unlikely to happen, but just in case
- tsNs = m.lastTsNs + 1
- ts = time.Unix(0, tsNs)
+ eventTsNs = m.lastTsNs + 1
+ ts = time.Unix(0, eventTsNs)
}
- m.lastTsNs = tsNs
+ m.lastTsNs = eventTsNs
logEntry := &filer_pb.LogEntry{
- TsNs: tsNs,
+ TsNs: eventTsNs,
PartitionKeyHash: util.HashToInt32(partitionKey),
Data: data,
}
@@ -249,7 +254,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
return nil
}
-func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) {
+func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
bufferPool.Put(b)
}
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index f9ccc95c2..3d77afb18 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -23,7 +23,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
var buf = make([]byte, messageSize)
for i := 0; i < messageCount; i++ {
rand.Read(buf)
- lb.AddToBuffer(nil, buf)
+ lb.AddToBuffer(nil, buf, 0)
}
receivedmessageCount := 0
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 2b73a8064..57f4b0115 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -2,6 +2,7 @@ package log_buffer
import (
"bytes"
+ "fmt"
"time"
"github.com/golang/protobuf/proto"
@@ -11,23 +12,27 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+var (
+ ResumeError = fmt.Errorf("resume")
+)
+
func (logBuffer *LogBuffer) LoopProcessLogData(
startTreadTime time.Time,
waitForDataFn func() bool,
- eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) {
+ eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) {
// loop through all messages
var bytesBuf *bytes.Buffer
- lastReadTime := startTreadTime
+ lastReadTime = startTreadTime
defer func() {
if bytesBuf != nil {
- logBuffer.ReleaseMeory(bytesBuf)
+ logBuffer.ReleaseMemory(bytesBuf)
}
}()
for {
if bytesBuf != nil {
- logBuffer.ReleaseMeory(bytesBuf)
+ logBuffer.ReleaseMemory(bytesBuf)
}
bytesBuf = logBuffer.ReadFromBuffer(lastReadTime)
// fmt.Printf("ReadFromBuffer by %v\n", lastReadTime)
@@ -48,10 +53,13 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
for pos := 0; pos+4 < len(buf); {
size := util.BytesToUint32(buf[pos : pos+4])
+ if pos+4+int(size) > len(buf) {
+ err = ResumeError
+ glog.Errorf("LoopProcessLogData: read buffer %v read %d [%d,%d) from [0,%d)", lastReadTime, batchSize, pos, pos+int(size)+4, len(buf))
+ return
+ }
entryData := buf[pos+4 : pos+4+int(size)]
- // fmt.Printf("read buffer read %d [%d,%d) from [0,%d)\n", batchSize, pos, pos+int(size)+4, len(buf))
-
logEntry := &filer_pb.LogEntry{}
if err = proto.Unmarshal(entryData, logEntry); err != nil {
glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)