diff options
Diffstat (limited to 'weed/operation/delete_content.go')
| -rw-r--r-- | weed/operation/delete_content.go | 109 |
1 files changed, 57 insertions, 52 deletions
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index b77718846..2e414873f 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -1,18 +1,15 @@ package operation import ( - "encoding/json" "errors" - "fmt" - "net/url" "strings" "sync" "net/http" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "context" + "fmt" ) type DeleteResult struct { @@ -22,27 +19,6 @@ type DeleteResult struct { Error string `json:"error,omitempty"` } -func DeleteFromVolumeServer(fileUrlOnVolume string, jwt security.EncodedJwt) error { - err := util.Delete(fileUrlOnVolume, jwt) - if err != nil { - return fmt.Errorf("Failed to delete %s:%v", fileUrlOnVolume, err) - } - return nil -} - -func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error { - fileUrl, err := LookupFileId(master, fileId) - if err != nil { - glog.V(0).Infof("Delete %s lookup: %v, master: %s", fileId, err, master) - return nil - } - err = util.Delete(fileUrl, jwt) - if err != nil { - return fmt.Errorf("Failed to delete %s:%v", fileUrl, err) - } - return nil -} - func ParseFileId(fid string) (vid string, key_cookie string, err error) { commaIndex := strings.Index(fid, ",") if commaIndex <= 0 { @@ -51,20 +27,18 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) { return fid[:commaIndex], fid[commaIndex+1:], nil } -type DeleteFilesResult struct { - Errors []string - Results []DeleteResult -} +// DeleteFiles batch deletes a list of fileIds +func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { + + var ret []*volume_server_pb.DeleteResult -func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) { vid_to_fileIds := make(map[string][]string) - ret := &DeleteFilesResult{} var vids []string for _, fileId := range fileIds { vid, _, err := ParseFileId(fileId) if err != nil { - ret.Results = append(ret.Results, DeleteResult{ - Fid: vid, + ret = append(ret, &volume_server_pb.DeleteResult{ + FileId: vid, Status: http.StatusBadRequest, Error: err.Error()}, ) @@ -85,7 +59,11 @@ func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) { server_to_fileIds := make(map[string][]string) for vid, result := range lookupResults { if result.Error != "" { - ret.Errors = append(ret.Errors, result.Error) + ret = append(ret, &volume_server_pb.DeleteResult{ + FileId: vid, + Status: http.StatusBadRequest, + Error: err.Error()}, + ) continue } for _, location := range result.Locations { @@ -103,25 +81,52 @@ func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) { wg.Add(1) go func(server string, fidList []string) { defer wg.Done() - values := make(url.Values) - for _, fid := range fidList { - values.Add("fid", fid) - } - jsonBlob, err := util.Post("http://"+server+"/delete", values) - if err != nil { - ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob)) - return - } - var result []DeleteResult - err = json.Unmarshal(jsonBlob, &result) - if err != nil { - ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob)) - return + + if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, fidList); deleteErr != nil { + err = deleteErr + } else { + ret = append(ret, deleteResults...) } - ret.Results = append(ret.Results, result...) + }(server, fidList) } wg.Wait() - return ret, nil + return ret, err +} + +// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc +func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) { + + err = withVolumeServerClient(volumeServer, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + + req := &volume_server_pb.BatchDeleteRequest{ + FileIds: fileIds, + } + + resp, err := volumeServerClient.BatchDelete(context.Background(), req) + + fmt.Printf("deleted %v %v: %v\n", fileIds, err, resp) + + if err != nil { + return err + } + + ret = append(ret, resp.Results...) + + return nil + }) + + if err != nil { + return + } + + for _, result := range ret { + if result.Error != "" { + return nil, fmt.Errorf("delete fileId %s: %v", result.FileId, result.Error) + } + } + + return + } |
