aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2/filer_delete_entry.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2/filer_delete_entry.go')
-rw-r--r--weed/filer2/filer_delete_entry.go125
1 files changed, 125 insertions, 0 deletions
diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go
new file mode 100644
index 000000000..d0792ac66
--- /dev/null
+++ b/weed/filer2/filer_delete_entry.go
@@ -0,0 +1,125 @@
+package filer2
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+)
+
+func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
+ if p == "/" {
+ return nil
+ }
+
+ entry, findErr := f.FindEntry(ctx, p)
+ if findErr != nil {
+ return findErr
+ }
+
+ isCollection := f.isBucket(entry)
+
+ var chunks []*filer_pb.FileChunk
+ chunks = append(chunks, entry.Chunks...)
+ if entry.IsDirectory() {
+ // delete the folder children, not including the folder itself
+ var dirChunks []*filer_pb.FileChunk
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection)
+ if err != nil {
+ glog.V(0).Infof("delete directory %s: %v", p, err)
+ return fmt.Errorf("delete directory %s: %v", p, err)
+ }
+ chunks = append(chunks, dirChunks...)
+ }
+
+ // delete the file or folder
+ err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks)
+ if err != nil {
+ return fmt.Errorf("delete file %s: %v", p, err)
+ }
+
+ if shouldDeleteChunks && !isCollection {
+ go f.DeleteChunks(chunks)
+ }
+ if isCollection {
+ collectionName := entry.Name()
+ f.doDeleteCollection(collectionName)
+ f.deleteBucket(collectionName)
+ }
+
+ return nil
+}
+
+func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (chunks []*filer_pb.FileChunk, err error) {
+
+ lastFileName := ""
+ includeLastFile := false
+ for {
+ entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize)
+ if err != nil {
+ glog.Errorf("list folder %s: %v", entry.FullPath, err)
+ return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
+ }
+ if lastFileName == "" && !isRecursive && len(entries) > 0 {
+ // only for first iteration in the loop
+ return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
+ }
+
+ for _, sub := range entries {
+ lastFileName = sub.Name()
+ var dirChunks []*filer_pb.FileChunk
+ if sub.IsDirectory() {
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
+ chunks = append(chunks, dirChunks...)
+ } else {
+ chunks = append(chunks, sub.Chunks...)
+ }
+ if err != nil && !ignoreRecursiveError {
+ return nil, err
+ }
+ }
+
+ if len(entries) < PaginationSize {
+ break
+ }
+ }
+
+ f.cacheDelDirectory(string(entry.FullPath))
+
+ glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks)
+
+ if storeDeletionErr := f.store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
+ return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
+ }
+ f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
+
+ return chunks, nil
+}
+
+func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool) (err error) {
+
+ glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
+
+ if storeDeletionErr := f.store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
+ return fmt.Errorf("filer store delete: %v", storeDeletionErr)
+ }
+ f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
+
+ return nil
+}
+
+func (f *Filer) doDeleteCollection(collectionName string) (err error) {
+
+ return f.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
+ Name: collectionName,
+ })
+ if err != nil {
+ glog.Infof("delete collection %s: %v", collectionName, err)
+ }
+ return err
+ })
+
+}