diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-08-13 01:20:49 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-08-13 01:20:49 -0700 |
| commit | f036ef8a3c50af3c933dcd96026ca70dc5fd0da3 (patch) | |
| tree | c4bc38f75396b44476d8cdaad28b6180af3c6291 /weed/filer2 | |
| parent | 75d63db60d1677f2e3350c3ee2b9dbecf931ec1a (diff) | |
| download | seaweedfs-f036ef8a3c50af3c933dcd96026ca70dc5fd0da3.tar.xz seaweedfs-f036ef8a3c50af3c933dcd96026ca70dc5fd0da3.zip | |
add filer notification
Diffstat (limited to 'weed/filer2')
| -rw-r--r-- | weed/filer2/entry_codec.go | 22 | ||||
| -rw-r--r-- | weed/filer2/filer.go | 8 | ||||
| -rw-r--r-- | weed/filer2/filer_notify.go | 38 |
3 files changed, 68 insertions, 0 deletions
diff --git a/weed/filer2/entry_codec.go b/weed/filer2/entry_codec.go index 671568b75..baa6a9440 100644 --- a/weed/filer2/entry_codec.go +++ b/weed/filer2/entry_codec.go @@ -63,3 +63,25 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr { return t } + +func EqualEntry(a, b *Entry) bool { + if a == b { + return true + } + if a == nil && b != nil || a != nil && b == nil { + return false + } + if !proto.Equal(EntryAttributeToPb(a), EntryAttributeToPb(b)) { + return false + } + if len(a.Chunks) != len(b.Chunks) { + return false + } + + for i := 0; i < len(a.Chunks); i++ { + if !proto.Equal(a.Chunks[i], b.Chunks[i]) { + return false + } + } + return true +} diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 823371a6b..475e79a17 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -90,6 +90,9 @@ func (f *Filer) CreateEntry(entry *Entry) error { if mkdirErr != nil { return fmt.Errorf("mkdir %s: %v", dirPath, mkdirErr) } + + f.NotifyUpdateEvent(nil, dirEntry) + } else if !dirEntry.IsDirectory() { return fmt.Errorf("%s is a file", dirPath) } @@ -122,6 +125,8 @@ func (f *Filer) CreateEntry(entry *Entry) error { return fmt.Errorf("insert entry %s: %v", entry.FullPath, err) } + f.NotifyUpdateEvent(oldEntry, entry) + f.deleteChunksIfNotNew(oldEntry, entry) return nil @@ -170,6 +175,9 @@ func (f *Filer) DeleteEntryMetaAndData(p FullPath, isRecursive bool, shouldDelet return nil } glog.V(0).Infof("deleting entry %v", p) + + f.NotifyUpdateEvent(entry, nil) + return f.store.DeleteEntry(p) } diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go new file mode 100644 index 000000000..2ce45db0e --- /dev/null +++ b/weed/filer2/filer_notify.go @@ -0,0 +1,38 @@ +package filer2 + +import ( + "github.com/chrislusf/seaweedfs/weed/msgqueue" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry) { + var key string + if oldEntry != nil { + key = string(oldEntry.FullPath) + } else if newEntry != nil { + key = string(newEntry.FullPath) + } else { + return + } + + msgqueue.Queue.SendMessage( + key, + &filer_pb.EventNotification{ + OldEntry: toProtoEntry(oldEntry), + NewEntry: toProtoEntry(newEntry), + }, + ) + +} + +func toProtoEntry(entry *Entry) *filer_pb.Entry { + if entry == nil { + return nil + } + return &filer_pb.Entry{ + Name: string(entry.FullPath), + IsDirectory: entry.IsDirectory(), + Attributes: EntryAttributeToPb(entry), + Chunks: entry.Chunks, + } +} |
