aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2022-02-25 02:57:54 -0800
committerchrislu <chris.lu@gmail.com>2022-02-25 02:57:54 -0800
commit03466f955e7907f5e7442dd3e60c45a3ab261ea3 (patch)
tree7b77cf6944e2f24a17a2854637506fc71678bc60
parent8080fe4cc19cd2d0479b605ae7ad7993a5bcf50a (diff)
downloadseaweedfs-03466f955e7907f5e7442dd3e60c45a3ab261ea3.tar.xz
seaweedfs-03466f955e7907f5e7442dd3e60c45a3ab261ea3.zip
rename: delete source entry metadata only, skipping hard links
-rw-r--r--weed/filer/filer_delete_entry.go6
-rw-r--r--weed/filer/filerstore_wrapper.go13
-rw-r--r--weed/mount/meta_cache/meta_cache.go18
-rw-r--r--weed/mount/meta_cache/meta_cache_subscribe.go2
-rw-r--r--weed/mount/weedfs_rename.go4
5 files changed, 36 insertions, 7 deletions
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index 425da7cea..c774f5d27 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -125,7 +125,11 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
- if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
+ if !entry.IsDirectory() && !shouldDeleteChunks {
+ if storeDeletionErr := f.Store.DeleteOneEntrySkipHardlink(ctx, entry.FullPath); storeDeletionErr != nil {
+ return fmt.Errorf("filer store delete skip hardlink: %v", storeDeletionErr)
+ }
+ } else if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
return fmt.Errorf("filer store delete: %v", storeDeletionErr)
}
if !entry.IsDirectory() {
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index a9650f766..548a2c9df 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -23,6 +23,7 @@ type VirtualFilerStore interface {
FilerStore
DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
DeleteOneEntry(ctx context.Context, entry *Entry) error
+ DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) error
AddPathSpecificStore(path string, storeId string, store FilerStore)
OnBucketCreation(bucket string)
OnBucketDeletion(bucket string)
@@ -216,6 +217,18 @@ func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry
return actualStore.DeleteEntry(ctx, existingEntry.FullPath)
}
+func (fsw *FilerStoreWrapper) DeleteOneEntrySkipHardlink(ctx context.Context, fullpath util.FullPath) (err error) {
+ actualStore := fsw.getActualStore(fullpath)
+ stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "delete").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(actualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
+ }()
+
+ glog.V(4).Infof("DeleteOneEntrySkipHardlink %s", fullpath)
+ return actualStore.DeleteEntry(ctx, fullpath)
+}
+
func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
actualStore := fsw.getActualStore(fp + "/")
stats.FilerStoreCounter.WithLabelValues(actualStore.GetName(), "deleteFolderChildren").Inc()
diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go
index 7f997c5b0..994f00463 100644
--- a/weed/mount/meta_cache/meta_cache.go
+++ b/weed/mount/meta_cache/meta_cache.go
@@ -62,7 +62,7 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) erro
return mc.localStore.InsertEntry(ctx, entry)
}
-func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
+func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry, shouldDeleteChunks bool) error {
//mc.Lock()
//defer mc.Unlock()
@@ -74,8 +74,14 @@ func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath uti
// leave the update to the following InsertEntry operation
} else {
glog.V(3).Infof("DeleteEntry %s", oldPath)
- if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
- return err
+ if shouldDeleteChunks {
+ if err := mc.localStore.DeleteEntry(ctx, oldPath); err != nil {
+ return err
+ }
+ } else {
+ if err := mc.localStore.DeleteOneEntrySkipHardlink(ctx, oldPath); err != nil {
+ return err
+ }
}
}
}
@@ -112,6 +118,12 @@ func (mc *MetaCache) FindEntry(ctx context.Context, fp util.FullPath) (entry *fi
return
}
+func (mc *MetaCache) DeleteEntrySkipHardlink(ctx context.Context, fp util.FullPath) (err error) {
+ //mc.Lock()
+ //defer mc.Unlock()
+ return mc.localStore.DeleteOneEntrySkipHardlink(ctx, fp)
+}
+
func (mc *MetaCache) DeleteEntry(ctx context.Context, fp util.FullPath) (err error) {
//mc.Lock()
//defer mc.Unlock()
diff --git a/weed/mount/meta_cache/meta_cache_subscribe.go b/weed/mount/meta_cache/meta_cache_subscribe.go
index a0c5935ca..12ce8d8a7 100644
--- a/weed/mount/meta_cache/meta_cache_subscribe.go
+++ b/weed/mount/meta_cache/meta_cache_subscribe.go
@@ -36,7 +36,7 @@ func SubscribeMetaEvents(mc *MetaCache, selfSignature int32, client filer_pb.Fil
glog.V(4).Infof("creating %v", key)
newEntry = filer.FromPbEntry(dir, message.NewEntry)
}
- err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry)
+ err := mc.AtomicUpdateEntryFromFiler(context.Background(), oldPath, newEntry, message.DeleteChunks)
if err == nil {
if message.OldEntry != nil && message.NewEntry != nil {
oldKey := util.NewFullPath(resp.Directory, message.OldEntry.Name)
diff --git a/weed/mount/weedfs_rename.go b/weed/mount/weedfs_rename.go
index 9c83a4b94..44f3c910c 100644
--- a/weed/mount/weedfs_rename.go
+++ b/weed/mount/weedfs_rename.go
@@ -217,7 +217,7 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR
if resp.EventNotification.NewEntry != nil {
// with new entry, the old entry name also exists. This is the first step to create new entry
newEntry := filer.FromPbEntry(resp.EventNotification.NewParentPath, resp.EventNotification.NewEntry)
- if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry); err != nil {
+ if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, "", newEntry, false); err != nil {
return err
}
@@ -231,7 +231,7 @@ func (wfs *WFS) handleRenameResponse(ctx context.Context, resp *filer_pb.StreamR
} else if resp.EventNotification.OldEntry != nil {
// without new entry, only old entry name exists. This is the second step to delete old entry
- if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil); err != nil {
+ if err := wfs.metaCache.AtomicUpdateEntryFromFiler(ctx, util.NewFullPath(resp.Directory, resp.EventNotification.OldEntry.Name), nil, resp.EventNotification.DeleteChunks); err != nil {
return err
}
}