aboutsummaryrefslogtreecommitdiff
path: root/weed/queue/log_buffer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/queue/log_buffer.go')
-rw-r--r--weed/queue/log_buffer.go35
1 files changed, 22 insertions, 13 deletions
diff --git a/weed/queue/log_buffer.go b/weed/queue/log_buffer.go
index 6925757a7..5f8db140d 100644
--- a/weed/queue/log_buffer.go
+++ b/weed/queue/log_buffer.go
@@ -1,8 +1,6 @@
package queue
import (
- "fmt"
- "runtime/debug"
"sync"
"time"
@@ -121,8 +119,7 @@ func (m *LogBuffer) loopInterval() {
func (m *LogBuffer) copyToFlush() *dataToFlush {
if m.flushFn != nil && m.pos > 0 {
- fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
- debug.PrintStack()
+ // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
d := &dataToFlush{
startTime: m.startTime,
stopTime: m.stopTime,
@@ -154,26 +151,34 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer
lastTs := lastReadTime.UnixNano()
l, h := 0, len(m.idx)-1
+
/*
- for i, pos := range m.idx {
- ts := readTs(m.buf, pos)
- fmt.Printf("entry %d ts: %v offset:%d\n", i, time.Unix(0,ts), pos)
+ for i, pos := range m.idx {
+ logEntry, ts := readTs(m.buf, pos)
+ event := &filer_pb.FullEventNotification{}
+ proto.Unmarshal(logEntry.Data, event)
+ entry := event.EventNotification.OldEntry
+ if entry == nil {
+ entry = event.EventNotification.NewEntry
}
- fmt.Printf("l=%d, h=%d\n", l, h)
- */
+ fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name)
+ }
+ fmt.Printf("l=%d, h=%d\n", l, h)
+ */
for l <= h {
mid := (l + h) / 2
pos := m.idx[mid]
- t := readTs(m.buf, m.idx[mid])
+ _, t := readTs(m.buf, m.idx[mid])
if t <= lastTs {
l = mid + 1
} else if lastTs < t {
var prevT int64
if mid > 0 {
- prevT = readTs(m.buf, m.idx[mid-1])
+ _, prevT = readTs(m.buf, m.idx[mid-1])
}
if prevT <= lastTs {
+ // println("found mid = ", mid)
return time.Unix(0, t), copiedBytes(m.buf[pos:m.pos])
}
h = mid - 1
@@ -181,6 +186,10 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer
// fmt.Printf("l=%d, h=%d\n", l, h)
}
+ // FIXME: this could be that the buffer has been flushed already
+ // println("not found")
+ return lastReadTime, nil
+
}
func copiedBytes(buf []byte) (copied []byte) {
copied = make([]byte, len(buf))
@@ -188,7 +197,7 @@ func copiedBytes(buf []byte) (copied []byte) {
return
}
-func readTs(buf []byte, pos int) int64 {
+func readTs(buf []byte, pos int) (*filer_pb.LogEntry, int64) {
size := util.BytesToUint32(buf[pos : pos+4])
entryData := buf[pos+4 : pos+4+int(size)]
@@ -198,6 +207,6 @@ func readTs(buf []byte, pos int) int64 {
if err != nil {
glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err)
}
- return logEntry.TsNs
+ return logEntry, logEntry.TsNs
}