about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Chris Lu <chrislusf@users.noreply.github.com> 2025-10-25 00:09:18 -0700
committer: GitHub <noreply@github.com> 2025-10-25 00:09:18 -0700
commit: 6a8c53bc44beb057f64d5ba1f7ac026f8410fe04 (patch)
tree: 2b3ceed66edd6dd141370bad3fbfce07886840bd
parent: 37af41fbfeaf2e9830e25b658b8bed409fa6fae6 (diff)
download: seaweedfs-6a8c53bc44beb057f64d5ba1f7ac026f8410fe04.tar.xz
download: seaweedfs-6a8c53bc44beb057f64d5ba1f7ac026f8410fe04.zip
Filer: batch deletion operations to return individual error results (#7382)
* batch deletion operations to return individual error results

  Modify batch deletion operations to return individual error results instead of one aggregated error, enabling better tracking of which specific files failed to delete (helping reduce orphan file issues).

* Simplified logging logic

* Optimized nested loop

* handles the edge case where the RPC succeeds but connection cleanup fails

* simplify

* simplify

* ignore 'not found' errors here
-rw-r--r-- weed/filer/filer_deletion.go | 38
-rw-r--r-- weed/operation/chunked_file.go | 8
-rw-r--r-- weed/operation/delete_content.go | 66
-rw-r--r-- weed/shell/command_volume_check_disk.go | 13
-rw-r--r-- weed/shell/command_volume_fsck.go | 5
5 files changed, 86 insertions, 44 deletions
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index 6d22be600..b3a4296ba 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -2,6 +2,7 @@ package filer
import (
"context"
+ "fmt"
"strings"
"time"
@@ -56,13 +57,38 @@ func (f *Filer) loopProcessingDeletion() {
fileIds = fileIds[:0]
}
deletionCount = len(toDeleteFileIds)
- _, err := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
- if err != nil {
- if !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
- glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
+ results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
+
+ // Process individual results for better error tracking
+ var successCount, notFoundCount, errorCount int
+ var errorDetails []string
+
+ for _, result := range results {
+ if result.Error == "" {
+ successCount++
+ } else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
+ // Already deleted - acceptable
+ notFoundCount++
+ } else {
+ // Actual error
+ errorCount++
+ if errorCount <= 10 {
+ // Only log first 10 errors to avoid flooding logs
+ errorDetails = append(errorDetails, result.FileId+": "+result.Error)
+ }
}
- } else {
- glog.V(2).Infof("deleting fileIds %+v", toDeleteFileIds)
+ }
+
+ if successCount > 0 || notFoundCount > 0 {
+ glog.V(2).Infof("deleted %d files successfully, %d already deleted (not found)", successCount, notFoundCount)
+ }
+
+ if errorCount > 0 {
+ logMessage := fmt.Sprintf("failed to delete %d/%d files", errorCount, len(toDeleteFileIds))
+ if errorCount > 10 {
+ logMessage += " (showing first 10)"
+ }
+ glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
}
}
})
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index b0c6c651f..1fedb74bc 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -80,11 +80,9 @@ func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, g
for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid)
}
- results, err := DeleteFileIds(masterFn, usePublicUrl, grpcDialOption, fileIds)
- if err != nil {
- glog.V(0).Infof("delete %+v: %v", fileIds, err)
- return fmt.Errorf("chunk delete: %w", err)
- }
+ results := DeleteFileIds(masterFn, usePublicUrl, grpcDialOption, fileIds)
+
+ // Check for any errors in results
for _, result := range results {
if result.Error != "" {
glog.V(0).Infof("delete file %+v: %v", result.FileId, result.Error)
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 419223165..5028fbf48 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -4,12 +4,13 @@ import (
"context"
"errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "google.golang.org/grpc"
"net/http"
"strings"
"sync"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "google.golang.org/grpc"
+
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)
@@ -29,7 +30,8 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
}
// DeleteFileIds batch deletes a list of fileIds
-func DeleteFileIds(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
+// Returns individual results for each file ID. Check result.Error for per-file failures.
+func DeleteFileIds(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) []*volume_server_pb.DeleteResult {
lookupFunc := func(vids []string) (results map[string]*LookupResult, err error) {
results, err = LookupVolumeIds(masterFn, grpcDialOption, vids)
@@ -47,7 +49,7 @@ func DeleteFileIds(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.
}
-func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]*LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
+func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]*LookupResult, error)) []*volume_server_pb.DeleteResult {
var ret []*volume_server_pb.DeleteResult
@@ -72,17 +74,30 @@ func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []s
lookupResults, err := lookupFunc(vids)
if err != nil {
- return ret, err
+ // Lookup failed - return error results for all file IDs that passed parsing
+ for _, fids := range vid_to_fileIds {
+ for _, fileId := range fids {
+ ret = append(ret, &volume_server_pb.DeleteResult{
+ FileId: fileId,
+ Status: http.StatusInternalServerError,
+ Error: fmt.Sprintf("lookup error: %v", err),
+ })
+ }
+ }
+ return ret
}
server_to_fileIds := make(map[pb.ServerAddress][]string)
for vid, result := range lookupResults {
if result.Error != "" {
- ret = append(ret, &volume_server_pb.DeleteResult{
- FileId: vid,
- Status: http.StatusBadRequest,
- Error: result.Error},
- )
+ // Lookup error for this volume - mark all its files as failed
+ for _, fileId := range vid_to_fileIds[vid] {
+ ret = append(ret, &volume_server_pb.DeleteResult{
+ FileId: fileId,
+ Status: http.StatusBadRequest,
+ Error: result.Error},
+ )
+ }
continue
}
for _, location := range result.Locations {
@@ -102,11 +117,7 @@ func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []s
go func(server pb.ServerAddress, fidList []string) {
defer wg.Done()
- if deleteResults, deleteErr := DeleteFileIdsAtOneVolumeServer(server, grpcDialOption, fidList, false); deleteErr != nil {
- err = deleteErr
- } else if deleteResults != nil {
- resultChan <- deleteResults
- }
+ resultChan <- DeleteFileIdsAtOneVolumeServer(server, grpcDialOption, fidList, false)
}(server, fidList)
}
@@ -117,13 +128,16 @@ func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []s
ret = append(ret, result...)
}
- return ret, err
+ return ret
}
// DeleteFileIdsAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
-func DeleteFileIdsAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
+// Returns individual results for each file ID. Check result.Error for per-file failures.
+func DeleteFileIdsAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) []*volume_server_pb.DeleteResult {
- err = WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ var ret []*volume_server_pb.DeleteResult
+
+ err := WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
req := &volume_server_pb.BatchDeleteRequest{
FileIds: fileIds,
@@ -144,15 +158,17 @@ func DeleteFileIdsAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOptio
})
if err != nil {
- return
- }
-
- for _, result := range ret {
- if result.Error != "" && result.Error != "not found" {
- return nil, fmt.Errorf("delete fileId %s: %v", result.FileId, result.Error)
+ // Connection or communication error - return error results for all files
+ ret = make([]*volume_server_pb.DeleteResult, 0, len(fileIds))
+ for _, fileId := range fileIds {
+ ret = append(ret, &volume_server_pb.DeleteResult{
+ FileId: fileId,
+ Status: http.StatusInternalServerError,
+ Error: err.Error(),
+ })
}
}
- return
+ return ret
}
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index fbad37f02..a8cc72d4d 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -11,6 +11,8 @@ import (
"sync"
"time"
+ "slices"
+
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -18,7 +20,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/server/constants"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"google.golang.org/grpc"
- "slices"
)
func init() {
@@ -321,13 +322,15 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
fmt.Fprintf(writer, "delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
}
}
- deleteResults, deleteErr := operation.DeleteFileIdsAtOneVolumeServer(
+ deleteResults := operation.DeleteFileIdsAtOneVolumeServer(
pb.NewServerAddressFromDataNode(target.location.dataNode),
grpcDialOption, fidList, false)
- if deleteErr != nil {
- return hasChanges, deleteErr
- }
+
+ // Check for errors in results
for _, deleteResult := range deleteResults {
+ if deleteResult.Error != "" && deleteResult.Error != "not found" {
+ return hasChanges, fmt.Errorf("delete file %s: %v", deleteResult.FileId, deleteResult.Error)
+ }
if deleteResult.Status == http.StatusAccepted && deleteResult.Size > 0 {
hasChanges = true
}
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index a8ccebf84..878109ecb 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -702,9 +702,8 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []
go func(server pb.ServerAddress, fidList []string) {
defer wg.Done()
- if deleteResults, deleteErr := operation.DeleteFileIdsAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
- err = deleteErr
- } else if deleteResults != nil {
+ deleteResults := operation.DeleteFileIdsAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false)
+ if deleteResults != nil {
resultChan <- deleteResults
}