aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/scaffold.go9
-rw-r--r--weed/filer2/abstract_sql/abstract_sql_store.go30
-rw-r--r--weed/filer2/cassandra/cassandra_store.go11
-rw-r--r--weed/filer2/etcd/etcd_store.go10
-rw-r--r--weed/filer2/filer.go62
-rw-r--r--weed/filer2/filer_client_util.go49
-rw-r--r--weed/filer2/filer_delete_entry.go102
-rw-r--r--weed/filer2/filer_deletion.go7
-rw-r--r--weed/filer2/filerstore.go11
-rw-r--r--weed/filer2/leveldb/leveldb_store.go35
-rw-r--r--weed/filer2/leveldb2/leveldb2_store.go35
-rw-r--r--weed/filer2/memdb/memdb_store.go132
-rw-r--r--weed/filer2/memdb/memdb_store_test.go149
-rw-r--r--weed/filer2/mysql/mysql_store.go1
-rw-r--r--weed/filer2/postgres/postgres_store.go1
-rw-r--r--weed/filer2/redis/universal_redis_store.go18
-rw-r--r--weed/filer2/tikv/tikv_store.go32
-rw-r--r--weed/s3api/s3api_server.go6
-rw-r--r--weed/server/filer_grpc_server.go6
-rw-r--r--weed/server/filer_server.go1
-rw-r--r--weed/server/filer_server_handlers_write.go2
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go2
22 files changed, 310 insertions, 401 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index ebb72dc44..6ad8effc2 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -59,15 +59,6 @@ const (
# $HOME/.seaweedfs/filer.toml
# /etc/seaweedfs/filer.toml
-[memory]
-# local in memory, mostly for testing purpose
-enabled = false
-
-[leveldb]
-# local on disk, mostly for simple single-machine setup, fairly scalable
-enabled = false
-dir = "." # directory to store level db files
-
[leveldb2]
# local on disk, mostly for simple single-machine setup, fairly scalable
# faster than previous leveldb, recommended.
diff --git a/weed/filer2/abstract_sql/abstract_sql_store.go b/weed/filer2/abstract_sql/abstract_sql_store.go
index 3e8554957..d512467c7 100644
--- a/weed/filer2/abstract_sql/abstract_sql_store.go
+++ b/weed/filer2/abstract_sql/abstract_sql_store.go
@@ -10,13 +10,14 @@ import (
)
type AbstractSqlStore struct {
- DB *sql.DB
- SqlInsert string
- SqlUpdate string
- SqlFind string
- SqlDelete string
- SqlListExclusive string
- SqlListInclusive string
+ DB *sql.DB
+ SqlInsert string
+ SqlUpdate string
+ SqlFind string
+ SqlDelete string
+ SqlDeleteFolderChildren string
+ SqlListExclusive string
+ SqlListInclusive string
}
type TxOrDB interface {
@@ -132,6 +133,21 @@ func (store *AbstractSqlStore) DeleteEntry(ctx context.Context, fullpath filer2.
return nil
}
+func (store *AbstractSqlStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
+
+ res, err := store.getTxOrDB(ctx).ExecContext(ctx, store.SqlDeleteFolderChildren, hashToLong(string(fullpath)), fullpath)
+ if err != nil {
+ return fmt.Errorf("deleteFolderChildren %s: %s", fullpath, err)
+ }
+
+ _, err = res.RowsAffected()
+ if err != nil {
+ return fmt.Errorf("deleteFolderChildren %s but no rows affected: %s", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
sqlText := store.SqlListExclusive
diff --git a/weed/filer2/cassandra/cassandra_store.go b/weed/filer2/cassandra/cassandra_store.go
index 466be5bf3..dcaab8bc4 100644
--- a/weed/filer2/cassandra/cassandra_store.go
+++ b/weed/filer2/cassandra/cassandra_store.go
@@ -112,6 +112,17 @@ func (store *CassandraStore) DeleteEntry(ctx context.Context, fullpath filer2.Fu
return nil
}
+func (store *CassandraStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) error {
+
+ if err := store.session.Query(
+ "DELETE FROM filemeta WHERE directory=?",
+ fullpath).Exec(); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
diff --git a/weed/filer2/etcd/etcd_store.go b/weed/filer2/etcd/etcd_store.go
index 1b0f928d0..2eb9e3e86 100644
--- a/weed/filer2/etcd/etcd_store.go
+++ b/weed/filer2/etcd/etcd_store.go
@@ -123,6 +123,16 @@ func (store *EtcdStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat
return nil
}
+func (store *EtcdStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ if _, err := store.client.Delete(ctx, string(directoryPrefix), clientv3.WithPrefix()); err != nil {
+ return fmt.Errorf("deleteFolderChildren %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *EtcdStore) ListDirectoryEntries(
ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int,
) (entries []*filer2.Entry, err error) {
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index c5d10196f..b724e20fd 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -3,7 +3,6 @@ package filer2
import (
"context"
"fmt"
- "math"
"os"
"path/filepath"
"strings"
@@ -207,67 +206,6 @@ func (f *Filer) FindEntry(ctx context.Context, p FullPath) (entry *Entry, err er
return f.store.FindEntry(ctx, p)
}
-func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
- entry, err := f.FindEntry(ctx, p)
- if err != nil {
- return err
- }
-
- if entry.IsDirectory() {
- limit := int(1)
- if isRecursive {
- limit = math.MaxInt32
- }
- lastFileName := ""
- includeLastFile := false
- for limit > 0 {
- entries, err := f.ListDirectoryEntries(ctx, p, lastFileName, includeLastFile, PaginationSize)
- if err != nil {
- glog.Errorf("list folder %s: %v", p, err)
- return fmt.Errorf("list folder %s: %v", p, err)
- }
-
- if len(entries) == 0 {
- break
- }
-
- if isRecursive {
- for _, sub := range entries {
- lastFileName = sub.Name()
- err = f.DeleteEntryMetaAndData(ctx, sub.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
- if err != nil && !ignoreRecursiveError {
- return err
- }
- limit--
- if limit <= 0 {
- break
- }
- }
- }
-
- if len(entries) < PaginationSize {
- break
- }
- }
-
- f.cacheDelDirectory(string(p))
-
- }
-
- if shouldDeleteChunks {
- f.DeleteChunks(p, entry.Chunks)
- }
-
- if p == "/" {
- return nil
- }
- glog.V(3).Infof("deleting entry %v", p)
-
- f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
-
- return f.store.DeleteEntry(ctx, p)
-}
-
func (f *Filer) ListDirectoryEntries(ctx context.Context, p FullPath, startFileName string, inclusive bool, limit int) ([]*Entry, error) {
if strings.HasSuffix(string(p), "/") && len(p) > 1 {
p = p[0 : len(p)-1]
diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go
index 6d656fa11..1a10f7c20 100644
--- a/weed/filer2/filer_client_util.go
+++ b/weed/filer2/filer_client_util.go
@@ -3,6 +3,8 @@ package filer2
import (
"context"
"fmt"
+ "io"
+ "math"
"strings"
"sync"
@@ -124,35 +126,42 @@ func GetEntry(ctx context.Context, filerClient FilerClient, fullFilePath string)
return
}
-func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath string, fn func(entry *filer_pb.Entry)) (err error) {
+func ReadDirAllEntries(ctx context.Context, filerClient FilerClient, fullDirPath, prefix string, fn func(entry *filer_pb.Entry, isLast bool)) (err error) {
err = filerClient.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
lastEntryName := ""
- for {
-
- request := &filer_pb.ListEntriesRequest{
- Directory: fullDirPath,
- StartFromFileName: lastEntryName,
- Limit: PaginationSize,
- }
+ request := &filer_pb.ListEntriesRequest{
+ Directory: fullDirPath,
+ Prefix: prefix,
+ StartFromFileName: lastEntryName,
+ Limit: math.MaxUint32,
+ }
- glog.V(3).Infof("read directory: %v", request)
- resp, err := client.ListEntries(ctx, request)
- if err != nil {
- return fmt.Errorf("list %s: %v", fullDirPath, err)
- }
+ glog.V(3).Infof("read directory: %v", request)
+ stream, err := client.ListEntries(ctx, request)
+ if err != nil {
+ return fmt.Errorf("list %s: %v", fullDirPath, err)
+ }
- for _, entry := range resp.Entries {
- fn(entry)
- lastEntryName = entry.Name
+ var prevEntry *filer_pb.Entry
+ for {
+ resp, recvErr := stream.Recv()
+ if recvErr != nil {
+ if recvErr == io.EOF {
+ if prevEntry != nil {
+ fn(prevEntry, true)
+ }
+ break
+ } else {
+ return recvErr
+ }
}
-
- if len(resp.Entries) < PaginationSize {
- break
+ if prevEntry != nil {
+ fn(prevEntry, false)
}
-
+ prevEntry = resp.Entry
}
return nil
diff --git a/weed/filer2/filer_delete_entry.go b/weed/filer2/filer_delete_entry.go
new file mode 100644
index 000000000..75a09e7ef
--- /dev/null
+++ b/weed/filer2/filer_delete_entry.go
@@ -0,0 +1,102 @@
+package filer2
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p FullPath, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (err error) {
+ if p == "/" {
+ return nil
+ }
+
+ entry, findErr := f.FindEntry(ctx, p)
+ if findErr != nil {
+ return findErr
+ }
+
+ var chunks []*filer_pb.FileChunk
+ chunks = append(chunks, entry.Chunks...)
+ if entry.IsDirectory() {
+ // delete the folder children, not including the folder itself
+ var dirChunks []*filer_pb.FileChunk
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
+ if err != nil {
+ return fmt.Errorf("delete directory %s: %v", p, err)
+ }
+ chunks = append(chunks, dirChunks...)
+ f.cacheDelDirectory(string(p))
+ }
+ // delete the file or folder
+ err = f.doDeleteEntryMetaAndData(ctx, entry, shouldDeleteChunks)
+ if err != nil {
+ return fmt.Errorf("delete file %s: %v", p, err)
+ }
+
+ if shouldDeleteChunks {
+ go f.DeleteChunks(chunks)
+ }
+
+ return nil
+}
+
+func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive bool, ignoreRecursiveError, shouldDeleteChunks bool) (chunks []*filer_pb.FileChunk, err error) {
+
+ lastFileName := ""
+ includeLastFile := false
+ for {
+ entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize)
+ if err != nil {
+ glog.Errorf("list folder %s: %v", entry.FullPath, err)
+ return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err)
+ }
+ if lastFileName == "" && !isRecursive && len(entries) > 0 {
+ // only for first iteration in the loop
+ return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath)
+ }
+
+ for _, sub := range entries {
+ lastFileName = sub.Name()
+ var dirChunks []*filer_pb.FileChunk
+ if sub.IsDirectory() {
+ dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks)
+ }
+ if err != nil && !ignoreRecursiveError {
+ return nil, err
+ }
+ if shouldDeleteChunks {
+ chunks = append(chunks, dirChunks...)
+ }
+ }
+
+ if len(entries) < PaginationSize {
+ break
+ }
+ }
+
+ f.cacheDelDirectory(string(entry.FullPath))
+
+ glog.V(3).Infof("deleting directory %v", entry.FullPath)
+
+ if storeDeletionErr := f.store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil {
+ return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr)
+ }
+ f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
+
+ return chunks, nil
+}
+
+func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool) (err error) {
+
+ glog.V(3).Infof("deleting entry %v", entry.FullPath)
+
+ if storeDeletionErr := f.store.DeleteEntry(ctx, entry.FullPath); storeDeletionErr != nil {
+ return fmt.Errorf("filer store delete: %v", storeDeletionErr)
+ }
+ f.NotifyUpdateEvent(entry, nil, shouldDeleteChunks)
+
+ return nil
+}
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
index 25e27e504..9937685f7 100644
--- a/weed/filer2/filer_deletion.go
+++ b/weed/filer2/filer_deletion.go
@@ -51,9 +51,8 @@ func (f *Filer) loopProcessingDeletion() {
}
}
-func (f *Filer) DeleteChunks(fullpath FullPath, chunks []*filer_pb.FileChunk) {
+func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
- glog.V(3).Infof("deleting %s chunk %s", fullpath, chunk.String())
f.fileIdDeletionChan <- chunk.GetFileIdString()
}
}
@@ -70,7 +69,7 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
return
}
if newEntry == nil {
- f.DeleteChunks(oldEntry.FullPath, oldEntry.Chunks)
+ f.DeleteChunks(oldEntry.Chunks)
}
var toDelete []*filer_pb.FileChunk
@@ -84,5 +83,5 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
toDelete = append(toDelete, oldChunk)
}
}
- f.DeleteChunks(oldEntry.FullPath, toDelete)
+ f.DeleteChunks(toDelete)
}
diff --git a/weed/filer2/filerstore.go b/weed/filer2/filerstore.go
index 8caa44ee2..0bb0bd611 100644
--- a/weed/filer2/filerstore.go
+++ b/weed/filer2/filerstore.go
@@ -20,6 +20,7 @@ type FilerStore interface {
// err == filer2.ErrNotFound if not found
FindEntry(context.Context, FullPath) (entry *Entry, err error)
DeleteEntry(context.Context, FullPath) (err error)
+ DeleteFolderChildren(context.Context, FullPath) (err error)
ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error)
BeginTransaction(ctx context.Context) (context.Context, error)
@@ -97,6 +98,16 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp FullPath) (err
return fsw.actualStore.DeleteEntry(ctx, fp)
}
+func (fsw *FilerStoreWrapper) DeleteFolderChildren(ctx context.Context, fp FullPath) (err error) {
+ stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerStoreHistogram.WithLabelValues(fsw.actualStore.GetName(), "deleteFolderChildren").Observe(time.Since(start).Seconds())
+ }()
+
+ return fsw.actualStore.DeleteFolderChildren(ctx, fp)
+}
+
func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath FullPath, startFileName string, includeStartFile bool, limit int) ([]*Entry, error) {
stats.FilerStoreCounter.WithLabelValues(fsw.actualStore.GetName(), "list").Inc()
start := time.Now()
diff --git a/weed/filer2/leveldb/leveldb_store.go b/weed/filer2/leveldb/leveldb_store.go
index d00eba859..4952b3b3a 100644
--- a/weed/filer2/leveldb/leveldb_store.go
+++ b/weed/filer2/leveldb/leveldb_store.go
@@ -5,12 +5,13 @@ import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
)
const (
@@ -123,6 +124,34 @@ func (store *LevelDBStore) DeleteEntry(ctx context.Context, fullpath filer2.Full
return nil
}
+func (store *LevelDBStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+
+ batch := new(leveldb.Batch)
+
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+ iter := store.db.NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ batch.Delete([]byte(genKey(string(fullpath), fileName)))
+ }
+ iter.Release()
+
+ err = store.db.Write(batch, nil)
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *LevelDBStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
diff --git a/weed/filer2/leveldb2/leveldb2_store.go b/weed/filer2/leveldb2/leveldb2_store.go
index 4b47d2eb3..8a16822ab 100644
--- a/weed/filer2/leveldb2/leveldb2_store.go
+++ b/weed/filer2/leveldb2/leveldb2_store.go
@@ -8,12 +8,13 @@ import (
"io"
"os"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- weed_util "github.com/chrislusf/seaweedfs/weed/util"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ weed_util "github.com/chrislusf/seaweedfs/weed/util"
)
func init() {
@@ -134,6 +135,34 @@ func (store *LevelDB2Store) DeleteEntry(ctx context.Context, fullpath filer2.Ful
return nil
}
+func (store *LevelDB2Store) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+ directoryPrefix, partitionId := genDirectoryKeyPrefix(fullpath, "", store.dbCount)
+
+ batch := new(leveldb.Batch)
+
+ iter := store.dbs[partitionId].NewIterator(&leveldb_util.Range{Start: directoryPrefix}, nil)
+ for iter.Next() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ continue
+ }
+ batch.Delete(append(directoryPrefix, []byte(fileName)...))
+ }
+ iter.Release()
+
+ err = store.dbs[partitionId].Write(batch, nil)
+
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ return nil
+}
+
func (store *LevelDB2Store) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
diff --git a/weed/filer2/memdb/memdb_store.go b/weed/filer2/memdb/memdb_store.go
deleted file mode 100644
index 9c10a5472..000000000
--- a/weed/filer2/memdb/memdb_store.go
+++ /dev/null
@@ -1,132 +0,0 @@
-package memdb
-
-import (
- "context"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/google/btree"
- "strings"
- "sync"
-)
-
-func init() {
- filer2.Stores = append(filer2.Stores, &MemDbStore{})
-}
-
-type MemDbStore struct {
- tree *btree.BTree
- treeLock sync.Mutex
-}
-
-type entryItem struct {
- *filer2.Entry
-}
-
-func (a entryItem) Less(b btree.Item) bool {
- return strings.Compare(string(a.FullPath), string(b.(entryItem).FullPath)) < 0
-}
-
-func (store *MemDbStore) GetName() string {
- return "memory"
-}
-
-func (store *MemDbStore) Initialize(configuration util.Configuration) (err error) {
- store.tree = btree.New(8)
- return nil
-}
-
-func (store *MemDbStore) BeginTransaction(ctx context.Context) (context.Context, error) {
- return ctx, nil
-}
-func (store *MemDbStore) CommitTransaction(ctx context.Context) error {
- return nil
-}
-func (store *MemDbStore) RollbackTransaction(ctx context.Context) error {
- return nil
-}
-
-func (store *MemDbStore) InsertEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- // println("inserting", entry.FullPath)
- store.treeLock.Lock()
- store.tree.ReplaceOrInsert(entryItem{entry})
- store.treeLock.Unlock()
- return nil
-}
-
-func (store *MemDbStore) UpdateEntry(ctx context.Context, entry *filer2.Entry) (err error) {
- if _, err = store.FindEntry(ctx, entry.FullPath); err != nil {
- return fmt.Errorf("no such file %s : %v", entry.FullPath, err)
- }
- store.treeLock.Lock()
- store.tree.ReplaceOrInsert(entryItem{entry})
- store.treeLock.Unlock()
- return nil
-}
-
-func (store *MemDbStore) FindEntry(ctx context.Context, fullpath filer2.FullPath) (entry *filer2.Entry, err error) {
- item := store.tree.Get(entryItem{&filer2.Entry{FullPath: fullpath}})
- if item == nil {
- return nil, filer2.ErrNotFound
- }
- entry = item.(entryItem).Entry
- return entry, nil
-}
-
-func (store *MemDbStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPath) (err error) {
- store.treeLock.Lock()
- store.tree.Delete(entryItem{&filer2.Entry{FullPath: fullpath}})
- store.treeLock.Unlock()
- return nil
-}
-
-func (store *MemDbStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool, limit int) (entries []*filer2.Entry, err error) {
-
- startFrom := string(fullpath)
- if startFileName != "" {
- startFrom = startFrom + "/" + startFileName
- }
-
- store.tree.AscendGreaterOrEqual(entryItem{&filer2.Entry{FullPath: filer2.FullPath(startFrom)}},
- func(item btree.Item) bool {
- if limit <= 0 {
- return false
- }
- entry := item.(entryItem).Entry
- // println("checking", entry.FullPath)
-
- if entry.FullPath == fullpath {
- // skipping the current directory
- // println("skipping the folder", entry.FullPath)
- return true
- }
-
- dir, name := entry.FullPath.DirAndName()
- if name == startFileName {
- if inclusive {
- limit--
- entries = append(entries, entry)
- }
- return true
- }
-
- // only iterate the same prefix
- if !strings.HasPrefix(string(entry.FullPath), string(fullpath)) {
- // println("breaking from", entry.FullPath)
- return false
- }
-
- if dir != string(fullpath) {
- // this could be items in deeper directories
- // println("skipping deeper folder", entry.FullPath)
- return true
- }
- // now process the directory items
- // println("adding entry", entry.FullPath)
- limit--
- entries = append(entries, entry)
- return true
- },
- )
- return entries, nil
-}
diff --git a/weed/filer2/memdb/memdb_store_test.go b/weed/filer2/memdb/memdb_store_test.go
deleted file mode 100644
index 3fd806aeb..000000000
--- a/weed/filer2/memdb/memdb_store_test.go
+++ /dev/null
@@ -1,149 +0,0 @@
-package memdb
-
-import (
- "context"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "testing"
-)
-
-func TestCreateAndFind(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
- store := &MemDbStore{}
- store.Initialize(nil)
- filer.SetStore(store)
- filer.DisableDirectoryCache()
-
- ctx := context.Background()
-
- fullpath := filer2.FullPath("/home/chris/this/is/one/file1.jpg")
-
- entry1 := &filer2.Entry{
- FullPath: fullpath,
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
-
- if err := filer.CreateEntry(ctx, entry1); err != nil {
- t.Errorf("create entry %v: %v", entry1.FullPath, err)
- return
- }
-
- entry, err := filer.FindEntry(ctx, fullpath)
-
- if err != nil {
- t.Errorf("find entry: %v", err)
- return
- }
-
- if entry.FullPath != entry1.FullPath {
- t.Errorf("find wrong entry: %v", entry.FullPath)
- return
- }
-
-}
-
-func TestCreateFileAndList(t *testing.T) {
- filer := filer2.NewFiler(nil, nil)
- store := &MemDbStore{}
- store.Initialize(nil)
- filer.SetStore(store)
- filer.DisableDirectoryCache()
-
- ctx := context.Background()
-
- entry1 := &filer2.Entry{
- FullPath: filer2.FullPath("/home/chris/this/is/one/file1.jpg"),
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
-
- entry2 := &filer2.Entry{
- FullPath: filer2.FullPath("/home/chris/this/is/one/file2.jpg"),
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
-
- filer.CreateEntry(ctx, entry1)
- filer.CreateEntry(ctx, entry2)
-
- // checking the 2 files
- entries, err := filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "", false, 100)
-
- if err != nil {
- t.Errorf("list entries: %v", err)
- return
- }
-
- if len(entries) != 2 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- if entries[0].FullPath != entry1.FullPath {
- t.Errorf("find wrong entry 1: %v", entries[0].FullPath)
- return
- }
-
- if entries[1].FullPath != entry2.FullPath {
- t.Errorf("find wrong entry 2: %v", entries[1].FullPath)
- return
- }
-
- // checking the offset
- entries, err = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is/one/"), "file1.jpg", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // checking root directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/"), "", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // add file3
- file3Path := filer2.FullPath("/home/chris/this/is/file3.jpg")
- entry3 := &filer2.Entry{
- FullPath: file3Path,
- Attr: filer2.Attr{
- Mode: 0440,
- Uid: 1234,
- Gid: 5678,
- },
- }
- filer.CreateEntry(ctx, entry3)
-
- // checking one upper directory
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
- if len(entries) != 2 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
- // delete file and count
- filer.DeleteEntryMetaAndData(ctx, file3Path, false, false, false)
- entries, _ = filer.ListDirectoryEntries(ctx, filer2.FullPath("/home/chris/this/is"), "", false, 100)
- if len(entries) != 1 {
- t.Errorf("list entries count: %v", len(entries))
- return
- }
-
-}
diff --git a/weed/filer2/mysql/mysql_store.go b/weed/filer2/mysql/mysql_store.go
index 3eca80ff7..d1b06ece5 100644
--- a/weed/filer2/mysql/mysql_store.go
+++ b/weed/filer2/mysql/mysql_store.go
@@ -46,6 +46,7 @@ func (store *MysqlStore) initialize(user, password, hostname string, port int, d
store.SqlUpdate = "UPDATE filemeta SET meta=? WHERE dirhash=? AND name=? AND directory=?"
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=? AND name=? AND directory=?"
+ store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=? AND directory=?"
store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>? AND directory=? ORDER BY NAME ASC LIMIT ?"
store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=? AND name>=? AND directory=? ORDER BY NAME ASC LIMIT ?"
diff --git a/weed/filer2/postgres/postgres_store.go b/weed/filer2/postgres/postgres_store.go
index ffd3d1e01..3ec000fe0 100644
--- a/weed/filer2/postgres/postgres_store.go
+++ b/weed/filer2/postgres/postgres_store.go
@@ -45,6 +45,7 @@ func (store *PostgresStore) initialize(user, password, hostname string, port int
store.SqlUpdate = "UPDATE filemeta SET meta=$1 WHERE dirhash=$2 AND name=$3 AND directory=$4"
store.SqlFind = "SELECT meta FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
store.SqlDelete = "DELETE FROM filemeta WHERE dirhash=$1 AND name=$2 AND directory=$3"
+ store.SqlDeleteFolderChildren = "DELETE FROM filemeta WHERE dirhash=$1 AND directory=$2"
store.SqlListExclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4"
store.SqlListInclusive = "SELECT NAME, meta FROM filemeta WHERE dirhash=$1 AND name>=$2 AND directory=$3 ORDER BY NAME ASC LIMIT $4"
diff --git a/weed/filer2/redis/universal_redis_store.go b/weed/filer2/redis/universal_redis_store.go
index ce41d4d70..62257e91e 100644
--- a/weed/filer2/redis/universal_redis_store.go
+++ b/weed/filer2/redis/universal_redis_store.go
@@ -99,6 +99,24 @@ func (store *UniversalRedisStore) DeleteEntry(ctx context.Context, fullpath file
return nil
}
+func (store *UniversalRedisStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+
+ members, err := store.Client.SMembers(genDirectoryListKey(string(fullpath))).Result()
+ if err != nil {
+ return fmt.Errorf("delete folder %s : %v", fullpath, err)
+ }
+
+ for _, fileName := range members {
+ path := filer2.NewFullPath(string(fullpath), fileName)
+ _, err = store.Client.Del(string(path)).Result()
+ if err != nil {
+ return fmt.Errorf("delete %s in parent dir: %v", fullpath, err)
+ }
+ }
+
+ return nil
+}
+
func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
diff --git a/weed/filer2/tikv/tikv_store.go b/weed/filer2/tikv/tikv_store.go
index 0e6b93c86..4eb8cb90d 100644
--- a/weed/filer2/tikv/tikv_store.go
+++ b/weed/filer2/tikv/tikv_store.go
@@ -141,6 +141,38 @@ func (store *TikvStore) DeleteEntry(ctx context.Context, fullpath filer2.FullPat
return nil
}
+func (store *TikvStore) DeleteFolderChildren(ctx context.Context, fullpath filer2.FullPath) (err error) {
+
+ directoryPrefix := genDirectoryKeyPrefix(fullpath, "")
+
+ tx := store.getTx(ctx)
+
+ iter, err := tx.Iter(directoryPrefix, nil)
+ if err != nil {
+ return fmt.Errorf("deleteFolderChildren %s: %v", fullpath, err)
+ }
+ defer iter.Close()
+ for iter.Valid() {
+ key := iter.Key()
+ if !bytes.HasPrefix(key, directoryPrefix) {
+ break
+ }
+ fileName := getNameFromKey(key)
+ if fileName == "" {
+ iter.Next()
+ continue
+ }
+
+ if err = tx.Delete(genKey(string(fullpath), fileName)); err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ iter.Next()
+ }
+
+ return nil
+}
+
func (store *TikvStore) ListDirectoryEntries(ctx context.Context, fullpath filer2.FullPath, startFileName string, inclusive bool,
limit int) (entries []*filer2.Entry, err error) {
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index 24458592d..edf634444 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -1,12 +1,6 @@
package s3api
import (
- _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
"github.com/gorilla/mux"
"google.golang.org/grpc"
"net/http"
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index acfd2260d..ad51d49cf 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -140,7 +140,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
})
if err == nil {
- fs.filer.DeleteChunks(fullpath, garbages)
+ fs.filer.DeleteChunks(garbages)
}
return &filer_pb.CreateEntryResponse{}, err
@@ -189,8 +189,8 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
- fs.filer.DeleteChunks(entry.FullPath, unusedChunks)
- fs.filer.DeleteChunks(entry.FullPath, garbages)
+ fs.filer.DeleteChunks(unusedChunks)
+ fs.filer.DeleteChunks(garbages)
}
fs.filer.NotifyUpdateEvent(entry, newEntry, true)
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 2cf26b1bb..41ba81366 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -18,7 +18,6 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index c919efbc9..fb6855a99 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -194,7 +194,7 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w
}
// glog.V(4).Infof("saving %s => %+v", path, entry)
if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
- fs.filer.DeleteChunks(entry.FullPath, entry.Chunks)
+ fs.filer.DeleteChunks(entry.Chunks)
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
writeJsonError(w, r, http.StatusInternalServerError, dbErr)
err = dbErr
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 492b55943..7c872c2b4 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -177,7 +177,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
Chunks: fileChunks,
}
if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
- fs.filer.DeleteChunks(entry.FullPath, entry.Chunks)
+ fs.filer.DeleteChunks(entry.Chunks)
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)