diff options
Diffstat (limited to 'weed/operation/delete_content.go')
| -rw-r--r-- | weed/operation/delete_content.go | 37 |
1 files changed, 21 insertions, 16 deletions
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 3e468e1a3..6d84be76f 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -4,12 +4,12 @@ import ( "context" "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "google.golang.org/grpc" "net/http" "strings" "sync" - "time" - - "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) type DeleteResult struct { @@ -28,17 +28,17 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) { } // DeleteFiles batch deletes a list of fileIds -func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { +func DeleteFiles(master string, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { lookupFunc := func(vids []string) (map[string]LookupResult, error) { - return LookupVolumeIds(master, vids) + return LookupVolumeIds(master, grpcDialOption, vids) } - return DeleteFilesWithLookupVolumeId(fileIds, lookupFunc) + return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) } -func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) { +func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) { var ret []*volume_server_pb.DeleteResult @@ -48,7 +48,7 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin vid, _, err := ParseFileId(fileId) if err != nil { ret = append(ret, &volume_server_pb.DeleteResult{ - FileId: vid, + FileId: fileId, Status: http.StatusBadRequest, Error: err.Error()}, ) @@ -85,38 +85,43 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin } } + resultChan := make(chan []*volume_server_pb.DeleteResult, len(server_to_fileIds)) var wg sync.WaitGroup - for server, fidList := range server_to_fileIds { wg.Add(1) go func(server string, fidList []string) { defer wg.Done() - if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, fidList); deleteErr != nil { + if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList); deleteErr != nil { err = deleteErr } else { - ret = append(ret, deleteResults...) + resultChan <- deleteResults } }(server, fidList) } wg.Wait() + close(resultChan) + + for result := range resultChan { + ret = append(ret, result...) + } + + glog.V(0).Infof("deleted %d items", len(ret)) return ret, err } // DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc -func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) { +func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) { - err = WithVolumeServerClient(volumeServer, func(volumeServerClient volume_server_pb.VolumeServerClient) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) - defer cancel() + err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { req := &volume_server_pb.BatchDeleteRequest{ FileIds: fileIds, } - resp, err := volumeServerClient.BatchDelete(ctx, req) + resp, err := volumeServerClient.BatchDelete(context.Background(), req) // fmt.Printf("deleted %v %v: %v\n", fileIds, err, resp) |
