aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/filer_deletion.go38
-rw-r--r--weed/operation/chunked_file.go8
-rw-r--r--weed/operation/delete_content.go66
-rw-r--r--weed/shell/command_volume_check_disk.go13
-rw-r--r--weed/shell/command_volume_fsck.go5
5 files changed, 86 insertions, 44 deletions
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index 6d22be600..b3a4296ba 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -2,6 +2,7 @@ package filer
import (
"context"
+ "fmt"
"strings"
"time"
@@ -56,13 +57,38 @@ func (f *Filer) loopProcessingDeletion() {
fileIds = fileIds[:0]
}
deletionCount = len(toDeleteFileIds)
- _, err := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
- if err != nil {
- if !strings.Contains(err.Error(), storage.ErrorDeleted.Error()) {
- glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
+ results := operation.DeleteFileIdsWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
+
+ // Process individual results for better error tracking
+ var successCount, notFoundCount, errorCount int
+ var errorDetails []string
+
+ for _, result := range results {
+ if result.Error == "" {
+ successCount++
+ } else if result.Error == "not found" || strings.Contains(result.Error, storage.ErrorDeleted.Error()) {
+ // Already deleted - acceptable
+ notFoundCount++
+ } else {
+ // Actual error
+ errorCount++
+ if errorCount <= 10 {
+ // Only log first 10 errors to avoid flooding logs
+ errorDetails = append(errorDetails, result.FileId+": "+result.Error)
+ }
}
- } else {
- glog.V(2).Infof("deleting fileIds %+v", toDeleteFileIds)
+ }
+
+ if successCount > 0 || notFoundCount > 0 {
+ glog.V(2).Infof("deleted %d files successfully, %d already deleted (not found)", successCount, notFoundCount)
+ }
+
+ if errorCount > 0 {
+ logMessage := fmt.Sprintf("failed to delete %d/%d files", errorCount, len(toDeleteFileIds))
+ if errorCount > 10 {
+ logMessage += " (showing first 10)"
+ }
+ glog.V(0).Infof("%s: %v", logMessage, strings.Join(errorDetails, "; "))
}
}
})
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index b0c6c651f..1fedb74bc 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -80,11 +80,9 @@ func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, g
for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid)
}
- results, err := DeleteFileIds(masterFn, usePublicUrl, grpcDialOption, fileIds)
- if err != nil {
- glog.V(0).Infof("delete %+v: %v", fileIds, err)
- return fmt.Errorf("chunk delete: %w", err)
- }
+ results := DeleteFileIds(masterFn, usePublicUrl, grpcDialOption, fileIds)
+
+ // Check for any errors in results
for _, result := range results {
if result.Error != "" {
glog.V(0).Infof("delete file %+v: %v", result.FileId, result.Error)
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 419223165..5028fbf48 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -4,12 +4,13 @@ import (
"context"
"errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "google.golang.org/grpc"
"net/http"
"strings"
"sync"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "google.golang.org/grpc"
+
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)
@@ -29,7 +30,8 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
}
// DeleteFileIds batch deletes a list of fileIds
-func DeleteFileIds(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
+// Returns individual results for each file ID. Check result.Error for per-file failures.
+func DeleteFileIds(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) []*volume_server_pb.DeleteResult {
lookupFunc := func(vids []string) (results map[string]*LookupResult, err error) {
results, err = LookupVolumeIds(masterFn, grpcDialOption, vids)
@@ -47,7 +49,7 @@ func DeleteFileIds(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.
}
-func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]*LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
+func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]*LookupResult, error)) []*volume_server_pb.DeleteResult {
var ret []*volume_server_pb.DeleteResult
@@ -72,17 +74,30 @@ func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []s
lookupResults, err := lookupFunc(vids)
if err != nil {
- return ret, err
+ // Lookup failed - return error results for all file IDs that passed parsing
+ for _, fids := range vid_to_fileIds {
+ for _, fileId := range fids {
+ ret = append(ret, &volume_server_pb.DeleteResult{
+ FileId: fileId,
+ Status: http.StatusInternalServerError,
+ Error: fmt.Sprintf("lookup error: %v", err),
+ })
+ }
+ }
+ return ret
}
server_to_fileIds := make(map[pb.ServerAddress][]string)
for vid, result := range lookupResults {
if result.Error != "" {
- ret = append(ret, &volume_server_pb.DeleteResult{
- FileId: vid,
- Status: http.StatusBadRequest,
- Error: result.Error},
- )
+ // Lookup error for this volume - mark all its files as failed
+ for _, fileId := range vid_to_fileIds[vid] {
+ ret = append(ret, &volume_server_pb.DeleteResult{
+ FileId: fileId,
+ Status: http.StatusBadRequest,
+ Error: result.Error},
+ )
+ }
continue
}
for _, location := range result.Locations {
@@ -102,11 +117,7 @@ func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []s
go func(server pb.ServerAddress, fidList []string) {
defer wg.Done()
- if deleteResults, deleteErr := DeleteFileIdsAtOneVolumeServer(server, grpcDialOption, fidList, false); deleteErr != nil {
- err = deleteErr
- } else if deleteResults != nil {
- resultChan <- deleteResults
- }
+ resultChan <- DeleteFileIdsAtOneVolumeServer(server, grpcDialOption, fidList, false)
}(server, fidList)
}
@@ -117,13 +128,16 @@ func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []s
ret = append(ret, result...)
}
- return ret, err
+ return ret
}
// DeleteFileIdsAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
-func DeleteFileIdsAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
+// Returns individual results for each file ID. Check result.Error for per-file failures.
+func DeleteFileIdsAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) []*volume_server_pb.DeleteResult {
- err = WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ var ret []*volume_server_pb.DeleteResult
+
+ err := WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
req := &volume_server_pb.BatchDeleteRequest{
FileIds: fileIds,
@@ -144,15 +158,17 @@ func DeleteFileIdsAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOptio
})
if err != nil {
- return
- }
-
- for _, result := range ret {
- if result.Error != "" && result.Error != "not found" {
- return nil, fmt.Errorf("delete fileId %s: %v", result.FileId, result.Error)
+ // Connection or communication error - return error results for all files
+ ret = make([]*volume_server_pb.DeleteResult, 0, len(fileIds))
+ for _, fileId := range fileIds {
+ ret = append(ret, &volume_server_pb.DeleteResult{
+ FileId: fileId,
+ Status: http.StatusInternalServerError,
+ Error: err.Error(),
+ })
}
}
- return
+ return ret
}
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index fbad37f02..a8cc72d4d 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -11,6 +11,8 @@ import (
"sync"
"time"
+ "slices"
+
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -18,7 +20,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/server/constants"
"github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
"google.golang.org/grpc"
- "slices"
)
func init() {
@@ -321,13 +322,15 @@ func doVolumeCheckDisk(minuend, subtrahend *needle_map.MemDb, source, target *Vo
fmt.Fprintf(writer, "delete %s %s => %s\n", needleValue.Key.FileId(source.info.Id), source.location.dataNode.Id, target.location.dataNode.Id)
}
}
- deleteResults, deleteErr := operation.DeleteFileIdsAtOneVolumeServer(
+ deleteResults := operation.DeleteFileIdsAtOneVolumeServer(
pb.NewServerAddressFromDataNode(target.location.dataNode),
grpcDialOption, fidList, false)
- if deleteErr != nil {
- return hasChanges, deleteErr
- }
+
+ // Check for errors in results
for _, deleteResult := range deleteResults {
+ if deleteResult.Error != "" && deleteResult.Error != "not found" {
+ return hasChanges, fmt.Errorf("delete file %s: %v", deleteResult.FileId, deleteResult.Error)
+ }
if deleteResult.Status == http.StatusAccepted && deleteResult.Size > 0 {
hasChanges = true
}
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index a8ccebf84..878109ecb 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -702,9 +702,8 @@ func (c *commandVolumeFsck) purgeFileIdsForOneVolume(volumeId uint32, fileIds []
go func(server pb.ServerAddress, fidList []string) {
defer wg.Done()
- if deleteResults, deleteErr := operation.DeleteFileIdsAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false); deleteErr != nil {
- err = deleteErr
- } else if deleteResults != nil {
+ deleteResults := operation.DeleteFileIdsAtOneVolumeServer(server, c.env.option.GrpcDialOption, fidList, false)
+ if deleteResults != nil {
resultChan <- deleteResults
}