From ed5468259867502facd8df0e3e90574421abfc94 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 11 Apr 2020 12:43:17 -0700 Subject: refactoring --- weed/util/log_buffer/log_buffer.go | 220 +++++++++++++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) create mode 100644 weed/util/log_buffer/log_buffer.go (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go new file mode 100644 index 000000000..3b3396128 --- /dev/null +++ b/weed/util/log_buffer/log_buffer.go @@ -0,0 +1,220 @@ +package log_buffer + +import ( + "sync" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const BufferSize = 4 * 1024 * 1024 +const PreviousBufferCount = 3 + +type dataToFlush struct { + startTime time.Time + stopTime time.Time + data []byte +} + +type LogBuffer struct { + prevBuffers *SealedBuffers + buf []byte + idx []int + pos int + startTime time.Time + stopTime time.Time + sizeBuf []byte + flushInterval time.Duration + flushFn func(startTime, stopTime time.Time, buf []byte) + notifyFn func() + isStopping bool + flushChan chan *dataToFlush + sync.RWMutex +} + +func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer { + lb := &LogBuffer{ + prevBuffers: newSealedBuffers(PreviousBufferCount), + buf: make([]byte, BufferSize), + sizeBuf: make([]byte, 4), + flushInterval: flushInterval, + flushFn: flushFn, + notifyFn: notifyFn, + flushChan: make(chan *dataToFlush, 256), + } + go lb.loopFlush() + go lb.loopInterval() + return lb +} + +func (m *LogBuffer) AddToBuffer(key, data []byte) { + + m.Lock() + defer func() { + m.Unlock() + if m.notifyFn != nil { + m.notifyFn() + } + }() + + // need to put the timestamp inside the lock + ts := time.Now() + logEntry := &filer_pb.LogEntry{ + TsNs: ts.UnixNano(), + PartitionKeyHash: util.HashToInt32(key), + Data: data, + } + + logEntryData, _ := proto.Marshal(logEntry) + + size := len(logEntryData) + + if m.pos == 0 { + m.startTime = ts + } + + if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { + m.flushChan <- m.copyToFlush() + m.startTime = ts + if len(m.buf) < size+4 { + m.buf = make([]byte, 2*size+4) + } + } + m.stopTime = ts + + m.idx = append(m.idx, m.pos) + util.Uint32toBytes(m.sizeBuf, uint32(size)) + copy(m.buf[m.pos:m.pos+4], m.sizeBuf) + copy(m.buf[m.pos+4:m.pos+4+size], logEntryData) + m.pos += size + 4 +} + +func (m *LogBuffer) Shutdown() { + if m.isStopping { + return + } + m.isStopping = true + m.Lock() + toFlush := m.copyToFlush() + m.Unlock() + m.flushChan <- toFlush + close(m.flushChan) +} + +func (m *LogBuffer) loopFlush() { + for d := range m.flushChan { + if d != nil { + m.flushFn(d.startTime, d.stopTime, d.data) + } + } +} + +func (m *LogBuffer) loopInterval() { + for !m.isStopping { + m.Lock() + toFlush := m.copyToFlush() + m.Unlock() + m.flushChan <- toFlush + time.Sleep(m.flushInterval) + } +} + +func (m *LogBuffer) copyToFlush() *dataToFlush { + + if m.flushFn != nil && m.pos > 0 { + // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos) + d := &dataToFlush{ + startTime: m.startTime, + stopTime: m.stopTime, + data: copiedBytes(m.buf[:m.pos]), + } + m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf) + m.pos = 0 + m.idx = m.idx[:0] + return d + } + return nil +} + +func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) { + m.RLock() + defer m.RUnlock() + + // fmt.Printf("read from buffer: %v\n", lastReadTime) + + if lastReadTime.Equal(m.stopTime) { + return lastReadTime, nil + } + if lastReadTime.After(m.stopTime) { + // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime) + return lastReadTime, nil + } + if lastReadTime.Before(m.startTime) { + return m.stopTime, copiedBytes(m.buf[:m.pos]) + } + + lastTs := lastReadTime.UnixNano() + l, h := 0, len(m.idx)-1 + + /* + for i, pos := range m.idx { + logEntry, ts := readTs(m.buf, pos) + event := &filer_pb.FullEventNotification{} + proto.Unmarshal(logEntry.Data, event) + entry := event.EventNotification.OldEntry + if entry == nil { + entry = event.EventNotification.NewEntry + } + fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name) + } + fmt.Printf("l=%d, h=%d\n", l, h) + */ + + for l <= h { + mid := (l + h) / 2 + pos := m.idx[mid] + _, t := readTs(m.buf, m.idx[mid]) + if t <= lastTs { + l = mid + 1 + } else if lastTs < t { + var prevT int64 + if mid > 0 { + _, prevT = readTs(m.buf, m.idx[mid-1]) + } + if prevT <= lastTs { + // println("found mid = ", mid) + return time.Unix(0, t), copiedBytes(m.buf[pos:m.pos]) + } + h = mid - 1 + } + // fmt.Printf("l=%d, h=%d\n", l, h) + } + + // FIXME: this could be that the buffer has been flushed already + // println("not found") + return lastReadTime, nil + +} +func copiedBytes(buf []byte) (copied []byte) { + copied = make([]byte, len(buf)) + copy(copied, buf) + return +} + +func readTs(buf []byte, pos int) (*filer_pb.LogEntry, int64) { + + size := util.BytesToUint32(buf[pos : pos+4]) + entryData := buf[pos+4 : pos+4+int(size)] + logEntry := &filer_pb.LogEntry{} + + err := proto.Unmarshal(entryData, logEntry) + if err != nil { + glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err) + } + return logEntry, logEntry.TsNs + +} -- cgit v1.2.3 From 7764e0465ce976bb528c27bb9aa25857102570ef Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 12 Apr 2020 21:00:55 -0700 Subject: refactoring --- weed/util/log_buffer/log_buffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 3b3396128..04fabdf8c 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -163,7 +163,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer /* for i, pos := range m.idx { logEntry, ts := readTs(m.buf, pos) - event := &filer_pb.FullEventNotification{} + event := &filer_pb.SubscribeMetadataResponse{} proto.Unmarshal(logEntry.Data, event) entry := event.EventNotification.OldEntry if entry == nil { -- cgit v1.2.3 From f5a748d33c52a2874dbde92746a03140ef379296 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 16 Apr 2020 02:55:09 -0700 Subject: refactoring --- weed/util/log_buffer/log_buffer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 04fabdf8c..c7cb90549 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -51,7 +51,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime return lb } -func (m *LogBuffer) AddToBuffer(key, data []byte) { +func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { m.Lock() defer func() { @@ -65,7 +65,7 @@ func (m *LogBuffer) AddToBuffer(key, data []byte) { ts := time.Now() logEntry := &filer_pb.LogEntry{ TsNs: ts.UnixNano(), - PartitionKeyHash: util.HashToInt32(key), + PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, } -- cgit v1.2.3 From ce3cb25cfbf30a06348386210f72cc51c3fbd13a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 19 Apr 2020 23:37:04 -0700 Subject: working for in memory single log buffer --- weed/util/log_buffer/log_buffer.go | 49 +++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 16 deletions(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index c7cb90549..d875dd54b 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -1,6 +1,7 @@ package log_buffer import ( + "bytes" "sync" "time" @@ -17,7 +18,7 @@ const PreviousBufferCount = 3 type dataToFlush struct { startTime time.Time stopTime time.Time - data []byte + data *bytes.Buffer } type LogBuffer struct { @@ -108,7 +109,8 @@ func (m *LogBuffer) Shutdown() { func (m *LogBuffer) loopFlush() { for d := range m.flushChan { if d != nil { - m.flushFn(d.startTime, d.stopTime, d.data) + m.flushFn(d.startTime, d.stopTime, d.data.Bytes()) + d.releaseMemory() } } } @@ -140,21 +142,26 @@ func (m *LogBuffer) copyToFlush() *dataToFlush { return nil } -func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) { +func (d *dataToFlush) releaseMemory() { + d.data.Reset() + bufferPool.Put(d.data) +} + +func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) { m.RLock() defer m.RUnlock() - // fmt.Printf("read from buffer: %v\n", lastReadTime) + // fmt.Printf("read from buffer: %v last stop time: %v\n", lastReadTime.UnixNano(), m.stopTime.UnixNano()) if lastReadTime.Equal(m.stopTime) { - return lastReadTime, nil + return nil } if lastReadTime.After(m.stopTime) { // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime) - return lastReadTime, nil + return nil } if lastReadTime.Before(m.startTime) { - return m.stopTime, copiedBytes(m.buf[:m.pos]) + return copiedBytes(m.buf[:m.pos]) } lastTs := lastReadTime.UnixNano() @@ -177,7 +184,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer for l <= h { mid := (l + h) / 2 pos := m.idx[mid] - _, t := readTs(m.buf, m.idx[mid]) + _, t := readTs(m.buf, pos) if t <= lastTs { l = mid + 1 } else if lastTs < t { @@ -186,22 +193,32 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer _, prevT = readTs(m.buf, m.idx[mid-1]) } if prevT <= lastTs { - // println("found mid = ", mid) - return time.Unix(0, t), copiedBytes(m.buf[pos:m.pos]) + // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos) + return copiedBytes(m.buf[pos:m.pos]) } - h = mid - 1 + h = mid } // fmt.Printf("l=%d, h=%d\n", l, h) } // FIXME: this could be that the buffer has been flushed already - // println("not found") - return lastReadTime, nil + return nil } -func copiedBytes(buf []byte) (copied []byte) { - copied = make([]byte, len(buf)) - copy(copied, buf) +func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) { + b.Reset() + bufferPool.Put(b) +} + +var bufferPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + +func copiedBytes(buf []byte) (copied *bytes.Buffer) { + copied = bufferPool.Get().(*bytes.Buffer) + copied.Write(buf) return } -- cgit v1.2.3 From 4bf959edf0195bdd80fc268f795f6e9710e0b269 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 20 Apr 2020 17:26:38 -0700 Subject: message broker: read also from sealed memory buffer --- weed/util/log_buffer/log_buffer.go | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index d875dd54b..f84a58c74 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -122,6 +122,11 @@ func (m *LogBuffer) loopInterval() { m.Unlock() m.flushChan <- toFlush time.Sleep(m.flushInterval) + if m.notifyFn != nil { + // check whether blocked clients are already disconnected + println("notifying log buffer readers") + m.notifyFn() + } } } -- cgit v1.2.3 From 258fba8a0f9c449b2aa7582a7e19159e3230b1a8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 20 Apr 2020 17:28:18 -0700 Subject: continue for reading from sealed memory buffer --- weed/util/log_buffer/log_buffer.go | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index f84a58c74..e733ddc75 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -92,6 +92,9 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { copy(m.buf[m.pos:m.pos+4], m.sizeBuf) copy(m.buf[m.pos+4:m.pos+4+size], logEntryData) m.pos += size + 4 + + // fmt.Printf("entry size %d total %d count %d\n", size, m.pos, len(m.idx)) + } func (m *LogBuffer) Shutdown() { @@ -117,16 +120,12 @@ func (m *LogBuffer) loopFlush() { func (m *LogBuffer) loopInterval() { for !m.isStopping { + time.Sleep(m.flushInterval) m.Lock() + // println("loop interval") toFlush := m.copyToFlush() m.Unlock() m.flushChan <- toFlush - time.Sleep(m.flushInterval) - if m.notifyFn != nil { - // check whether blocked clients are already disconnected - println("notifying log buffer readers") - m.notifyFn() - } } } @@ -139,7 +138,8 @@ func (m *LogBuffer) copyToFlush() *dataToFlush { stopTime: m.stopTime, data: copiedBytes(m.buf[:m.pos]), } - m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf) + // fmt.Printf("flusing [0,%d) with %d entries\n", m.pos, len(m.idx)) + m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos) m.pos = 0 m.idx = m.idx[:0] return d @@ -166,6 +166,20 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu return nil } if lastReadTime.Before(m.startTime) { + // println("checking ", lastReadTime.UnixNano()) + for i, buf := range m.prevBuffers.buffers { + if buf.startTime.After(lastReadTime) { + if i == 0 { + println("return the earliest in memory") + return copiedBytes(buf.buf[:buf.size]) + } + return copiedBytes(buf.buf[:buf.size]) + } + if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) { + pos := buf.locateByTs(lastReadTime) + return copiedBytes(buf.buf[pos:]) + } + } return copiedBytes(m.buf[:m.pos]) } @@ -227,16 +241,16 @@ func copiedBytes(buf []byte) (copied *bytes.Buffer) { return } -func readTs(buf []byte, pos int) (*filer_pb.LogEntry, int64) { +func readTs(buf []byte, pos int) (size int, ts int64) { - size := util.BytesToUint32(buf[pos : pos+4]) - entryData := buf[pos+4 : pos+4+int(size)] + size = int(util.BytesToUint32(buf[pos : pos+4])) + entryData := buf[pos+4 : pos+4+size] logEntry := &filer_pb.LogEntry{} err := proto.Unmarshal(entryData, logEntry) if err != nil { glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err) } - return logEntry, logEntry.TsNs + return size, logEntry.TsNs } -- cgit v1.2.3 From 5eb83dfbd349b069dbf1ec0b2cafc160f743e312 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 20 Apr 2020 17:43:50 -0700 Subject: add memory buffer size limit --- weed/util/log_buffer/log_buffer.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index e733ddc75..e447aacb7 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -170,14 +170,15 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu for i, buf := range m.prevBuffers.buffers { if buf.startTime.After(lastReadTime) { if i == 0 { - println("return the earliest in memory") + // println("return the earliest in memory", buf.startTime.UnixNano()) return copiedBytes(buf.buf[:buf.size]) } return copiedBytes(buf.buf[:buf.size]) } if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) { pos := buf.locateByTs(lastReadTime) - return copiedBytes(buf.buf[pos:]) + // fmt.Printf("locate buffer[%d] pos %d\n", i, pos) + return copiedBytes(buf.buf[pos:buf.size]) } } return copiedBytes(m.buf[:m.pos]) -- cgit v1.2.3 From 5a0986dca093e38e9a74213683801b39736f4362 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 29 Apr 2020 02:41:01 -0700 Subject: reset on getting the buffer --- weed/util/log_buffer/log_buffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index e447aacb7..69d663484 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -226,7 +226,6 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu } func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) { - b.Reset() bufferPool.Put(b) } @@ -238,6 +237,7 @@ var bufferPool = sync.Pool{ func copiedBytes(buf []byte) (copied *bytes.Buffer) { copied = bufferPool.Get().(*bytes.Buffer) + copied.Reset() copied.Write(buf) return } -- cgit v1.2.3 From f9b6178b8f12bd1b34a3756b15d6cee69930b26c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 30 Apr 2020 03:05:34 -0700 Subject: log messages --- weed/util/log_buffer/log_buffer.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 69d663484..6ba7f3737 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -93,7 +93,7 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { copy(m.buf[m.pos+4:m.pos+4+size], logEntryData) m.pos += size + 4 - // fmt.Printf("entry size %d total %d count %d\n", size, m.pos, len(m.idx)) + // fmt.Printf("entry size %d total %d count %d, buffer:%p\n", size, m.pos, len(m.idx), m) } @@ -112,6 +112,7 @@ func (m *LogBuffer) Shutdown() { func (m *LogBuffer) loopFlush() { for d := range m.flushChan { if d != nil { + // fmt.Printf("flush [%v, %v] size %d\n", d.startTime, d.stopTime, len(d.data.Bytes())) m.flushFn(d.startTime, d.stopTime, d.data.Bytes()) d.releaseMemory() } @@ -156,7 +157,12 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu m.RLock() defer m.RUnlock() - // fmt.Printf("read from buffer: %v last stop time: %v\n", lastReadTime.UnixNano(), m.stopTime.UnixNano()) + /* + fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers)) + for i, prevBuf := range m.prevBuffers.buffers { + fmt.Printf(" prev %d : %s\n", i, prevBuf.String()) + } + */ if lastReadTime.Equal(m.stopTime) { return nil @@ -173,6 +179,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu // println("return the earliest in memory", buf.startTime.UnixNano()) return copiedBytes(buf.buf[:buf.size]) } + // println("return the", i, "th in memory", buf.startTime.UnixNano()) return copiedBytes(buf.buf[:buf.size]) } if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) { @@ -181,6 +188,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu return copiedBytes(buf.buf[pos:buf.size]) } } + // println("return the current buf", lastReadTime.UnixNano()) return copiedBytes(m.buf[:m.pos]) } -- cgit v1.2.3 From 6bf3eb69cb9abd02e1d63ecdee485d198a3cab9a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 10 May 2020 03:48:35 -0700 Subject: async chan write read, no write for closed chan --- weed/util/log_buffer/log_buffer.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 6ba7f3737..67c44dc57 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -98,13 +98,14 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { } func (m *LogBuffer) Shutdown() { + m.Lock() + defer m.Unlock() + if m.isStopping { return } m.isStopping = true - m.Lock() toFlush := m.copyToFlush() - m.Unlock() m.flushChan <- toFlush close(m.flushChan) } @@ -123,10 +124,14 @@ func (m *LogBuffer) loopInterval() { for !m.isStopping { time.Sleep(m.flushInterval) m.Lock() + if m.isStopping { + m.Unlock() + return + } // println("loop interval") toFlush := m.copyToFlush() - m.Unlock() m.flushChan <- toFlush + m.Unlock() } } -- cgit v1.2.3 From 4b7fa31468eb1b8e0af53543a7e760c517faf227 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 11 May 2020 01:53:54 -0700 Subject: ensure montonically increasing tsNs --- weed/util/log_buffer/log_buffer.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index 67c44dc57..b02c45b52 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -34,6 +34,7 @@ type LogBuffer struct { notifyFn func() isStopping bool flushChan chan *dataToFlush + lastTsNs int64 sync.RWMutex } @@ -64,8 +65,15 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { // need to put the timestamp inside the lock ts := time.Now() + tsNs := ts.UnixNano() + if m.lastTsNs >= tsNs { + // this is unlikely to happen, but just in case + tsNs = m.lastTsNs + 1 + ts = time.Unix(0, tsNs) + } + m.lastTsNs = tsNs logEntry := &filer_pb.LogEntry{ - TsNs: ts.UnixNano(), + TsNs: tsNs, PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, } -- cgit v1.2.3 From 224103e13b57339db94e426f02bc9e9272f0ee4d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 11 Jul 2020 09:12:03 -0700 Subject: aggregated logs has empty flushFn --- weed/util/log_buffer/log_buffer.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index b02c45b52..cb9565fb2 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -145,12 +145,15 @@ func (m *LogBuffer) loopInterval() { func (m *LogBuffer) copyToFlush() *dataToFlush { - if m.flushFn != nil && m.pos > 0 { + if m.pos > 0 { // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos) - d := &dataToFlush{ - startTime: m.startTime, - stopTime: m.stopTime, - data: copiedBytes(m.buf[:m.pos]), + var d *dataToFlush + if m.flushFn != nil { + d = &dataToFlush{ + startTime: m.startTime, + stopTime: m.stopTime, + data: copiedBytes(m.buf[:m.pos]), + } } // fmt.Printf("flusing [0,%d) with %d entries\n", m.pos, len(m.idx)) m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos) -- cgit v1.2.3 From 0be6863c878643843239e04629a42dfa1a8c62ec Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 26 Aug 2020 22:40:15 -0700 Subject: rename --- weed/util/log_buffer/log_buffer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index cb9565fb2..d066014d1 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -249,7 +249,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu return nil } -func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) { +func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) { bufferPool.Put(b) } -- cgit v1.2.3 From b69cb74c033376d6738ef1537593c2349196bdb6 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 29 Aug 2020 17:37:19 -0700 Subject: read meta logs by timestamp pass in event ts when moving logs meta aggregator reads in memory logs only --- weed/util/log_buffer/log_buffer.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index d066014d1..e4310b5c5 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -53,7 +53,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime return lb } -func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { +func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { m.Lock() defer func() { @@ -64,16 +64,21 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { }() // need to put the timestamp inside the lock - ts := time.Now() - tsNs := ts.UnixNano() - if m.lastTsNs >= tsNs { + var ts time.Time + if eventTsNs == 0 { + ts = time.Now() + eventTsNs = ts.UnixNano() + } else { + ts = time.Unix(0, eventTsNs) + } + if m.lastTsNs >= eventTsNs { // this is unlikely to happen, but just in case - tsNs = m.lastTsNs + 1 - ts = time.Unix(0, tsNs) + eventTsNs = m.lastTsNs + 1 + ts = time.Unix(0, eventTsNs) } - m.lastTsNs = tsNs + m.lastTsNs = eventTsNs logEntry := &filer_pb.LogEntry{ - TsNs: tsNs, + TsNs: eventTsNs, PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, } -- cgit v1.2.3 From 394513f598f5fac7889086ab6420d6ccf1b8d53a Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 11 Jan 2021 02:08:55 -0800 Subject: filer: ensure seamless meta data updates --- weed/util/log_buffer/log_buffer.go | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) (limited to 'weed/util/log_buffer/log_buffer.go') diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index e4310b5c5..f84c674ff 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -28,6 +28,7 @@ type LogBuffer struct { pos int startTime time.Time stopTime time.Time + lastFlushTime time.Time sizeBuf []byte flushInterval time.Duration flushFn func(startTime, stopTime time.Time, buf []byte) @@ -129,6 +130,7 @@ func (m *LogBuffer) loopFlush() { // fmt.Printf("flush [%v, %v] size %d\n", d.startTime, d.stopTime, len(d.data.Bytes())) m.flushFn(d.startTime, d.stopTime, d.data.Bytes()) d.releaseMemory() + m.lastFlushTime = d.stopTime } } } @@ -174,10 +176,14 @@ func (d *dataToFlush) releaseMemory() { bufferPool.Put(d.data) } -func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) { +func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer, err error) { m.RLock() defer m.RUnlock() + if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) { + return nil, ResumeFromDiskError + } + /* fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers)) for i, prevBuf := range m.prevBuffers.buffers { @@ -186,11 +192,11 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu */ if lastReadTime.Equal(m.stopTime) { - return nil + return nil, nil } if lastReadTime.After(m.stopTime) { // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime) - return nil + return nil, nil } if lastReadTime.Before(m.startTime) { // println("checking ", lastReadTime.UnixNano()) @@ -198,19 +204,19 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu if buf.startTime.After(lastReadTime) { if i == 0 { // println("return the earliest in memory", buf.startTime.UnixNano()) - return copiedBytes(buf.buf[:buf.size]) + return copiedBytes(buf.buf[:buf.size]), nil } // println("return the", i, "th in memory", buf.startTime.UnixNano()) - return copiedBytes(buf.buf[:buf.size]) + return copiedBytes(buf.buf[:buf.size]), nil } if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) { pos := buf.locateByTs(lastReadTime) // fmt.Printf("locate buffer[%d] pos %d\n", i, pos) - return copiedBytes(buf.buf[pos:buf.size]) + return copiedBytes(buf.buf[pos:buf.size]), nil } } // println("return the current buf", lastReadTime.UnixNano()) - return copiedBytes(m.buf[:m.pos]) + return copiedBytes(m.buf[:m.pos]), nil } lastTs := lastReadTime.UnixNano() @@ -243,7 +249,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu } if prevT <= lastTs { // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos) - return copiedBytes(m.buf[pos:m.pos]) + return copiedBytes(m.buf[pos:m.pos]), nil } h = mid } @@ -251,7 +257,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu } // FIXME: this could be that the buffer has been flushed already - return nil + return nil, nil } func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) { -- cgit v1.2.3