aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filesys/file.go2
-rw-r--r--weed/filesys/meta_cache/meta_cache.go8
-rw-r--r--weed/filesys/meta_cache/meta_cache_subscribe.go13
-rw-r--r--weed/filesys/wfs.go9
4 files changed, 24 insertions, 8 deletions
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index 383986f23..7aa1016d7 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -253,7 +253,7 @@ func (file *File) Forget() {
}
func (file *File) maybeLoadEntry(ctx context.Context) error {
- if file.isOpen > 0 {
+ if (file.entry != nil && len(file.entry.HardLinkId) != 0) || file.isOpen > 0 {
return nil
}
entry, err := file.wfs.maybeLoadEntry(file.dir.FullPath(), file.Name)
diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go
index 81b2d77c3..4b282253d 100644
--- a/weed/filesys/meta_cache/meta_cache.go
+++ b/weed/filesys/meta_cache/meta_cache.go
@@ -21,13 +21,15 @@ type MetaCache struct {
sync.RWMutex
visitedBoundary *bounded_tree.BoundedTree
uidGidMapper *UidGidMapper
+ invalidateFunc func(util.FullPath)
}
-func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper) *MetaCache {
+func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper, invalidateFunc func(util.FullPath)) *MetaCache {
return &MetaCache{
localStore: openMetaStore(dbFolder),
visitedBoundary: bounded_tree.NewBoundedTree(baseDir),
uidGidMapper: uidGidMapper,
+ invalidateFunc: invalidateFunc,
}
}
@@ -70,7 +72,7 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
// skip the unnecessary deletion
// leave the update to the following InsertEntry operation
} else {
- glog.V(3).Infof("DeleteEntry %s/%s", oldPath,oldPath.Name())
+ glog.V(3).Infof("DeleteEntry %s/%s", oldPath, oldPath.Name())
if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
return err
}
@@ -83,7 +85,7 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
if newEntry != nil {
newDir, _ := newEntry.DirAndName()
if mc.visitedBoundary.HasVisited(util.FullPath(newDir)) {
- glog.V(3).Infof("InsertEntry %s/%s", newDir,newEntry.Name())
+ glog.V(3).Infof("InsertEntry %s/%s", newDir, newEntry.Name())
if err := mc.localStore.InsertEntry(ctx, newEntry); err != nil {
return err
}
diff --git a/weed/filesys/meta_cache/meta_cache_subscribe.go b/weed/filesys/meta_cache/meta_cache_subscribe.go
index 9b72cadcf..f9973f436 100644
--- a/weed/filesys/meta_cache/meta_cache_subscribe.go
+++ b/weed/filesys/meta_cache/meta_cache_subscribe.go
@@ -23,15 +23,15 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
}
}
+ dir := resp.Directory
var oldPath util.FullPath
var newEntry *filer.Entry
if message.OldEntry != nil {
- oldPath = util.NewFullPath(resp.Directory, message.OldEntry.Name)
+ oldPath = util.NewFullPath(dir, message.OldEntry.Name)
glog.V(4).Infof("deleting %v", oldPath)
}
if message.NewEntry != nil {
- dir := resp.Directory
if message.NewParentPath != "" {
dir = message.NewParentPath
}
@@ -39,7 +39,14 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
glog.V(4).Infof("creating %v", key)
newEntry = filer.FromPbEntry(dir, message.NewEntry)
}
- return mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
+ err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
+ if err == nil && message.OldEntry != nil && message.NewEntry != nil {
+ key := util.NewFullPath(dir, message.NewEntry.Name)
+ mc.invalidateFunc(key)
+ }
+
+ return err
+
}
for {
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 912be3c05..759e21b15 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -92,7 +92,14 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
}
- wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper)
+ wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper, func(filePath util.FullPath) {
+ fsNode := wfs.fsNodeCache.GetFsNode(filePath)
+ if fsNode != nil {
+ if file, ok := fsNode.(*File); ok {
+ file.entry = nil
+ }
+ }
+ })
startTime := time.Now()
go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano())
grace.OnInterrupt(func() {