diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-10-14 00:12:28 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-10-14 00:12:28 -0700 |
| commit | ff66269b62ddffe70127bbb9835ae5e7b24e8ce7 (patch) | |
| tree | 469ac5785c44913288437e2ca765042583aeba28 /weed/operation | |
| parent | 3ddcd870983e68e24ad569127f9a989d7bc986dc (diff) | |
| download | seaweedfs-ff66269b62ddffe70127bbb9835ae5e7b24e8ce7.tar.xz seaweedfs-ff66269b62ddffe70127bbb9835ae5e7b24e8ce7.zip | |
use grpc to replace http APIs for batch volume id lookup and batch delete
1. remove batch volume id lookup http API /vol/lookup
2. remove batch delete http API /delete
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/chunked_file.go | 22 | ||||
| -rw-r--r-- | weed/operation/delete_content.go | 109 | ||||
| -rw-r--r-- | weed/operation/grpc_client.go | 53 | ||||
| -rw-r--r-- | weed/operation/lookup.go | 49 |
4 files changed, 158 insertions, 75 deletions
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 52086514a..69e9b758f 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -10,8 +10,8 @@ import ( "sync" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/glog" ) var ( @@ -70,16 +70,22 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) { } func (cm *ChunkManifest) DeleteChunks(master string) error { - deleteError := 0 + var fileIds []string for _, ci := range cm.Chunks { - if e := DeleteFile(master, ci.Fid, ""); e != nil { - deleteError++ - glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master) - } + fileIds = append(fileIds, ci.Fid) } - if deleteError > 0 { - return errors.New("Not all chunks deleted.") + results, err := DeleteFiles(master, fileIds) + if err != nil { + glog.V(0).Infof("delete %+v: %v", fileIds, err) + return fmt.Errorf("chunk delete: %v", err) } + for _, result := range results { + if result.Error != "" { + glog.V(0).Infof("delete file %+v: %v", result.FileId, result.Error) + return fmt.Errorf("chunk delete %v: %v", result.FileId, result.Error) + } + } + return nil } diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index b77718846..2e414873f 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -1,18 +1,15 @@ package operation import ( - "encoding/json" "errors" - "fmt" - "net/url" "strings" "sync" "net/http" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "context" + "fmt" ) type DeleteResult struct { @@ -22,27 +19,6 @@ type DeleteResult struct { Error string `json:"error,omitempty"` } -func DeleteFromVolumeServer(fileUrlOnVolume string, jwt security.EncodedJwt) error { - err := util.Delete(fileUrlOnVolume, jwt) - if err != nil { - return fmt.Errorf("Failed to delete %s:%v", fileUrlOnVolume, err) - } - return nil -} - -func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error { - fileUrl, err := LookupFileId(master, fileId) - if err != nil { - glog.V(0).Infof("Delete %s lookup: %v, master: %s", fileId, err, master) - return nil - } - err = util.Delete(fileUrl, jwt) - if err != nil { - return fmt.Errorf("Failed to delete %s:%v", fileUrl, err) - } - return nil -} - func ParseFileId(fid string) (vid string, key_cookie string, err error) { commaIndex := strings.Index(fid, ",") if commaIndex <= 0 { @@ -51,20 +27,18 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) { return fid[:commaIndex], fid[commaIndex+1:], nil } -type DeleteFilesResult struct { - Errors []string - Results []DeleteResult -} +// DeleteFiles batch deletes a list of fileIds +func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { + + var ret []*volume_server_pb.DeleteResult -func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) { vid_to_fileIds := make(map[string][]string) - ret := &DeleteFilesResult{} var vids []string for _, fileId := range fileIds { vid, _, err := ParseFileId(fileId) if err != nil { - ret.Results = append(ret.Results, DeleteResult{ - Fid: vid, + ret = append(ret, &volume_server_pb.DeleteResult{ + FileId: vid, Status: http.StatusBadRequest, Error: err.Error()}, ) @@ -85,7 +59,11 @@ func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) { server_to_fileIds := make(map[string][]string) for vid, result := range lookupResults { if result.Error != "" { - ret.Errors = append(ret.Errors, result.Error) + ret = append(ret, &volume_server_pb.DeleteResult{ + FileId: vid, + Status: http.StatusBadRequest, + Error: err.Error()}, + ) continue } for _, location := range result.Locations { @@ -103,25 +81,52 @@ func DeleteFiles(master string, fileIds []string) (*DeleteFilesResult, error) { wg.Add(1) go func(server string, fidList []string) { defer wg.Done() - values := make(url.Values) - for _, fid := range fidList { - values.Add("fid", fid) - } - jsonBlob, err := util.Post("http://"+server+"/delete", values) - if err != nil { - ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob)) - return - } - var result []DeleteResult - err = json.Unmarshal(jsonBlob, &result) - if err != nil { - ret.Errors = append(ret.Errors, err.Error()+" "+string(jsonBlob)) - return + + if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, fidList); deleteErr != nil { + err = deleteErr + } else { + ret = append(ret, deleteResults...) } - ret.Results = append(ret.Results, result...) + }(server, fidList) } wg.Wait() - return ret, nil + return ret, err +} + +// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc +func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) { + + err = withVolumeServerClient(volumeServer, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + + req := &volume_server_pb.BatchDeleteRequest{ + FileIds: fileIds, + } + + resp, err := volumeServerClient.BatchDelete(context.Background(), req) + + fmt.Printf("deleted %v %v: %v\n", fileIds, err, resp) + + if err != nil { + return err + } + + ret = append(ret, resp.Results...) + + return nil + }) + + if err != nil { + return + } + + for _, result := range ret { + if result.Error != "" { + return nil, fmt.Errorf("delete fileId %s: %v", result.FileId, result.Error) + } + } + + return + } diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go new file mode 100644 index 000000000..c27954c53 --- /dev/null +++ b/weed/operation/grpc_client.go @@ -0,0 +1,53 @@ +package operation + +import ( + "fmt" + "strings" + "strconv" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func withVolumeServerClient(volumeServer string, fn func(volume_server_pb.VolumeServerClient) error) error { + + grpcAddress, err := toVolumeServerGrpcAddress(volumeServer) + if err != nil { + return err + } + + grpcConnection, err := util.GrpcDial(grpcAddress) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", grpcAddress, err) + } + defer grpcConnection.Close() + + client := volume_server_pb.NewVolumeServerClient(grpcConnection) + + return fn(client) +} + +func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err error) { + sepIndex := strings.LastIndex(volumeServer, ":") + port, err := strconv.Atoi(volumeServer[sepIndex+1:]) + if err != nil { + glog.Errorf("failed to parse volume server address: %v", volumeServer) + return "", err + } + return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil +} + +func withMasterServerClient(masterServer string, fn func(masterClient master_pb.SeaweedClient) error) error { + + grpcConnection, err := util.GrpcDial(masterServer) + if err != nil { + return fmt.Errorf("fail to dial %s: %v", masterServer, err) + } + defer grpcConnection.Close() + + client := master_pb.NewSeaweedClient(grpcConnection) + + return fn(client) +} diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index 19d9dbb94..25cc65c51 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -10,6 +10,8 @@ import ( "time" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "context" ) type Location struct { @@ -95,24 +97,41 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err } //only query unknown_vids - values := make(url.Values) - for _, vid := range unknown_vids { - values.Add("volumeId", vid) - } - jsonBlob, err := util.Post("http://"+server+"/vol/lookup", values) + + err := withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error { + req := &master_pb.LookupVolumeRequest{ + VolumeIds: unknown_vids, + } + resp, grpcErr := masterClient.LookupVolume(context.Background(), req) + if grpcErr != nil { + return grpcErr + } + + //set newly checked vids to cache + for _, vidLocations := range resp.VolumeIdLocations { + var locations []Location + for _, loc := range vidLocations.Locations { + locations = append(locations, Location{ + Url: loc.Url, + PublicUrl: loc.PublicUrl, + }) + } + if vidLocations.Error != "" { + vc.Set(vidLocations.VolumeId, locations, 10*time.Minute) + } + ret[vidLocations.VolumeId] = LookupResult{ + VolumeId: vidLocations.VolumeId, + Locations: locations, + Error: vidLocations.Error, + } + } + + return nil + }) + if err != nil { return nil, err } - err = json.Unmarshal(jsonBlob, &ret) - if err != nil { - return nil, errors.New(err.Error() + " " + string(jsonBlob)) - } - - //set newly checked vids to cache - for _, vid := range unknown_vids { - locations := ret[vid].Locations - vc.Set(vid, locations, 10*time.Minute) - } return ret, nil } |
