diff options
Diffstat (limited to 'weed/queue/log_buffer.go')
| -rw-r--r-- | weed/queue/log_buffer.go | 35 |
1 files changed, 22 insertions, 13 deletions
diff --git a/weed/queue/log_buffer.go b/weed/queue/log_buffer.go index 6925757a7..5f8db140d 100644 --- a/weed/queue/log_buffer.go +++ b/weed/queue/log_buffer.go @@ -1,8 +1,6 @@ package queue import ( - "fmt" - "runtime/debug" "sync" "time" @@ -121,8 +119,7 @@ func (m *LogBuffer) loopInterval() { func (m *LogBuffer) copyToFlush() *dataToFlush { if m.flushFn != nil && m.pos > 0 { - fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos) - debug.PrintStack() + // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos) d := &dataToFlush{ startTime: m.startTime, stopTime: m.stopTime, @@ -154,26 +151,34 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer lastTs := lastReadTime.UnixNano() l, h := 0, len(m.idx)-1 + /* - for i, pos := range m.idx { - ts := readTs(m.buf, pos) - fmt.Printf("entry %d ts: %v offset:%d\n", i, time.Unix(0,ts), pos) + for i, pos := range m.idx { + logEntry, ts := readTs(m.buf, pos) + event := &filer_pb.FullEventNotification{} + proto.Unmarshal(logEntry.Data, event) + entry := event.EventNotification.OldEntry + if entry == nil { + entry = event.EventNotification.NewEntry } - fmt.Printf("l=%d, h=%d\n", l, h) - */ + fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name) + } + fmt.Printf("l=%d, h=%d\n", l, h) + */ for l <= h { mid := (l + h) / 2 pos := m.idx[mid] - t := readTs(m.buf, m.idx[mid]) + _, t := readTs(m.buf, m.idx[mid]) if t <= lastTs { l = mid + 1 } else if lastTs < t { var prevT int64 if mid > 0 { - prevT = readTs(m.buf, m.idx[mid-1]) + _, prevT = readTs(m.buf, m.idx[mid-1]) } if prevT <= lastTs { + // println("found mid = ", mid) return time.Unix(0, t), copiedBytes(m.buf[pos:m.pos]) } h = mid - 1 @@ -181,6 +186,10 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, buffer // fmt.Printf("l=%d, h=%d\n", l, h) } + // FIXME: this could be that the buffer has been flushed already + // println("not found") + return lastReadTime, nil + } func copiedBytes(buf []byte) (copied []byte) { copied = make([]byte, len(buf)) @@ -188,7 +197,7 @@ func copiedBytes(buf []byte) (copied []byte) { return } -func readTs(buf []byte, pos int) int64 { +func readTs(buf []byte, pos int) (*filer_pb.LogEntry, int64) { size := util.BytesToUint32(buf[pos : pos+4]) entryData := buf[pos+4 : pos+4+int(size)] @@ -198,6 +207,6 @@ func readTs(buf []byte, pos int) int64 { if err != nil { glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err) } - return logEntry.TsNs + return logEntry, logEntry.TsNs } |
