aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/delete_content.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation/delete_content.go')
-rw-r--r--weed/operation/delete_content.go66
1 files changed, 41 insertions, 25 deletions
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 419223165..5028fbf48 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -4,12 +4,13 @@ import (
"context"
"errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "google.golang.org/grpc"
"net/http"
"strings"
"sync"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "google.golang.org/grpc"
+
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)
@@ -29,7 +30,8 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
}
// DeleteFileIds batch deletes a list of fileIds
-func DeleteFileIds(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
+// Returns individual results for each file ID. Check result.Error for per-file failures.
+func DeleteFileIds(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) []*volume_server_pb.DeleteResult {
lookupFunc := func(vids []string) (results map[string]*LookupResult, err error) {
results, err = LookupVolumeIds(masterFn, grpcDialOption, vids)
@@ -47,7 +49,7 @@ func DeleteFileIds(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.
}
-func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]*LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
+func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]*LookupResult, error)) []*volume_server_pb.DeleteResult {
var ret []*volume_server_pb.DeleteResult
@@ -72,17 +74,30 @@ func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []s
lookupResults, err := lookupFunc(vids)
if err != nil {
- return ret, err
+ // Lookup failed - return error results for all file IDs that passed parsing
+ for _, fids := range vid_to_fileIds {
+ for _, fileId := range fids {
+ ret = append(ret, &volume_server_pb.DeleteResult{
+ FileId: fileId,
+ Status: http.StatusInternalServerError,
+ Error: fmt.Sprintf("lookup error: %v", err),
+ })
+ }
+ }
+ return ret
}
server_to_fileIds := make(map[pb.ServerAddress][]string)
for vid, result := range lookupResults {
if result.Error != "" {
- ret = append(ret, &volume_server_pb.DeleteResult{
- FileId: vid,
- Status: http.StatusBadRequest,
- Error: result.Error},
- )
+ // Lookup error for this volume - mark all its files as failed
+ for _, fileId := range vid_to_fileIds[vid] {
+ ret = append(ret, &volume_server_pb.DeleteResult{
+ FileId: fileId,
+ Status: http.StatusBadRequest,
+ Error: result.Error},
+ )
+ }
continue
}
for _, location := range result.Locations {
@@ -102,11 +117,7 @@ func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []s
go func(server pb.ServerAddress, fidList []string) {
defer wg.Done()
- if deleteResults, deleteErr := DeleteFileIdsAtOneVolumeServer(server, grpcDialOption, fidList, false); deleteErr != nil {
- err = deleteErr
- } else if deleteResults != nil {
- resultChan <- deleteResults
- }
+ resultChan <- DeleteFileIdsAtOneVolumeServer(server, grpcDialOption, fidList, false)
}(server, fidList)
}
@@ -117,13 +128,16 @@ func DeleteFileIdsWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []s
ret = append(ret, result...)
}
- return ret, err
+ return ret
}
// DeleteFileIdsAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
-func DeleteFileIdsAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
+// Returns individual results for each file ID. Check result.Error for per-file failures.
+func DeleteFileIdsAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) []*volume_server_pb.DeleteResult {
- err = WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ var ret []*volume_server_pb.DeleteResult
+
+ err := WithVolumeServerClient(false, volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
req := &volume_server_pb.BatchDeleteRequest{
FileIds: fileIds,
@@ -144,15 +158,17 @@ func DeleteFileIdsAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOptio
})
if err != nil {
- return
- }
-
- for _, result := range ret {
- if result.Error != "" && result.Error != "not found" {
- return nil, fmt.Errorf("delete fileId %s: %v", result.FileId, result.Error)
+ // Connection or communication error - return error results for all files
+ ret = make([]*volume_server_pb.DeleteResult, 0, len(fileIds))
+ for _, fileId := range fileIds {
+ ret = append(ret, &volume_server_pb.DeleteResult{
+ FileId: fileId,
+ Status: http.StatusInternalServerError,
+ Error: err.Error(),
+ })
}
}
- return
+ return ret
}