aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortam-i13 <46927823+tam-i13@users.noreply.github.com>2025-11-26 06:35:19 +0300
committerGitHub <noreply@github.com>2025-11-25 19:35:19 -0800
commitb669607fcdb959c9a5c8ba7accd5b65e54cafbc6 (patch)
tree49ebe348b10e2f3ff2edcb5ecf17b04fbcf225e7
parent76f1a23fec89fa59b61767d507bd82325ae4833f (diff)
downloadseaweedfs-b669607fcdb959c9a5c8ba7accd5b65e54cafbc6.tar.xz
seaweedfs-b669607fcdb959c9a5c8ba7accd5b65e54cafbc6.zip
Add error list each entry func (#7485)
* added error return in type ListEachEntryFunc * return error if errClose * fix fmt.Errorf * fix return errClose * use %w fmt.Errorf * added entry in messege error * add callbackErr in ListDirectoryEntries * fix error * add log * clear err when the scanner stops on io.EOF, so returning err doesn’t surface EOF as a failure. * more info in error * add ctx to logs, error handling * fix return eachEntryFunc * fix * fix log * fix return * fix foundationdb test s * fix eachEntryFunc * fix return resEachEntryFuncErr * Update weed/filer/filer.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update weed/filer/elastic/v7/elastic_store.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update weed/filer/hbase/hbase_store.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update weed/filer/foundationdb/foundationdb_store.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update weed/filer/ydb/ydb_store.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * fix * add scanErr --------- Co-authored-by: Roman Tamarov <r.tamarov@kryptonite.ru> Co-authored-by: Chris Lu <chrislusf@users.noreply.github.com> Co-authored-by: chrislu <chris.lu@gmail.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
-rw-r--r--test/foundationdb/foundationdb_concurrent_test.go12
-rw-r--r--test/foundationdb/foundationdb_integration_test.go12
-rw-r--r--test/foundationdb/mock_integration_test.go19
-rw-r--r--weed/filer/abstract_sql/abstract_sql_store.go12
-rw-r--r--weed/filer/arangodb/arangodb_store.go8
-rw-r--r--weed/filer/cassandra/cassandra_store.go17
-rw-r--r--weed/filer/cassandra2/cassandra_store.go18
-rw-r--r--weed/filer/elastic/v7/elastic_store.go18
-rw-r--r--weed/filer/etcd/etcd_store.go12
-rw-r--r--weed/filer/filer.go13
-rw-r--r--weed/filer/filer_search.go27
-rw-r--r--weed/filer/filerstore.go5
-rw-r--r--weed/filer/filerstore_translate_path.go7
-rw-r--r--weed/filer/filerstore_wrapper.go22
-rw-r--r--weed/filer/foundationdb/foundationdb_store.go12
-rw-r--r--weed/filer/foundationdb/foundationdb_store_test.go56
-rw-r--r--weed/filer/hbase/hbase_store.go21
-rw-r--r--weed/filer/leveldb/leveldb_store.go9
-rw-r--r--weed/filer/leveldb2/leveldb2_store.go9
-rw-r--r--weed/filer/leveldb3/leveldb3_store.go9
-rw-r--r--weed/filer/mongodb/mongodb_store.go14
-rw-r--r--weed/filer/redis/universal_redis_store.go9
-rw-r--r--weed/filer/redis2/universal_redis_store.go9
-rw-r--r--weed/filer/redis3/universal_redis_store.go19
-rw-r--r--weed/filer/redis_lua/universal_redis_store.go9
-rw-r--r--weed/filer/rocksdb/rocksdb_store.go20
-rw-r--r--weed/filer/store_test/test_suite.go13
-rw-r--r--weed/filer/tarantool/tarantool_store.go9
-rw-r--r--weed/filer/tikv/tikv_store.go24
-rw-r--r--weed/filer/ydb/ydb_store.go7
-rw-r--r--weed/mount/meta_cache/meta_cache.go4
-rw-r--r--weed/mount/weedfs_dir_read.go17
-rw-r--r--weed/server/filer_grpc_server.go8
-rw-r--r--weed/server/filer_grpc_server_traverse_meta.go7
34 files changed, 350 insertions, 137 deletions
diff --git a/test/foundationdb/foundationdb_concurrent_test.go b/test/foundationdb/foundationdb_concurrent_test.go
index b0ecaf742..de49ecc61 100644
--- a/test/foundationdb/foundationdb_concurrent_test.go
+++ b/test/foundationdb/foundationdb_concurrent_test.go
@@ -65,9 +65,9 @@ func TestFoundationDBStore_ConcurrentInserts(t *testing.T) {
expectedTotal := numGoroutines * entriesPerGoroutine
actualCount := 0
- _, err := store.ListDirectoryEntries(ctx, "/concurrent", "", true, 10000, func(entry *filer.Entry) bool {
+ _, err := store.ListDirectoryEntries(ctx, "/concurrent", "", true, 10000, func(entry *filer.Entry) (bool, error) {
actualCount++
- return true
+ return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryEntries failed: %v", err)
@@ -265,9 +265,9 @@ func TestFoundationDBStore_ConcurrentTransactions(t *testing.T) {
totalExpectedEntries := successCount * entriesPerTransaction
actualCount := 0
- _, err := store.ListDirectoryEntries(ctx, "/transactions", "", true, 10000, func(entry *filer.Entry) bool {
+ _, err := store.ListDirectoryEntries(ctx, "/transactions", "", true, 10000, func(entry *filer.Entry) (bool, error) {
actualCount++
- return true
+ return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryEntries failed: %v", err)
@@ -335,9 +335,9 @@ func TestFoundationDBStore_ConcurrentDirectoryOperations(t *testing.T) {
dirPath := fmt.Sprintf("/worker%d/dir%d", w, d)
fileCount := 0
- _, err := store.ListDirectoryEntries(ctx, dirPath, "", true, 1000, func(entry *filer.Entry) bool {
+ _, err := store.ListDirectoryEntries(ctx, dirPath, "", true, 1000, func(entry *filer.Entry) (bool, error) {
fileCount++
- return true
+ return true, nil
})
if err != nil {
t.Errorf("ListDirectoryEntries failed for %s: %v", dirPath, err)
diff --git a/test/foundationdb/foundationdb_integration_test.go b/test/foundationdb/foundationdb_integration_test.go
index 5fdf993d7..63ed41ef9 100644
--- a/test/foundationdb/foundationdb_integration_test.go
+++ b/test/foundationdb/foundationdb_integration_test.go
@@ -115,9 +115,9 @@ func TestFoundationDBStore_DirectoryOperations(t *testing.T) {
// Test ListDirectoryEntries
var listedFiles []string
- lastFileName, err := store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool {
+ lastFileName, err := store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) (bool, error) {
listedFiles = append(listedFiles, entry.Name())
- return true
+ return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryEntries failed: %v", err)
@@ -132,9 +132,9 @@ func TestFoundationDBStore_DirectoryOperations(t *testing.T) {
// Test ListDirectoryPrefixedEntries
var prefixedFiles []string
- _, err = store.ListDirectoryPrefixedEntries(ctx, testDir, "", true, 100, "file", func(entry *filer.Entry) bool {
+ _, err = store.ListDirectoryPrefixedEntries(ctx, testDir, "", true, 100, "file", func(entry *filer.Entry) (bool, error) {
prefixedFiles = append(prefixedFiles, entry.Name())
- return true
+ return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryPrefixedEntries failed: %v", err)
@@ -153,9 +153,9 @@ func TestFoundationDBStore_DirectoryOperations(t *testing.T) {
// Verify children are deleted
var remainingFiles []string
- _, err = store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool {
+ _, err = store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) (bool, error) {
remainingFiles = append(remainingFiles, entry.Name())
- return true
+ return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryEntries after delete failed: %v", err)
diff --git a/test/foundationdb/mock_integration_test.go b/test/foundationdb/mock_integration_test.go
index 5073ba5b3..9639932ba 100644
--- a/test/foundationdb/mock_integration_test.go
+++ b/test/foundationdb/mock_integration_test.go
@@ -2,6 +2,7 @@ package foundationdb
import (
"context"
+ "fmt"
"sort"
"strings"
"testing"
@@ -157,14 +158,20 @@ func (store *MockFoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Con
continue
}
- if !eachEntryFunc(entry) {
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
+ if !resEachEntryFunc {
+ break
+ }
+
lastFileName = entry.Name()
count++
}
- return lastFileName, nil
+ return lastFileName, err
}
func (store *MockFoundationDBStore) KvPut(ctx context.Context, key []byte, value []byte) error {
@@ -390,9 +397,9 @@ func TestMockFoundationDBStore_DirectoryOperations(t *testing.T) {
// Test ListDirectoryEntries
var listedFiles []string
- lastFileName, err := store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool {
+ lastFileName, err := store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) (bool, error) {
listedFiles = append(listedFiles, entry.Name())
- return true
+ return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryEntries failed: %v", err)
@@ -409,9 +416,9 @@ func TestMockFoundationDBStore_DirectoryOperations(t *testing.T) {
// Verify children are deleted
var remainingFiles []string
- _, err = store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) bool {
+ _, err = store.ListDirectoryEntries(ctx, testDir, "", true, 100, func(entry *filer.Entry) (bool, error) {
remainingFiles = append(remainingFiles, entry.Name())
- return true
+ return true, nil
})
if err != nil {
t.Fatalf("ListDirectoryEntries after delete failed: %v", err)
diff --git a/weed/filer/abstract_sql/abstract_sql_store.go b/weed/filer/abstract_sql/abstract_sql_store.go
index a83b33341..0b27104cf 100644
--- a/weed/filer/abstract_sql/abstract_sql_store.go
+++ b/weed/filer/abstract_sql/abstract_sql_store.go
@@ -326,17 +326,23 @@ func (store *AbstractSqlStore) ListDirectoryPrefixedEntries(ctx context.Context,
return lastFileName, fmt.Errorf("scan decode %s : %v", entry.FullPath, err)
}
- if !eachEntryFunc(entry) {
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
+ break
+ }
+
+ if !resEachEntryFunc {
break
}
}
- return lastFileName, nil
+ return lastFileName, err
}
func (store *AbstractSqlStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
- return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", nil)
+ return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}
func (store *AbstractSqlStore) Shutdown() {
diff --git a/weed/filer/arangodb/arangodb_store.go b/weed/filer/arangodb/arangodb_store.go
index 0a3a06d16..7b2184c62 100644
--- a/weed/filer/arangodb/arangodb_store.go
+++ b/weed/filer/arangodb/arangodb_store.go
@@ -335,7 +335,13 @@ sort d.name asc
break
}
- if !eachEntryFunc(entry) {
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
+ break
+ }
+
+ if !resEachEntryFunc {
break
}
diff --git a/weed/filer/cassandra/cassandra_store.go b/weed/filer/cassandra/cassandra_store.go
index 0d0c17e1d..968a2b3c3 100644
--- a/weed/filer/cassandra/cassandra_store.go
+++ b/weed/filer/cassandra/cassandra_store.go
@@ -206,12 +206,23 @@ func (store *CassandraStore) ListDirectoryEntries(ctx context.Context, dirPath u
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
- if !eachEntryFunc(entry) {
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.FullPath, resEachEntryFuncErr)
+ glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", entry.FullPath, resEachEntryFuncErr)
+ break
+ }
+
+ if !resEachEntryFunc {
break
}
}
- if err = iter.Close(); err != nil {
- glog.V(0).InfofCtx(ctx, "list iterator close: %v", err)
+ if errClose := iter.Close(); errClose != nil {
+ glog.V(0).InfofCtx(ctx, "list iterator close: %v", errClose)
+ if err == nil {
+ return lastFileName, errClose
+ }
}
return lastFileName, err
diff --git a/weed/filer/cassandra2/cassandra_store.go b/weed/filer/cassandra2/cassandra_store.go
index 8ff7f5874..7ce3d32c1 100644
--- a/weed/filer/cassandra2/cassandra_store.go
+++ b/weed/filer/cassandra2/cassandra_store.go
@@ -206,12 +206,24 @@ func (store *Cassandra2Store) ListDirectoryEntries(ctx context.Context, dirPath
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
- if !eachEntryFunc(entry) {
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.FullPath, resEachEntryFuncErr)
+ glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", entry.FullPath, resEachEntryFuncErr)
+ break
+ }
+
+ if !resEachEntryFunc {
break
}
}
- if err = iter.Close(); err != nil {
- glog.V(0).InfofCtx(ctx, "list iterator close: %v", err)
+
+ if errClose := iter.Close(); errClose != nil {
+ glog.V(0).InfofCtx(ctx, "list iterator close: %v", errClose)
+ if err == nil {
+ return lastFileName, errClose
+ }
}
return lastFileName, err
diff --git a/weed/filer/elastic/v7/elastic_store.go b/weed/filer/elastic/v7/elastic_store.go
index 5b88025e4..159330dec 100644
--- a/weed/filer/elastic/v7/elastic_store.go
+++ b/weed/filer/elastic/v7/elastic_store.go
@@ -198,12 +198,12 @@ func (store *ElasticStore) deleteEntry(ctx context.Context, index, id string) (e
}
func (store *ElasticStore) DeleteFolderChildren(ctx context.Context, fullpath weed_util.FullPath) (err error) {
- _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) bool {
+ _, err = store.ListDirectoryEntries(ctx, fullpath, "", false, math.MaxInt32, func(entry *filer.Entry) (bool, error) {
if err := store.DeleteEntry(ctx, entry.FullPath); err != nil {
glog.ErrorfCtx(ctx, "elastic delete %s: %v.", entry.FullPath, err)
- return false
+ return false, err
}
- return true
+ return true, nil
})
return
}
@@ -258,9 +258,17 @@ func (store *ElasticStore) listDirectoryEntries(
if fileName == startFileName && !inclusive {
continue
}
- if !eachEntryFunc(esEntry.Entry) {
- break
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(esEntry.Entry)
+ if resEachEntryFuncErr != nil {
+ glog.ErrorfCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
+ return lastFileName, fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", fileName, resEachEntryFuncErr)
}
+
+ if !resEachEntryFunc {
+ return lastFileName, nil
+ }
+
lastFileName = fileName
}
}
diff --git a/weed/filer/etcd/etcd_store.go b/weed/filer/etcd/etcd_store.go
index d300a7048..0b83bacc8 100644
--- a/weed/filer/etcd/etcd_store.go
+++ b/weed/filer/etcd/etcd_store.go
@@ -9,7 +9,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/transport"
- "go.etcd.io/etcd/client/v3"
+ clientv3 "go.etcd.io/etcd/client/v3"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -212,9 +212,17 @@ func (store *EtcdStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
- if !eachEntryFunc(entry) {
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
+
+ if !resEachEntryFunc {
+ break
+ }
+
lastFileName = fileName
}
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index b68004a8b..f9f3d4fb2 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -369,10 +369,11 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
var s3ExpiredEntries []*Entry
var hasValidEntries bool
- lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
+ lastFileName, err = f.Store.ListDirectoryPrefixedEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) (bool, error) {
select {
case <-ctx.Done():
- return false
+ glog.Errorf("Context is done.")
+ return false, fmt.Errorf("context canceled: %w", ctx.Err())
default:
if entry.TtlSec > 0 {
if entry.IsExpireS3Enabled() {
@@ -380,13 +381,13 @@ func (f *Filer) doListDirectoryEntries(ctx context.Context, p util.FullPath, sta
// Collect for deletion after iteration completes to avoid DB deadlock
s3ExpiredEntries = append(s3ExpiredEntries, entry)
expiredCount++
- return true
+ return true, nil
}
} else if entry.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
// Collect for deletion after iteration completes to avoid DB deadlock
expiredEntries = append(expiredEntries, entry)
expiredCount++
- return true
+ return true, nil
}
}
// Track that we found at least one valid (non-expired) entry
@@ -496,9 +497,9 @@ func (f *Filer) DeleteEmptyParentDirectories(ctx context.Context, dirPath util.F
// IsDirectoryEmpty checks if a directory contains any entries
func (f *Filer) IsDirectoryEmpty(ctx context.Context, dirPath util.FullPath) (bool, error) {
isEmpty := true
- _, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) bool {
+ _, err := f.Store.ListDirectoryPrefixedEntries(ctx, dirPath, "", true, 1, "", func(entry *Entry) (bool, error) {
isEmpty = false
- return false // Stop after first entry
+ return false, nil // Stop after first entry
})
return isEmpty, err
}
diff --git a/weed/filer/filer_search.go b/weed/filer/filer_search.go
index 6c7ba0747..294fc0e7f 100644
--- a/weed/filer/filer_search.go
+++ b/weed/filer/filer_search.go
@@ -2,10 +2,11 @@ package filer
import (
"context"
- "github.com/seaweedfs/seaweedfs/weed/util"
"math"
"path/filepath"
"strings"
+
+ "github.com/seaweedfs/seaweedfs/weed/util"
)
func splitPattern(pattern string) (prefix string, restPattern string) {
@@ -27,9 +28,9 @@ func (f *Filer) ListDirectoryEntries(ctx context.Context, p util.FullPath, start
limit = math.MaxInt32 - 1
}
- _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) bool {
+ _, err = f.StreamListDirectoryEntries(ctx, p, startFileName, inclusive, limit+1, prefix, namePattern, namePatternExclude, func(entry *Entry) (bool, error) {
entries = append(entries, entry)
- return true
+ return true, nil
})
hasMore = int64(len(entries)) >= limit+1
@@ -68,24 +69,32 @@ func (f *Filer) doListPatternMatchedEntries(ctx context.Context, p util.FullPath
return 0, lastFileName, err
}
- lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) bool {
+ lastFileName, err = f.doListValidEntries(ctx, p, startFileName, inclusive, limit, prefix, func(entry *Entry) (bool, error) {
nameToTest := entry.Name()
if len(namePatternExclude) > 0 {
if matched, matchErr := filepath.Match(namePatternExclude, nameToTest); matchErr == nil && matched {
missedCount++
- return true
+ return true, nil
}
}
if len(restNamePattern) > 0 {
if matched, matchErr := filepath.Match(restNamePattern, nameToTest[len(prefix):]); matchErr == nil && !matched {
missedCount++
- return true
+ return true, nil
}
}
- if !eachEntryFunc(entry) {
- return false
+
+ res, resErr := eachEntryFunc(entry)
+
+ if resErr != nil {
+ return false, resErr
+ }
+
+ if !res {
+ return false, nil
}
- return true
+
+ return true, nil
})
if err != nil {
return
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index 87e212ea5..968943608 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -3,8 +3,9 @@ package filer
import (
"context"
"errors"
- "github.com/seaweedfs/seaweedfs/weed/util"
"io"
+
+ "github.com/seaweedfs/seaweedfs/weed/util"
)
const CountEntryChunksForGzip = 50
@@ -16,7 +17,7 @@ var (
ErrKvNotFound = errors.New("kv: not found")
)
-type ListEachEntryFunc func(entry *Entry) bool
+type ListEachEntryFunc func(entry *Entry) (bool, error)
type FilerStore interface {
// GetName gets the name to locate the configuration in filer.toml file
diff --git a/weed/filer/filerstore_translate_path.go b/weed/filer/filerstore_translate_path.go
index 900154fde..97d388466 100644
--- a/weed/filer/filerstore_translate_path.go
+++ b/weed/filer/filerstore_translate_path.go
@@ -2,9 +2,10 @@ package filer
import (
"context"
- "github.com/seaweedfs/seaweedfs/weed/util"
"math"
"strings"
+
+ "github.com/seaweedfs/seaweedfs/weed/util"
)
var (
@@ -111,7 +112,7 @@ func (t *FilerStorePathTranslator) ListDirectoryEntries(ctx context.Context, dir
newFullPath := t.translatePath(dirPath)
- return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
+ return t.actualStore.ListDirectoryEntries(ctx, newFullPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
return eachEntryFunc(entry)
})
@@ -125,7 +126,7 @@ func (t *FilerStorePathTranslator) ListDirectoryPrefixedEntries(ctx context.Cont
limit = math.MaxInt32 - 1
}
- return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) bool {
+ return t.actualStore.ListDirectoryPrefixedEntries(ctx, newFullPath, startFileName, includeStartFile, limit, prefix, func(entry *Entry) (bool, error) {
entry.FullPath = dirPath[:len(t.storeRoot)-1] + entry.FullPath
return eachEntryFunc(entry)
})
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index ea039d444..8694db984 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -2,6 +2,7 @@ package filer
import (
"context"
+ "fmt"
"io"
"math"
"strings"
@@ -254,7 +255,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath
}()
// glog.V(4).Infof("ListDirectoryEntries %s from %s limit %d", dirPath, startFileName, limit)
- return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
+ return actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.GetChunks())
return eachEntryFunc(entry)
@@ -273,7 +274,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context,
limit = math.MaxInt32 - 1
}
// glog.V(4).Infof("ListDirectoryPrefixedEntries %s from %s prefix %s limit %d", dirPath, startFileName, prefix, limit)
- adjustedEntryFunc := func(entry *Entry) bool {
+ adjustedEntryFunc := func(entry *Entry) (bool, error) {
fsw.maybeReadHardLink(ctx, entry)
filer_pb.AfterEntryDeserialization(entry.GetChunks())
return eachEntryFunc(entry)
@@ -293,9 +294,9 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
}
var notPrefixed []*Entry
- lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) bool {
+ lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *Entry) (bool, error) {
notPrefixed = append(notPrefixed, entry)
- return true
+ return true, nil
})
if err != nil {
return
@@ -306,7 +307,14 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
for _, entry := range notPrefixed {
if strings.HasPrefix(entry.Name(), prefix) {
count++
- if !eachEntryFunc(entry) {
+ res, resErr := eachEntryFunc(entry)
+
+ if resErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.Name(), resErr)
+ return
+ }
+
+ if !res {
return
}
if count >= limit {
@@ -316,9 +324,9 @@ func (fsw *FilerStoreWrapper) prefixFilterEntries(ctx context.Context, dirPath u
}
if count < limit && lastFileName < prefix {
notPrefixed = notPrefixed[:0]
- lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) bool {
+ lastFileName, err = actualStore.ListDirectoryEntries(ctx, dirPath, lastFileName, false, limit, func(entry *Entry) (bool, error) {
notPrefixed = append(notPrefixed, entry)
- return true
+ return true, nil
})
if err != nil {
return
diff --git a/weed/filer/foundationdb/foundationdb_store.go b/weed/filer/foundationdb/foundationdb_store.go
index 509ee4b86..720afd7bc 100644
--- a/weed/filer/foundationdb/foundationdb_store.go
+++ b/weed/filer/foundationdb/foundationdb_store.go
@@ -318,12 +318,12 @@ func (store *FoundationDBStore) deleteFolderChildrenInBatches(ctx context.Contex
var subDirectories []util.FullPath
// List entries - we'll process BATCH_SIZE at a time
- _, err := store.ListDirectoryEntries(ctxNoTxn, fullpath, "", true, int64(BATCH_SIZE), func(entry *filer.Entry) bool {
+ _, err := store.ListDirectoryEntries(ctxNoTxn, fullpath, "", true, int64(BATCH_SIZE), func(entry *filer.Entry) (bool, error) {
entriesToDelete = append(entriesToDelete, entry.FullPath)
if entry.IsDirectory() {
subDirectories = append(subDirectories, entry.FullPath)
}
- return true
+ return true, nil
})
if err != nil {
@@ -474,9 +474,15 @@ func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context
continue
}
- if !eachEntryFunc(entry) {
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ glog.ErrorfCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
+ return lastFileName, fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", fileName, resEachEntryFuncErr)
+ }
+ if !resEachEntryFunc {
break
}
+
lastFileName = fileName
}
diff --git a/weed/filer/foundationdb/foundationdb_store_test.go b/weed/filer/foundationdb/foundationdb_store_test.go
index 215c98c76..73255d67d 100644
--- a/weed/filer/foundationdb/foundationdb_store_test.go
+++ b/weed/filer/foundationdb/foundationdb_store_test.go
@@ -372,16 +372,16 @@ func containsString(s, substr string) bool {
func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) {
// This test validates that DeleteFolderChildren always uses batching
// to safely handle large directories, regardless of transaction context
-
+
store := getTestStore(t)
defer store.Shutdown()
-
+
ctx := context.Background()
testDir := util.FullPath(fmt.Sprintf("/test_batch_delete_%d", time.Now().UnixNano()))
-
+
// Create a large directory (> 100 entries to trigger batching)
const NUM_ENTRIES = 250
-
+
t.Logf("Creating %d test entries...", NUM_ENTRIES)
for i := 0; i < NUM_ENTRIES; i++ {
entry := &filer.Entry{
@@ -397,11 +397,11 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) {
t.Fatalf("Failed to insert test entry %d: %v", i, err)
}
}
-
+
// Test 1: DeleteFolderChildren outside transaction should succeed
t.Run("OutsideTransaction", func(t *testing.T) {
testDir1 := util.FullPath(fmt.Sprintf("/test_batch_1_%d", time.Now().UnixNano()))
-
+
// Create entries
for i := 0; i < NUM_ENTRIES; i++ {
entry := &filer.Entry{
@@ -415,28 +415,28 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) {
}
store.InsertEntry(ctx, entry)
}
-
+
// Delete with batching
err := store.DeleteFolderChildren(ctx, testDir1)
if err != nil {
t.Errorf("DeleteFolderChildren outside transaction should succeed, got error: %v", err)
}
-
+
// Verify all entries deleted
var count int
- store.ListDirectoryEntries(ctx, testDir1, "", true, 1000, func(entry *filer.Entry) bool {
+ store.ListDirectoryEntries(ctx, testDir1, "", true, 1000, func(entry *filer.Entry) (bool, error) {
count++
- return true
+ return true, nil
})
if count != 0 {
t.Errorf("Expected all entries to be deleted, found %d", count)
}
})
-
+
// Test 2: DeleteFolderChildren with transaction context - uses its own batched transactions
t.Run("WithTransactionContext", func(t *testing.T) {
testDir2 := util.FullPath(fmt.Sprintf("/test_batch_2_%d", time.Now().UnixNano()))
-
+
// Create entries
for i := 0; i < NUM_ENTRIES; i++ {
entry := &filer.Entry{
@@ -450,38 +450,38 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) {
}
store.InsertEntry(ctx, entry)
}
-
+
// Start a transaction (DeleteFolderChildren will ignore it and use its own batching)
txCtx, err := store.BeginTransaction(ctx)
if err != nil {
t.Fatalf("BeginTransaction failed: %v", err)
}
-
+
// Delete large directory - should succeed with batching
err = store.DeleteFolderChildren(txCtx, testDir2)
if err != nil {
t.Errorf("DeleteFolderChildren should succeed with batching even when transaction context present, got: %v", err)
}
-
+
// Rollback transaction (DeleteFolderChildren used its own transactions, so this doesn't affect deletions)
store.RollbackTransaction(txCtx)
-
+
// Verify entries are still deleted (because DeleteFolderChildren managed its own transactions)
var count int
- store.ListDirectoryEntries(ctx, testDir2, "", true, 1000, func(entry *filer.Entry) bool {
+ store.ListDirectoryEntries(ctx, testDir2, "", true, 1000, func(entry *filer.Entry) (bool, error) {
count++
- return true
+ return true, nil
})
-
+
if count != 0 {
t.Errorf("Expected all entries to be deleted, found %d (DeleteFolderChildren uses its own transactions)", count)
}
})
-
+
// Test 3: Nested directories with batching
t.Run("NestedDirectories", func(t *testing.T) {
testDir3 := util.FullPath(fmt.Sprintf("/test_batch_3_%d", time.Now().UnixNano()))
-
+
// Create nested structure
for i := 0; i < 50; i++ {
// Files in root
@@ -495,7 +495,7 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) {
},
}
store.InsertEntry(ctx, entry)
-
+
// Subdirectory
subDir := &filer.Entry{
FullPath: util.NewFullPath(string(testDir3), fmt.Sprintf("dir_%02d", i)),
@@ -507,7 +507,7 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) {
},
}
store.InsertEntry(ctx, subDir)
-
+
// Files in subdirectory
for j := 0; j < 3; j++ {
subEntry := &filer.Entry{
@@ -522,24 +522,24 @@ func TestFoundationDBStore_DeleteFolderChildrenWithBatching(t *testing.T) {
store.InsertEntry(ctx, subEntry)
}
}
-
+
// Delete all with batching
err := store.DeleteFolderChildren(ctx, testDir3)
if err != nil {
t.Errorf("DeleteFolderChildren should handle nested directories, got: %v", err)
}
-
+
// Verify all deleted
var count int
- store.ListDirectoryEntries(ctx, testDir3, "", true, 1000, func(entry *filer.Entry) bool {
+ store.ListDirectoryEntries(ctx, testDir3, "", true, 1000, func(entry *filer.Entry) (bool, error) {
count++
- return true
+ return true, nil
})
if count != 0 {
t.Errorf("Expected all nested entries to be deleted, found %d", count)
}
})
-
+
// Cleanup
store.DeleteFolderChildren(ctx, testDir)
}
diff --git a/weed/filer/hbase/hbase_store.go b/weed/filer/hbase/hbase_store.go
index 8642146e6..7663fef9d 100644
--- a/weed/filer/hbase/hbase_store.go
+++ b/weed/filer/hbase/hbase_store.go
@@ -4,13 +4,14 @@ import (
"bytes"
"context"
"fmt"
+ "io"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/tsuna/gohbase"
"github.com/tsuna/gohbase/hrpc"
- "io"
)
func init() {
@@ -163,12 +164,12 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
scanner := store.Client.Scan(scan)
defer scanner.Close()
for {
- res, err := scanner.Next()
- if err == io.EOF {
+ res, scanErr := scanner.Next()
+ if scanErr == io.EOF {
break
}
- if err != nil {
- return lastFileName, err
+ if scanErr != nil {
+ return lastFileName, scanErr
}
if len(res.Cells) == 0 {
continue
@@ -206,12 +207,18 @@ func (store *HbaseStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPa
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
- if !eachEntryFunc(entry) {
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ return lastFileName, fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.FullPath, resEachEntryFuncErr)
+ }
+
+ if !resEachEntryFunc {
break
}
}
- return lastFileName, nil
+ return lastFileName, err
}
func (store *HbaseStore) BeginTransaction(ctx context.Context) (context.Context, error) {
diff --git a/weed/filer/leveldb/leveldb_store.go b/weed/filer/leveldb/leveldb_store.go
index ff1465c23..fea6e0a3d 100644
--- a/weed/filer/leveldb/leveldb_store.go
+++ b/weed/filer/leveldb/leveldb_store.go
@@ -209,7 +209,14 @@ func (store *LevelDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
- if !eachEntryFunc(entry) {
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
+ break
+ }
+
+ if !resEachEntryFunc {
break
}
}
diff --git a/weed/filer/leveldb2/leveldb2_store.go b/weed/filer/leveldb2/leveldb2_store.go
index 1bd6fe597..c3bb2ac55 100644
--- a/weed/filer/leveldb2/leveldb2_store.go
+++ b/weed/filer/leveldb2/leveldb2_store.go
@@ -216,7 +216,14 @@ func (store *LevelDB2Store) ListDirectoryPrefixedEntries(ctx context.Context, di
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
- if !eachEntryFunc(entry) {
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
+ break
+ }
+
+ if !resEachEntryFunc {
break
}
}
diff --git a/weed/filer/leveldb3/leveldb3_store.go b/weed/filer/leveldb3/leveldb3_store.go
index eb8b4e578..439143ceb 100644
--- a/weed/filer/leveldb3/leveldb3_store.go
+++ b/weed/filer/leveldb3/leveldb3_store.go
@@ -345,7 +345,14 @@ func (store *LevelDB3Store) ListDirectoryPrefixedEntries(ctx context.Context, di
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
- if !eachEntryFunc(entry) {
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
+ break
+ }
+
+ if !resEachEntryFunc {
break
}
}
diff --git a/weed/filer/mongodb/mongodb_store.go b/weed/filer/mongodb/mongodb_store.go
index 21463dc32..f87878f08 100644
--- a/weed/filer/mongodb/mongodb_store.go
+++ b/weed/filer/mongodb/mongodb_store.go
@@ -319,14 +319,22 @@ func (store *MongodbStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
break
}
- if !eachEntryFunc(entry) {
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
break
}
+ if !resEachEntryFunc {
+ break
+ }
}
- if err := cur.Close(ctx); err != nil {
- glog.V(0).InfofCtx(ctx, "list iterator close: %v", err)
+ if errClose := cur.Close(ctx); errClose != nil {
+ glog.V(0).InfofCtx(ctx, "list iterator close: %v", errClose)
+ if err == nil {
+ return lastFileName, errClose
+ }
}
return lastFileName, err
diff --git a/weed/filer/redis/universal_redis_store.go b/weed/filer/redis/universal_redis_store.go
index 407491a04..7c2a0e47b 100644
--- a/weed/filer/redis/universal_redis_store.go
+++ b/weed/filer/redis/universal_redis_store.go
@@ -191,7 +191,14 @@ func (store *UniversalRedisStore) ListDirectoryEntries(ctx context.Context, dirP
continue
}
}
- if !eachEntryFunc(entry) {
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
+ break
+ }
+
+ if !resEachEntryFunc {
break
}
}
diff --git a/weed/filer/redis2/universal_redis_store.go b/weed/filer/redis2/universal_redis_store.go
index 1fa384f29..0dbf7a72a 100644
--- a/weed/filer/redis2/universal_redis_store.go
+++ b/weed/filer/redis2/universal_redis_store.go
@@ -206,7 +206,14 @@ func (store *UniversalRedis2Store) ListDirectoryEntries(ctx context.Context, dir
continue
}
}
- if !eachEntryFunc(entry) {
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
+ break
+ }
+
+ if !resEachEntryFunc {
break
}
}
diff --git a/weed/filer/redis3/universal_redis_store.go b/weed/filer/redis3/universal_redis_store.go
index 699683d91..84cd42908 100644
--- a/weed/filer/redis3/universal_redis_store.go
+++ b/weed/filer/redis3/universal_redis_store.go
@@ -140,6 +140,7 @@ func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dir
dirListKey := genDirectoryListKey(string(dirPath))
counter := int64(0)
+ var callbackErr error
err = listChildren(ctx, store, dirListKey, startFileName, func(fileName string) bool {
if startFileName != "" {
if !includeStartFile && startFileName == fileName {
@@ -164,9 +165,18 @@ func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dir
}
}
counter++
- if !eachEntryFunc(entry) {
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
+ callbackErr = resEachEntryFuncErr
+ return false
+ }
+
+ if !resEachEntryFunc {
return false
}
+
if counter >= limit {
return false
}
@@ -174,6 +184,13 @@ func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dir
return true
})
+ if callbackErr != nil {
+ return lastFileName, fmt.Errorf(
+ "failed to process eachEntryFunc for dir %q, entry %q: %w",
+ dirPath, lastFileName, callbackErr,
+ )
+ }
+
return lastFileName, err
}
diff --git a/weed/filer/redis_lua/universal_redis_store.go b/weed/filer/redis_lua/universal_redis_store.go
index 20b83a2a9..35f6d4991 100644
--- a/weed/filer/redis_lua/universal_redis_store.go
+++ b/weed/filer/redis_lua/universal_redis_store.go
@@ -173,7 +173,14 @@ func (store *UniversalRedisLuaStore) ListDirectoryEntries(ctx context.Context, d
continue
}
}
- if !eachEntryFunc(entry) {
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
+ break
+ }
+
+ if !resEachEntryFunc {
break
}
}
diff --git a/weed/filer/rocksdb/rocksdb_store.go b/weed/filer/rocksdb/rocksdb_store.go
index 044dc1342..2283efa6f 100644
--- a/weed/filer/rocksdb/rocksdb_store.go
+++ b/weed/filer/rocksdb/rocksdb_store.go
@@ -251,6 +251,7 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
defer ro.Destroy()
ro.SetFillCache(false)
+ var callbackErr error
iter := store.db.NewIterator(ro)
defer iter.Close()
err = enumerate(iter, directoryPrefix, lastFileStart, includeStartFile, limit, startFileName, func(key, value []byte) bool {
@@ -269,11 +270,28 @@ func (store *RocksDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dir
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
return false
}
- if !eachEntryFunc(entry) {
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
+ callbackErr = resEachEntryFuncErr
+ return false
+ }
+
+ if !resEachEntryFunc {
return false
}
+
return true
})
+
+ if callbackErr != nil {
+ return lastFileName, fmt.Errorf(
+ "failed to process eachEntryFunc for dir %q, entry %q: %w",
+ dirPath, lastFileName, callbackErr,
+ )
+ }
+
if err != nil {
return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
}
diff --git a/weed/filer/store_test/test_suite.go b/weed/filer/store_test/test_suite.go
index fda694f26..ae334cdd8 100644
--- a/weed/filer/store_test/test_suite.go
+++ b/weed/filer/store_test/test_suite.go
@@ -3,11 +3,12 @@ package store_test
import (
"context"
"fmt"
+ "os"
+ "testing"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/stretchr/testify/assert"
- "os"
- "testing"
)
func TestFilerStore(t *testing.T, store filer.FilerStore) {
@@ -23,16 +24,16 @@ func TestFilerStore(t *testing.T, store filer.FilerStore) {
{
var counter int
- lastFileName, err := store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), "", false, 3, func(entry *filer.Entry) bool {
+ lastFileName, err := store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), "", false, 3, func(entry *filer.Entry) (bool, error) {
counter++
- return true
+ return true, nil
})
assert.Nil(t, err, "list directory")
assert.Equal(t, 3, counter, "directory list counter")
assert.Equal(t, "f00002", lastFileName, "directory list last file")
- lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) bool {
+ lastFileName, err = store.ListDirectoryEntries(ctx, util.FullPath("/a/b/c"), lastFileName, false, 1024, func(entry *filer.Entry) (bool, error) {
counter++
- return true
+ return true, nil
})
assert.Nil(t, err, "list directory")
assert.Equal(t, 1027, counter, "directory list counter")
diff --git a/weed/filer/tarantool/tarantool_store.go b/weed/filer/tarantool/tarantool_store.go
index 4c9f8a600..1bcd31830 100644
--- a/weed/filer/tarantool/tarantool_store.go
+++ b/weed/filer/tarantool/tarantool_store.go
@@ -305,7 +305,14 @@ func (store *TarantoolStore) ListDirectoryEntries(ctx context.Context, dirPath w
glog.V(0).InfofCtx(ctx, "list %s : %v", entry.FullPath, err)
break
}
- if !eachEntryFunc(entry) {
+
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ err = fmt.Errorf("failed to process eachEntryFunc: %w", resEachEntryFuncErr)
+ break
+ }
+
+ if !resEachEntryFunc {
break
}
}
diff --git a/weed/filer/tikv/tikv_store.go b/weed/filer/tikv/tikv_store.go
index 3708ddec5..307d2b3fb 100644
--- a/weed/filer/tikv/tikv_store.go
+++ b/weed/filer/tikv/tikv_store.go
@@ -223,6 +223,7 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat
if err != nil {
return lastFileName, err
}
+ var callbackErr error
err = txn.RunInTxn(func(txn *txnkv.KVTxn) error {
iter, err := txn.Iter(lastFileStart, nil)
if err != nil {
@@ -283,12 +284,33 @@ func (store *TikvStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPat
// Only increment counter for non-expired entries
i++
- if err := iter.Next(); !eachEntryFunc(entry) || err != nil {
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ glog.V(0).InfofCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
+ callbackErr = resEachEntryFuncErr
+ break
+ }
+
+ nextErr := iter.Next()
+ if nextErr != nil {
+ err = nextErr
+ break
+ }
+
+ if !resEachEntryFunc {
break
}
}
return err
})
+
+ if callbackErr != nil {
+ return lastFileName, fmt.Errorf(
+ "failed to process eachEntryFunc for dir %q, entry %q: %w",
+ dirPath, lastFileName, callbackErr,
+ )
+ }
+
if err != nil {
return lastFileName, fmt.Errorf("prefix list %s : %v", dirPath, err)
}
diff --git a/weed/filer/ydb/ydb_store.go b/weed/filer/ydb/ydb_store.go
index 90b13aa04..ee94d13e1 100644
--- a/weed/filer/ydb/ydb_store.go
+++ b/weed/filer/ydb/ydb_store.go
@@ -313,7 +313,12 @@ func (store *YdbStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath
return fmt.Errorf("decode entry %s: %w", entry.FullPath, decodeErr)
}
- if !eachEntryFunc(entry) {
+ resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
+ if resEachEntryFuncErr != nil {
+ return fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", entry.FullPath, resEachEntryFuncErr)
+ }
+
+ if !resEachEntryFunc {
return nil
}
diff --git a/weed/mount/meta_cache/meta_cache.go b/weed/mount/meta_cache/meta_cache.go
index 0f0b1de30..9578aff72 100644
--- a/weed/mount/meta_cache/meta_cache.go
+++ b/weed/mount/meta_cache/meta_cache.go
@@ -146,9 +146,9 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full
glog.Warningf("unsynchronized dir: %v", dirPath)
}
- _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) bool {
+ _, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit, func(entry *filer.Entry) (bool, error) {
if entry.TtlSec > 0 && entry.Crtime.Add(time.Duration(entry.TtlSec)*time.Second).Before(time.Now()) {
- return true
+ return true, nil
}
mc.mapIdFromFilerToLocal(entry)
return eachEntryFunc(entry)
diff --git a/weed/mount/weedfs_dir_read.go b/weed/mount/weedfs_dir_read.go
index 6e18b50e8..ebf0d9191 100644
--- a/weed/mount/weedfs_dir_read.go
+++ b/weed/mount/weedfs_dir_read.go
@@ -2,13 +2,14 @@ package mount
import (
"context"
+ "math"
+ "sync"
+
"github.com/hanwen/go-fuse/v2/fuse"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mount/meta_cache"
"github.com/seaweedfs/seaweedfs/weed/util"
- "math"
- "sync"
)
type DirectoryHandleId uint64
@@ -153,7 +154,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
var dirEntry fuse.DirEntry
- processEachEntryFn := func(entry *filer.Entry) bool {
+ processEachEntryFn := func(entry *filer.Entry) (bool, error) {
dirEntry.Name = entry.Name()
dirEntry.Mode = toSyscallMode(entry.Mode)
inode := wfs.inodeToPath.Lookup(dirPath.Child(dirEntry.Name), entry.Crtime.Unix(), entry.IsDirectory(), len(entry.HardLinkId) > 0, entry.Inode, isPlusMode)
@@ -161,13 +162,13 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
if !isPlusMode {
if !out.AddDirEntry(dirEntry) {
isEarlyTerminated = true
- return false
+ return false, nil
}
} else {
entryOut := out.AddDirLookupEntry(dirEntry)
if entryOut == nil {
isEarlyTerminated = true
- return false
+ return false, nil
}
if fh, found := wfs.fhMap.FindFileHandle(inode); found {
glog.V(4).Infof("readdir opened file %s", dirPath.Child(dirEntry.Name))
@@ -175,7 +176,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
}
wfs.outputFilerEntry(entryOut, inode, entry)
}
- return true
+ return true, nil
}
if input.Offset < directoryStreamBaseOffset {
@@ -206,7 +207,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
entryCurrentIndex := input.Offset - dh.entryStreamOffset
for uint64(len(dh.entryStream)) > entryCurrentIndex {
entry := dh.entryStream[entryCurrentIndex]
- if processEachEntryFn(entry) {
+ if process, _ := processEachEntryFn(entry); process {
lastEntryName = entry.Name()
entryCurrentIndex++
} else {
@@ -221,7 +222,7 @@ func (wfs *WFS) doReadDirectory(input *fuse.ReadIn, out *fuse.DirEntryList, isPl
glog.Errorf("dir ReadDirAll %s: %v", dirPath, err)
return fuse.EIO
}
- listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) bool {
+ listErr := wfs.metaCache.ListDirectoryEntries(context.Background(), dirPath, lastEntryName, false, int64(math.MaxInt32), func(entry *filer.Entry) (bool, error) {
dh.entryStream = append(dh.entryStream, entry)
return processEachEntryFn(entry)
})
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 02eceebde..ef5225181 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -56,19 +56,19 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
var listErr error
for limit > 0 {
var hasEntries bool
- lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool {
+ lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) (bool, error) {
hasEntries = true
if err = stream.Send(&filer_pb.ListEntriesResponse{
Entry: entry.ToProtoEntry(),
}); err != nil {
- return false
+ return false, err
}
limit--
if limit == 0 {
- return false
+ return false, nil
}
- return true
+ return true, nil
})
if listErr != nil {
diff --git a/weed/server/filer_grpc_server_traverse_meta.go b/weed/server/filer_grpc_server_traverse_meta.go
index 841e7b88b..9e317e83f 100644
--- a/weed/server/filer_grpc_server_traverse_meta.go
+++ b/weed/server/filer_grpc_server_traverse_meta.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -63,13 +64,13 @@ func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPa
var listErr error
for {
var hasEntries bool
- lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool {
+ lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) (bool, error) {
hasEntries = true
if fnErr := fn(entry); fnErr != nil {
err = fnErr
- return false
+ return false, err
}
- return true
+ return true, nil
})
if listErr != nil {
return listErr