diff options
| author | Chris Lu <chrislusf@users.noreply.github.com> | 2020-05-17 17:39:16 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-05-17 17:39:16 -0700 |
| commit | e0e31e67a809d00c99edaa299531c7ce4d4750dc (patch) | |
| tree | 0f890277ef14c748faed4fecb7f8b8d4edeb9849 /weed/util/log_buffer | |
| parent | b4e02ec525a6ec87b26686202307896faf3296a7 (diff) | |
| parent | 081ee6fe349b519da8ea54cf3cdc17d2b15c5a71 (diff) | |
| download | seaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.tar.xz seaweedfs-e0e31e67a809d00c99edaa299531c7ce4d4750dc.zip | |
Merge pull request #1318 from chrislusf/msg_channel
Add messaging, add channel
Diffstat (limited to 'weed/util/log_buffer')
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 21 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 8 | ||||
| -rw-r--r-- | weed/util/log_buffer/sealed_buffer.go | 2 |
3 files changed, 23 insertions, 8 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 6ba7f3737..b02c45b52 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -34,6 +34,7 @@ type LogBuffer struct { notifyFn func() isStopping bool flushChan chan *dataToFlush + lastTsNs int64 sync.RWMutex } @@ -64,8 +65,15 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { // need to put the timestamp inside the lock ts := time.Now() + tsNs := ts.UnixNano() + if m.lastTsNs >= tsNs { + // this is unlikely to happen, but just in case + tsNs = m.lastTsNs + 1 + ts = time.Unix(0, tsNs) + } + m.lastTsNs = tsNs logEntry := &filer_pb.LogEntry{ - TsNs: ts.UnixNano(), + TsNs: tsNs, PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, } @@ -98,13 +106,14 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { } func (m *LogBuffer) Shutdown() { + m.Lock() + defer m.Unlock() + if m.isStopping { return } m.isStopping = true - m.Lock() toFlush := m.copyToFlush() - m.Unlock() m.flushChan <- toFlush close(m.flushChan) } @@ -123,10 +132,14 @@ func (m *LogBuffer) loopInterval() { for !m.isStopping { time.Sleep(m.flushInterval) m.Lock() + if m.isStopping { + m.Unlock() + return + } // println("loop interval") toFlush := m.copyToFlush() - m.Unlock() m.flushChan <- toFlush + m.Unlock() } } diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 6339d9d77..2b73a8064 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -12,8 +12,9 @@ import ( ) func (logBuffer *LogBuffer) LoopProcessLogData( - startTreadTime time.Time, waitForDataFn func() bool, - eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (processed int64, err error) { + startTreadTime time.Time, + waitForDataFn func() bool, + eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) { // loop through all messages var bytesBuf *bytes.Buffer lastReadTime := startTreadTime @@ -29,6 +30,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData( logBuffer.ReleaseMeory(bytesBuf) } bytesBuf = logBuffer.ReadFromBuffer(lastReadTime) + // fmt.Printf("ReadFromBuffer by %v\n", lastReadTime) if bytesBuf == nil { if waitForDataFn() { continue @@ -38,6 +40,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData( } buf := bytesBuf.Bytes() + // fmt.Printf("ReadFromBuffer by %v size %d\n", lastReadTime, len(buf)) batchSize := 0 var startReadTime time.Time @@ -66,7 +69,6 @@ func (logBuffer *LogBuffer) LoopProcessLogData( pos += 4 + int(size) batchSize++ - processed++ } // fmt.Printf("sent message ts[%d,%d] size %d\n", startReadTime.UnixNano(), lastReadTime.UnixNano(), batchSize) diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go index cade45dd1..d133cf8d3 100644 --- a/weed/util/log_buffer/sealed_buffer.go +++ b/weed/util/log_buffer/sealed_buffer.go @@ -59,4 +59,4 @@ func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) { func (mb *MemBuffer) String() string { return fmt.Sprintf("[%v,%v] bytes:%d", mb.startTime, mb.stopTime, mb.size) -}
\ No newline at end of file +} |
