aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2/filer_notify.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2/filer_notify.go')
-rw-r--r--weed/filer2/filer_notify.go37
1 files changed, 37 insertions, 0 deletions
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
index e808e45f0..402d87313 100644
--- a/weed/filer2/filer_notify.go
+++ b/weed/filer2/filer_notify.go
@@ -77,3 +77,40 @@ func (f *Filer) logFlushFunc(startTime, stopTime time.Time, buf []byte) {
}
}
+func (f *Filer) ReadLogBuffer(lastReadTime time.Time, eachEventFn func(fullpath string, eventNotification *filer_pb.EventNotification) error) (newLastReadTime time.Time, err error) {
+
+ var buf []byte
+ newLastReadTime, buf = f.metaLogBuffer.ReadFromBuffer(lastReadTime)
+
+ for pos := 0; pos+4 < len(buf); {
+
+ size := util.BytesToUint32(buf[pos : pos+4])
+ entryData := buf[pos+4 : pos+4+int(size)]
+
+ logEntry := &filer_pb.LogEntry{}
+ err = proto.Unmarshal(entryData, logEntry)
+ if err != nil {
+ glog.Errorf("unexpected unmarshal filer_pb.LogEntry: %v", err)
+ return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.LogEntry: %v", err)
+ }
+
+ event := &filer_pb.FullEventNotification{}
+ err = proto.Unmarshal(logEntry.Data, event)
+ if err != nil {
+ glog.Errorf("unexpected unmarshal filer_pb.FullEventNotification: %v", err)
+ return lastReadTime, fmt.Errorf("unexpected unmarshal filer_pb.FullEventNotification: %v", err)
+ }
+
+ err = eachEventFn(event.Directory, event.EventNotification)
+
+ if err != nil {
+ return
+ }
+
+ pos += 4 + int(size)
+
+ }
+
+ return
+
+}