diff options
| author | chrislu <chris.lu@gmail.com> | 2024-02-29 09:38:52 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-02-29 09:38:52 -0800 |
| commit | 1b4484bf0a63f66935c9d0d12fda66d619195542 (patch) | |
| tree | 410ab9eefe80fe861a7f91ed287222c8db20fa7a /weed/util/log_buffer/log_buffer.go | |
| parent | 2a7028373d653d40428410a78dcb291a168ccac6 (diff) | |
| download | seaweedfs-1b4484bf0a63f66935c9d0d12fda66d619195542.tar.xz seaweedfs-1b4484bf0a63f66935c9d0d12fda66d619195542.zip | |
go fmt
Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 56 |
1 files changed, 28 insertions, 28 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index e7dd3dce0..273df5593 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -27,39 +27,39 @@ type LogFlushFuncType func(startTime, stopTime time.Time, buf []byte) type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) type LogBuffer struct { - name string - prevBuffers *SealedBuffers - buf []byte - batchIndex int64 - idx []int - pos int - startTime time.Time - stopTime time.Time - lastFlushTime time.Time - sizeBuf []byte - flushInterval time.Duration + name string + prevBuffers *SealedBuffers + buf []byte + batchIndex int64 + idx []int + pos int + startTime time.Time + stopTime time.Time + lastFlushTime time.Time + sizeBuf []byte + flushInterval time.Duration flushFn LogFlushFuncType ReadFromDiskFn LogReadFromDiskFuncType notifyFn func() - isStopping *atomic.Bool - flushChan chan *dataToFlush - lastTsNs int64 + isStopping *atomic.Bool + flushChan chan *dataToFlush + lastTsNs int64 sync.RWMutex } func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType, readFromDiskFn LogReadFromDiskFuncType, notifyFn func()) *LogBuffer { lb := &LogBuffer{ - name: name, - prevBuffers: newSealedBuffers(PreviousBufferCount), - buf: make([]byte, BufferSize), - sizeBuf: make([]byte, 4), - flushInterval: flushInterval, - flushFn: flushFn, + name: name, + prevBuffers: newSealedBuffers(PreviousBufferCount), + buf: make([]byte, BufferSize), + sizeBuf: make([]byte, 4), + flushInterval: flushInterval, + flushFn: flushFn, ReadFromDiskFn: readFromDiskFn, - notifyFn: notifyFn, - flushChan: make(chan *dataToFlush, 256), - isStopping: new(atomic.Bool), + notifyFn: notifyFn, + flushChan: make(chan *dataToFlush, 256), + isStopping: new(atomic.Bool), } go lb.loopFlush() go lb.loopInterval() @@ -199,10 +199,10 @@ func (logBuffer *LogBuffer) copyToFlush() *dataToFlush { return nil } -func (logBuffer *LogBuffer) GetEarliestTime() time.Time{ +func (logBuffer *LogBuffer) GetEarliestTime() time.Time { return logBuffer.startTime } -func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition{ +func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition { return MessagePosition{ Time: logBuffer.startTime, BatchIndex: logBuffer.batchIndex, @@ -241,8 +241,8 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu } if tsMemory.IsZero() { // case 2.2 println("2.2 no data") - return nil, -2,nil - } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex +1 < tsBatchIndex { // case 2.3 + return nil, -2, nil + } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex+1 < tsBatchIndex { // case 2.3 if !logBuffer.lastFlushTime.IsZero() { glog.V(0).Infof("resume with last flush time: %v", logBuffer.lastFlushTime) return nil, -2, ResumeFromDiskError @@ -273,7 +273,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu } } // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition) - return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex,nil + return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex, nil } lastTs := lastReadPosition.UnixNano() |
