aboutsummaryrefslogtreecommitdiff
path: root/weed/filer
diff options
context:
space:
mode:
authorKonstantin Lebedev <lebedev_k@tochka.com>2020-12-01 16:03:34 +0500
committerKonstantin Lebedev <lebedev_k@tochka.com>2020-12-01 16:03:34 +0500
commit03620776ece3175dac979b05c491d26d14faef0f (patch)
tree4fce7d0a55c3fa7f5b7c43e9d90d5e194c439239 /weed/filer
parent4e55baf5b109cfe5cf9f65c44cd92c542b4acf5e (diff)
parent005a6123e98170b2bdf99eb5b8a67ca3cea94190 (diff)
downloadseaweedfs-03620776ece3175dac979b05c491d26d14faef0f.tar.xz
seaweedfs-03620776ece3175dac979b05c491d26d14faef0f.zip
Merge branch 'upstream_master' into store_s3cred
Diffstat (limited to 'weed/filer')
-rw-r--r--weed/filer/entry.go3
-rw-r--r--weed/filer/entry_codec.go5
-rw-r--r--weed/filer/filer.go13
-rw-r--r--weed/filer/filer_conf.go4
-rw-r--r--weed/filer/filer_conf_test.go2
-rw-r--r--weed/filer/filer_delete_entry.go46
-rw-r--r--weed/filer/filer_deletion.go1
-rw-r--r--weed/filer/filerstore.go18
-rw-r--r--weed/filer/leveldb/leveldb_store.go10
-rw-r--r--weed/filer/leveldb2/leveldb2_store.go10
10 files changed, 86 insertions, 26 deletions
diff --git a/weed/filer/entry.go b/weed/filer/entry.go
index 421e51432..d2f257967 100644
--- a/weed/filer/entry.go
+++ b/weed/filer/entry.go
@@ -40,6 +40,7 @@ type Entry struct {
HardLinkId HardLinkId
HardLinkCounter int32
+ Content []byte
}
func (entry *Entry) Size() uint64 {
@@ -66,6 +67,7 @@ func (entry *Entry) ToProtoEntry() *filer_pb.Entry {
Extended: entry.Extended,
HardLinkId: entry.HardLinkId,
HardLinkCounter: entry.HardLinkCounter,
+ Content: entry.Content,
}
}
@@ -98,6 +100,7 @@ func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry {
Chunks: entry.Chunks,
HardLinkId: HardLinkId(entry.HardLinkId),
HardLinkCounter: entry.HardLinkCounter,
+ Content: entry.Content,
}
}
diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go
index 884fb2670..1693b551e 100644
--- a/weed/filer/entry_codec.go
+++ b/weed/filer/entry_codec.go
@@ -18,6 +18,7 @@ func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) {
Extended: entry.Extended,
HardLinkId: entry.HardLinkId,
HardLinkCounter: entry.HardLinkCounter,
+ Content: entry.Content,
}
return proto.Marshal(message)
}
@@ -38,6 +39,7 @@ func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error {
entry.HardLinkId = message.HardLinkId
entry.HardLinkCounter = message.HardLinkCounter
+ entry.Content = message.Content
return nil
}
@@ -122,6 +124,9 @@ func EqualEntry(a, b *Entry) bool {
if a.HardLinkCounter != b.HardLinkCounter {
return false
}
+ if !bytes.Equal(a.Content, b.Content) {
+ return false
+ }
return true
}
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 105c8e04f..8319212f1 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -238,6 +238,7 @@ func (f *Filer) CreateEntry(ctx context.Context, entry *Entry, o_excl bool, isFr
func (f *Filer) UpdateEntry(ctx context.Context, oldEntry, entry *Entry) (err error) {
if oldEntry != nil {
+ entry.Attr.Crtime = oldEntry.Attr.Crtime
if oldEntry.IsDirectory() && !entry.IsDirectory() {
glog.Errorf("existing %s is a directory", entry.FullPath)
return fmt.Errorf("existing %s is a directory", entry.FullPath)
@@ -269,7 +270,7 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e
entry, err = f.Store.FindEntry(ctx, p)
if entry != nil && entry.TtlSec > 0 {
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- f.Store.DeleteEntry(ctx, p.Child(entry.Name()))
+ f.Store.DeleteOneEntry(ctx, entry)
return nil, filer_pb.ErrNotFound
}
}
@@ -303,7 +304,7 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
lastFileName = entry.Name()
if entry.TtlSec > 0 {
if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
- f.Store.DeleteEntry(ctx, p.Child(entry.Name()))
+ f.Store.DeleteOneEntry(ctx, entry)
expiredCount++
continue
}
@@ -317,11 +318,3 @@ func (f *Filer) Shutdown() {
f.LocalMetaLogBuffer.Shutdown()
f.Store.Shutdown()
}
-
-func (f *Filer) maybeDeleteHardLinks(hardLinkIds []HardLinkId) {
- for _, hardLinkId := range hardLinkIds {
- if err := f.Store.DeleteHardLink(context.Background(), hardLinkId); err != nil {
- glog.Errorf("delete hard link id %d : %v", hardLinkId, err)
- }
- }
-}
diff --git a/weed/filer/filer_conf.go b/weed/filer/filer_conf.go
index 6aa0d2b87..0328fdbff 100644
--- a/weed/filer/filer_conf.go
+++ b/weed/filer/filer_conf.go
@@ -40,6 +40,10 @@ func (fc *FilerConf) loadFromFiler(filer *Filer) (err error) {
return
}
+ if len(entry.Content) > 0 {
+ return fc.LoadFromBytes(entry.Content)
+ }
+
return fc.loadFromChunks(filer, entry.Chunks)
}
diff --git a/weed/filer/filer_conf_test.go b/weed/filer/filer_conf_test.go
index 91f006cda..ff868a3ec 100644
--- a/weed/filer/filer_conf_test.go
+++ b/weed/filer/filer_conf_test.go
@@ -22,7 +22,7 @@ func TestFilerConf(t *testing.T) {
},
{
LocationPrefix: "/buckets/",
- Replication: "001",
+ Replication: "001",
},
}}
fc.doLoadConf(conf)
diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go
index 4415d45d9..b4f4e46ff 100644
--- a/weed/filer/filer_delete_entry.go
+++ b/weed/filer/filer_delete_entry.go
@@ -3,6 +3,7 @@ package filer
import (
"context"
"fmt"
+ "strings"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -22,7 +23,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
return findErr
}
- isCollection := f.isBucket(entry)
+ isDeleteCollection := f.isBucket(entry)
var chunks []*filer_pb.FileChunk
var hardLinkIds []HardLinkId
@@ -31,7 +32,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
// delete the folder children, not including the folder itself
var dirChunks []*filer_pb.FileChunk
var dirHardLinkIds []HardLinkId
- dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster, signatures)
+ dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isDeleteCollection, isFromOtherCluster, signatures)
if err != nil {
glog.V(0).Infof("delete directory %s: %v", p, err)
return fmt.Errorf("delete directory %s: %v", p, err)
@@ -46,7 +47,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
return fmt.Errorf("delete file %s: %v", p, err)
}
- if shouldDeleteChunks && !isCollection {
+ if shouldDeleteChunks && !isDeleteCollection {
f.DirectDeleteChunks(chunks)
}
// A case not handled:
@@ -55,10 +56,15 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR
f.maybeDeleteHardLinks(hardLinkIds)
}
- if isCollection {
+ if isDeleteCollection {
collectionName := entry.Name()
f.doDeleteCollection(collectionName)
f.deleteBucket(collectionName)
+ } else {
+ parent, _ := p.DirAndName()
+ if err := f.removeEmptyParentFolder(ctx, util.FullPath(parent)); err != nil {
+ glog.Errorf("clean up empty folders for %s : %v", p, err)
+ }
}
return nil
@@ -122,7 +128,7 @@ func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shou
glog.V(3).Infof("deleting entry %v, delete chunks: %v", entry.FullPath, shouldDeleteChunks)
- if storeDeletionErr := f.Store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
+ if storeDeletionErr := f.Store.DeleteOneEntry(ctx, entry); storeDeletionErr != nil {
return fmt.Errorf("filer store delete: %v", storeDeletionErr)
}
if !entry.IsDirectory() {
@@ -145,3 +151,33 @@ func (f *Filer) doDeleteCollection(collectionName string) (err error) {
})
}
+
+func (f *Filer) maybeDeleteHardLinks(hardLinkIds []HardLinkId) {
+ for _, hardLinkId := range hardLinkIds {
+ if err := f.Store.DeleteHardLink(context.Background(), hardLinkId); err != nil {
+ glog.Errorf("delete hard link id %d : %v", hardLinkId, err)
+ }
+ }
+}
+
+func (f *Filer) removeEmptyParentFolder(ctx context.Context, dir util.FullPath) error {
+ if !strings.HasPrefix(string(dir), f.DirBucketsPath) {
+ return nil
+ }
+ parent, _ := dir.DirAndName()
+ if parent == f.DirBucketsPath {
+ // should not delete bucket itself
+ return nil
+ }
+ entries, err := f.ListDirectoryEntries(ctx, dir, "", false, 1, "")
+ if err != nil {
+ return err
+ }
+ if len(entries) > 0 {
+ return nil
+ }
+ if err := f.Store.DeleteEntry(ctx, dir); err != nil {
+ return err
+ }
+ return f.removeEmptyParentFolder(ctx, util.FullPath(parent))
+}
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index 9eee38277..09af80b42 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -151,3 +151,4 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
}
f.DeleteChunks(toDelete)
}
+
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index 11e30878d..3ad7a787e 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -45,6 +45,7 @@ type FilerStore interface {
type VirtualFilerStore interface {
FilerStore
DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error
+ DeleteOneEntry(ctx context.Context, entry *Entry) error
}
type FilerStoreWrapper struct {
@@ -145,6 +146,23 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath)
return fsw.ActualStore.DeleteEntry(ctx, fp)
}
+func (fsw *FilerStoreWrapper) DeleteOneEntry(ctx context.Context, existingEntry *Entry) (err error) {
+ stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "delete").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds())
+ }()
+
+ if len(existingEntry.HardLinkId) != 0 {
+ // remove hard link
+ if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
+ return err
+ }
+ }
+
+ return fsw.ActualStore.DeleteEntry(ctx, existingEntry.FullPath)
+}
+
func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp util.FullPath) (err error) {
stats.FilerStoreCounter.WithLabelValues(fsw.ActualStore.GetName(), "deleteFolderChildren").Inc()
start := time.Now()
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go
index 4b8dd5ea9..b879f3a6e 100644
--- a/weed/filer/leveldb/leveldb_store.go
+++ b/weed/filer/leveldb/leveldb_store.go
@@ -162,14 +162,14 @@ func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath we
return nil
}
-func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
-}
-
func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer.Entry, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
+}
- directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, prefix)
iter := store.db.NewIterator(&leveldb_util.Range{Start: genDirectoryKeyPrefix(fullpath, startFileName)}, nil)
for iter.Next() {
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index 2ad0dd648..4b41554b9 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -171,14 +171,14 @@ func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath w
return nil
}
-func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
- return nil, filer.ErrUnsupportedListDirectoryPrefixed
-}
-
func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer.Entry, err error) {
+ return store.ListDirectoryPrefixedEntries(ctx, fullpath, startFileName, inclusive, limit, "")
+}
- directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
+func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, fullpath weed_util.FullPath, startFileName string, inclusive bool, limit int, prefix string) (entries []*filer.Entry, err error) {
+
+ directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, prefix, store.dbCount)
lastFileStart, _ := genDirectoryKeyPrefix(fullpath, startFileName, store.dbCount)
iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: lastFileStart}, nil)