diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-08-28 23:48:48 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-08-28 23:48:48 -0700 |
| commit | ca658a97c5248ba099356b006f0b341af53b0816 (patch) | |
| tree | 8b59defed9a417c4fa2e9346a23cd8a64e851852 /weed/filesys | |
| parent | 63ad1abccec691d2204b8dc63109ffeead0b0eed (diff) | |
| download | seaweedfs-ca658a97c5248ba099356b006f0b341af53b0816.tar.xz seaweedfs-ca658a97c5248ba099356b006f0b341af53b0816.zip | |
add signatures to messages to avoid double processing
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dir.go | 20 | ||||
| -rw-r--r-- | weed/filesys/dir_link.go | 1 | ||||
| -rw-r--r-- | weed/filesys/file.go | 5 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 5 | ||||
| -rw-r--r-- | weed/filesys/meta_cache/meta_cache_subscribe.go | 10 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 4 |
6 files changed, 30 insertions, 15 deletions
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index f20e67df1..108b6832a 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -142,7 +142,8 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, TtlSec: dir.wfs.option.TtlSec, }, }, - OExcl: req.Flags&fuse.OpenExclusive != 0, + OExcl: req.Flags&fuse.OpenExclusive != 0, + Signatures: []int32{dir.wfs.signature}, } glog.V(1).Infof("create %s/%s: %v", dir.FullPath(), req.Name, req.Flags) @@ -192,8 +193,9 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ - Directory: dir.FullPath(), - Entry: newEntry, + Directory: dir.FullPath(), + Entry: newEntry, + Signatures: []int32{dir.wfs.signature}, } glog.V(1).Infof("mkdir: %v", request) @@ -316,10 +318,9 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { return nil } - // first, ensure the filer store can correctly delete glog.V(3).Infof("remove file: %v", req) - err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false, false) + err = filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, false, false, false, false, dir.wfs.signature) if err != nil { glog.V(3).Infof("not found remove file %s/%s: %v", dir.FullPath(), req.Name, err) return fuse.ENOENT @@ -339,10 +340,10 @@ func (dir *Dir) removeOneFile(req *fuse.RemoveRequest) error { func (dir *Dir) removeFolder(req *fuse.RemoveRequest) error { glog.V(3).Infof("remove directory entry: %v", req) - err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false) + err := filer_pb.Remove(dir.wfs, dir.FullPath(), req.Name, true, false, false, false, dir.wfs.signature) if err != nil { glog.V(0).Infof("remove %s/%s: %v", dir.FullPath(), req.Name, err) - if strings.Contains(err.Error(), "non-empty"){ + if strings.Contains(err.Error(), "non-empty") { return fuse.EEXIST } return fuse.ENOENT @@ -457,8 +458,9 @@ func (dir *Dir) saveEntry() error { return dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ - Directory: parentDir, - Entry: dir.entry, + Directory: parentDir, + Entry: dir.entry, + Signatures: []int32{dir.wfs.signature}, } glog.V(1).Infof("save dir entry: %v", request) diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go index bd564f413..d813dd96a 100644 --- a/weed/filesys/dir_link.go +++ b/weed/filesys/dir_link.go @@ -34,6 +34,7 @@ func (dir *Dir) Symlink(ctx context.Context, req *fuse.SymlinkRequest) (fs.Node, SymlinkTarget: req.Target, }, }, + Signatures: []int32{dir.wfs.signature}, } err := dir.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { diff --git a/weed/filesys/file.go b/weed/filesys/file.go index a1b0b2202..ac0dd6a80 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -293,8 +293,9 @@ func (file *File) saveEntry() error { return file.wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ - Directory: file.dir.FullPath(), - Entry: file.entry, + Directory: file.dir.FullPath(), + Entry: file.entry, + Signatures: []int32{file.wfs.signature}, } glog.V(4).Infof("save file entry: %v", request) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index db3e7d10e..87254b7c4 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -244,8 +244,9 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { } request := &filer_pb.CreateEntryRequest{ - Directory: fh.f.dir.FullPath(), - Entry: fh.f.entry, + Directory: fh.f.dir.FullPath(), + Entry: fh.f.entry, + Signatures: []int32{fh.f.wfs.signature}, } glog.V(4).Infof("%s set chunks: %v", fh.f.fullpath(), len(fh.f.entry.Chunks)) diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go index ca18411e0..3c0a9c2ac 100644 --- a/weed/filesys/meta_cache/meta_cache_subscribe.go +++ b/weed/filesys/meta_cache/meta_cache_subscribe.go @@ -12,10 +12,17 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string, lastTsNs int64) error { +func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.FilerClient, dir string, lastTsNs int64) error { processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error { message := resp.EventNotification + + for _, sig := range message.Signatures { + if sig == selfSignature && selfSignature != 0 { + return nil + } + } + var oldPath util.FullPath var newEntry *filer2.Entry if message.OldEntry != nil { @@ -41,6 +48,7 @@ func SubscribeMetaEvents(mc *MetaCache, client filer_pb.FilerClient, dir string, ClientName: "mount", PathPrefix: dir, SinceNs: lastTsNs, + Signature: selfSignature, }) if err != nil { return fmt.Errorf("subscribe: %v", err) diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index e9ee0864b..44c2895d3 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -67,6 +67,7 @@ type WFS struct { chunkCache *chunk_cache.TieredChunkCache metaCache *meta_cache.MetaCache + signature int32 } type statsCache struct { filer_pb.StatisticsResponse @@ -82,6 +83,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { return make([]byte, option.ChunkSizeLimit) }, }, + signature: util.RandomInt32(), } cacheUniqueId := util.Md5String([]byte(option.FilerGrpcAddress + option.FilerMountRootPath + util.Version()))[0:4] cacheDir := path.Join(option.CacheDir, cacheUniqueId) @@ -92,7 +94,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta")) startTime := time.Now() - go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) + go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) grace.OnInterrupt(func() { wfs.metaCache.Shutdown() }) |
