diff options
Diffstat (limited to 'weed/filer2/filer_notify.go')
| -rw-r--r-- | weed/filer2/filer_notify.go | 37 |
1 files changed, 37 insertions, 0 deletions
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go index e808e45f0..402d87313 100644 --- a/weed/filer2/filer_notify.go +++ b/weed/filer2/filer_notify.go @@ -77,3 +77,40 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) { } } +func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath string, eventNotification *filer_pb.EventNotification) error) (newLastReadTime time.Time, err error) { + + var buf []byte + newLastReadTime, buf = f.metaLogBuffer.ReadFromBuffer(lastReadTime) + + for pos := 0; pos+4 < len(buf); { + + size := util.BytesToUint32(buf[pos : pos+4]) + entryData := buf[pos+4 : pos+4+int(size)] + + logEntry := &filer_pb.LogEntry{} + err = proto.Unmarshal(entryData, logEntry) + if err != nil { + glog.Errorf("unexpected unmarshal filer_pb.LogEntry: %v", err) + return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.LogEntry: %v", err) + } + + event := &filer_pb.FullEventNotification{} + err = proto.Unmarshal(logEntry.Data, event) + if err != nil { + glog.Errorf("unexpected unmarshal filer_pb.FullEventNotification: %v", err) + return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.FullEventNotification: %v", err) + } + + err = eachEventFn(event.Directory, event.EventNotification) + + if err != nil { + return + } + + pos += 4 + int(size) + + } + + return + +} |
