diff options
| author | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-04-05 13:29:46 +0500 |
|---|---|---|
| committer | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-04-05 13:29:46 +0500 |
| commit | 17c6e8e39fd3dd2d3220032bd2ef0d537514257d (patch) | |
| tree | 462ede8ea61484114f2bde09b6eeb0e9e37b2ab6 /weed/util | |
| parent | b7cdde14ae44508c59dbf5cf3835a0a68ba1aabb (diff) | |
| parent | 3176bf126ae21395b26d6f6531c05fb571fac54f (diff) | |
| download | seaweedfs-17c6e8e39fd3dd2d3220032bd2ef0d537514257d.tar.xz seaweedfs-17c6e8e39fd3dd2d3220032bd2ef0d537514257d.zip | |
Merge branch 'new_master' into hashicorp_raft
# Conflicts:
# go.mod
# go.sum
Diffstat (limited to 'weed/util')
| -rw-r--r-- | weed/util/constants.go | 2 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 10 |
2 files changed, 9 insertions, 3 deletions
diff --git a/weed/util/constants.go b/weed/util/constants.go index db2e1e958..41482490c 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION_NUMBER = fmt.Sprintf("%.02f", 2.96) + VERSION_NUMBER = fmt.Sprintf("%.02f", 2.97) VERSION = sizeLimit + " " + VERSION_NUMBER COMMIT = "" ) diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index d5e6cb214..422575193 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -58,9 +58,13 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) { + var toFlush *dataToFlush m.Lock() defer func() { m.Unlock() + if toFlush != nil { + m.flushChan <- toFlush + } if m.notifyFn != nil { m.notifyFn() } @@ -96,7 +100,7 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { // glog.V(4).Infof("%s copyToFlush1 start time %v, ts %v, remaining %d bytes", m.name, m.startTime, ts, len(m.buf)-m.pos) - m.flushChan <- m.copyToFlush() + toFlush = m.copyToFlush() m.startTime = ts if len(m.buf) < size+4 { m.buf = make([]byte, 2*size+4) @@ -148,8 +152,10 @@ func (m *LogBuffer) loopInterval() { return } toFlush := m.copyToFlush() - m.flushChan <- toFlush m.Unlock() + if toFlush != nil { + m.flushChan <- toFlush + } } } |
