aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-20 17:28:18 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-20 17:28:18 -0700
commit258fba8a0f9c449b2aa7582a7e19159e3230b1a8 (patch)
tree8d78fea91293f777645837de4f9a31955f8b012e
parent4bf959edf0195bdd80fc268f795f6e9710e0b269 (diff)
downloadseaweedfs-258fba8a0f9c449b2aa7582a7e19159e3230b1a8.tar.xz
seaweedfs-258fba8a0f9c449b2aa7582a7e19159e3230b1a8.zip
continue for reading from sealed memory buffer
-rw-r--r--weed/util/log_buffer/log_buffer.go36
1 files changed, 25 insertions, 11 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index f84a58c74..e733ddc75 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -92,6 +92,9 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
m.pos += size + 4
+
+ // fmt.Printf("entry size %d total %d count %d\n", size, m.pos, len(m.idx))
+
}
func (m *LogBuffer) Shutdown() {
@@ -117,16 +120,12 @@ func (m *LogBuffer) loopFlush() {
func (m *LogBuffer) loopInterval() {
for !m.isStopping {
+ time.Sleep(m.flushInterval)
m.Lock()
+ // println("loop interval")
toFlush := m.copyToFlush()
m.Unlock()
m.flushChan <- toFlush
- time.Sleep(m.flushInterval)
- if m.notifyFn != nil {
- // check whether blocked clients are already disconnected
- println("notifying log buffer readers")
- m.notifyFn()
- }
}
}
@@ -139,7 +138,8 @@ func (m *LogBuffer) copyToFlush() *dataToFlush {
stopTime: m.stopTime,
data: copiedBytes(m.buf[:m.pos]),
}
- m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf)
+ // fmt.Printf("flusing [0,%d) with %d entries\n", m.pos, len(m.idx))
+ m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos)
m.pos = 0
m.idx = m.idx[:0]
return d
@@ -166,6 +166,20 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
return nil
}
if lastReadTime.Before(m.startTime) {
+ // println("checking ", lastReadTime.UnixNano())
+ for i, buf := range m.prevBuffers.buffers {
+ if buf.startTime.After(lastReadTime) {
+ if i == 0 {
+ println("return the earliest in memory")
+ return copiedBytes(buf.buf[:buf.size])
+ }
+ return copiedBytes(buf.buf[:buf.size])
+ }
+ if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) {
+ pos := buf.locateByTs(lastReadTime)
+ return copiedBytes(buf.buf[pos:])
+ }
+ }
return copiedBytes(m.buf[:m.pos])
}
@@ -227,16 +241,16 @@ func copiedBytes(buf []byte) (copied *bytes.Buffer) {
return
}
-func readTs(buf []byte, pos int) (*filer_pb.LogEntry, int64) {
+func readTs(buf []byte, pos int) (size int, ts int64) {
- size := util.BytesToUint32(buf[pos : pos+4])
- entryData := buf[pos+4 : pos+4+int(size)]
+ size = int(util.BytesToUint32(buf[pos : pos+4]))
+ entryData := buf[pos+4 : pos+4+size]
logEntry := &filer_pb.LogEntry{}
err := proto.Unmarshal(entryData, logEntry)
if err != nil {
glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err)
}
- return logEntry, logEntry.TsNs
+ return size, logEntry.TsNs
}