aboutsummaryrefslogtreecommitdiff
path: root/weed/util/log_buffer/log_buffer.go
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-02-29 09:38:52 -0800
committerchrislu <chris.lu@gmail.com>2024-02-29 09:38:52 -0800
commit1b4484bf0a63f66935c9d0d12fda66d619195542 (patch)
tree410ab9eefe80fe861a7f91ed287222c8db20fa7a /weed/util/log_buffer/log_buffer.go
parent2a7028373d653d40428410a78dcb291a168ccac6 (diff)
downloadseaweedfs-1b4484bf0a63f66935c9d0d12fda66d619195542.tar.xz
seaweedfs-1b4484bf0a63f66935c9d0d12fda66d619195542.zip
go fmt
Diffstat (limited to 'weed/util/log_buffer/log_buffer.go')
-rw-r--r--weed/util/log_buffer/log_buffer.go56
1 files changed, 28 insertions, 28 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index e7dd3dce0..273df5593 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -27,39 +27,39 @@ type LogFlushFuncType func(startTime, stopTime time.Time, buf []byte)
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
type LogBuffer struct {
- name string
- prevBuffers *SealedBuffers
- buf []byte
- batchIndex int64
- idx []int
- pos int
- startTime time.Time
- stopTime time.Time
- lastFlushTime time.Time
- sizeBuf []byte
- flushInterval time.Duration
+ name string
+ prevBuffers *SealedBuffers
+ buf []byte
+ batchIndex int64
+ idx []int
+ pos int
+ startTime time.Time
+ stopTime time.Time
+ lastFlushTime time.Time
+ sizeBuf []byte
+ flushInterval time.Duration
flushFn LogFlushFuncType
ReadFromDiskFn LogReadFromDiskFuncType
notifyFn func()
- isStopping *atomic.Bool
- flushChan chan *dataToFlush
- lastTsNs int64
+ isStopping *atomic.Bool
+ flushChan chan *dataToFlush
+ lastTsNs int64
sync.RWMutex
}
func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType,
readFromDiskFn LogReadFromDiskFuncType, notifyFn func()) *LogBuffer {
lb := &LogBuffer{
- name: name,
- prevBuffers: newSealedBuffers(PreviousBufferCount),
- buf: make([]byte, BufferSize),
- sizeBuf: make([]byte, 4),
- flushInterval: flushInterval,
- flushFn: flushFn,
+ name: name,
+ prevBuffers: newSealedBuffers(PreviousBufferCount),
+ buf: make([]byte, BufferSize),
+ sizeBuf: make([]byte, 4),
+ flushInterval: flushInterval,
+ flushFn: flushFn,
ReadFromDiskFn: readFromDiskFn,
- notifyFn: notifyFn,
- flushChan: make(chan *dataToFlush, 256),
- isStopping: new(atomic.Bool),
+ notifyFn: notifyFn,
+ flushChan: make(chan *dataToFlush, 256),
+ isStopping: new(atomic.Bool),
}
go lb.loopFlush()
go lb.loopInterval()
@@ -199,10 +199,10 @@ func (logBuffer *LogBuffer) copyToFlush() *dataToFlush {
return nil
}
-func (logBuffer *LogBuffer) GetEarliestTime() time.Time{
+func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
return logBuffer.startTime
}
-func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition{
+func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition {
return MessagePosition{
Time: logBuffer.startTime,
BatchIndex: logBuffer.batchIndex,
@@ -241,8 +241,8 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
}
if tsMemory.IsZero() { // case 2.2
println("2.2 no data")
- return nil, -2,nil
- } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex +1 < tsBatchIndex { // case 2.3
+ return nil, -2, nil
+ } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex+1 < tsBatchIndex { // case 2.3
if !logBuffer.lastFlushTime.IsZero() {
glog.V(0).Infof("resume with last flush time: %v", logBuffer.lastFlushTime)
return nil, -2, ResumeFromDiskError
@@ -273,7 +273,7 @@ func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bu
}
}
// glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition)
- return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex,nil
+ return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex, nil
}
lastTs := lastReadPosition.UnixNano()