aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-08-13 01:20:49 -0700
committerChris Lu <chris.lu@gmail.com>2018-08-13 01:20:49 -0700
commitf036ef8a3c50af3c933dcd96026ca70dc5fd0da3 (patch)
treec4bc38f75396b44476d8cdaad28b6180af3c6291 /weed/filer2
parent75d63db60d1677f2e3350c3ee2b9dbecf931ec1a (diff)
downloadseaweedfs-f036ef8a3c50af3c933dcd96026ca70dc5fd0da3.tar.xz
seaweedfs-f036ef8a3c50af3c933dcd96026ca70dc5fd0da3.zip
add filer notification
Diffstat (limited to 'weed/filer2')
-rw-r--r--weed/filer2/entry_codec.go22
-rw-r--r--weed/filer2/filer.go8
-rw-r--r--weed/filer2/filer_notify.go38
3 files changed, 68 insertions, 0 deletions
diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go
index 671568b75..baa6a9440 100644
--- a/weed/filer2/entry_codec.go
+++ b/weed/filer2/entry_codec.go
@@ -63,3 +63,25 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr {
return t
}
+
+func EqualEntry(a, b *Entry) bool {
+ if a == b {
+ return true
+ }
+ if a == nil && b != nil || a != nil && b == nil {
+ return false
+ }
+ if !proto.Equal(EntryAttributeToPb(a), EntryAttributeToPb(b)) {
+ return false
+ }
+ if len(a.Chunks) != len(b.Chunks) {
+ return false
+ }
+
+ for i := 0; i < len(a.Chunks); i++ {
+ if !proto.Equal(a.Chunks[i], b.Chunks[i]) {
+ return false
+ }
+ }
+ return true
+}
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 823371a6b..475e79a17 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -90,6 +90,9 @@ func (f *Filer) CreateEntry(entry *Entry) error {
if mkdirErr != nil {
return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr)
}
+
+ f.NotifyUpdateEvent(nil, dirEntry)
+
} else if !dirEntry.IsDirectory() {
return fmt.Errorf("%s is a file", dirPath)
}
@@ -122,6 +125,8 @@ func (f *Filer) CreateEntry(entry *Entry) error {
return fmt.Errorf("insert entry %s: %v", entry.FullPath, err)
}
+ f.NotifyUpdateEvent(oldEntry, entry)
+
f.deleteChunksIfNotNew(oldEntry, entry)
return nil
@@ -170,6 +175,9 @@ func (f *Filer) DeleteEntryMetaAndData(p FullPath, isRecursive bool, shouldDelet
return nil
}
glog.V(0).Infof("deleting entry %v", p)
+
+ f.NotifyUpdateEvent(entry, nil)
+
return f.store.DeleteEntry(p)
}
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
new file mode 100644
index 000000000..2ce45db0e
--- /dev/null
+++ b/weed/filer2/filer_notify.go
@@ -0,0 +1,38 @@
+package filer2
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/msgqueue"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry) {
+ var key string
+ if oldEntry != nil {
+ key = string(oldEntry.FullPath)
+ } else if newEntry != nil {
+ key = string(newEntry.FullPath)
+ } else {
+ return
+ }
+
+ msgqueue.Queue.SendMessage(
+ key,
+ &filer_pb.EventNotification{
+ OldEntry: toProtoEntry(oldEntry),
+ NewEntry: toProtoEntry(newEntry),
+ },
+ )
+
+}
+
+func toProtoEntry(entry *Entry) *filer_pb.Entry {
+ if entry == nil {
+ return nil
+ }
+ return &filer_pb.Entry{
+ Name: string(entry.FullPath),
+ IsDirectory: entry.IsDirectory(),
+ Attributes: EntryAttributeToPb(entry),
+ Chunks: entry.Chunks,
+ }
+}