diff options
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 5 | ||||
| -rw-r--r-- | weed/operation/chunked_file.go | 5 | ||||
| -rw-r--r-- | weed/operation/delete_content.go | 15 | ||||
| -rw-r--r-- | weed/operation/grpc_client.go | 8 | ||||
| -rw-r--r-- | weed/operation/lookup.go | 5 | ||||
| -rw-r--r-- | weed/operation/stats.go | 5 | ||||
| -rw-r--r-- | weed/operation/submit.go | 19 | ||||
| -rw-r--r-- | weed/operation/sync_volume.go | 9 |
8 files changed, 39 insertions, 32 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index acadc88c8..7e7a9059d 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -3,6 +3,7 @@ package operation import ( "context" "fmt" + "google.golang.org/grpc" "strings" "time" @@ -30,7 +31,7 @@ type AssignResult struct { Auth security.EncodedJwt `json:"auth,omitempty"` } -func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { +func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { var requests []*VolumeAssignRequest requests = append(requests, primaryRequest) @@ -44,7 +45,7 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque continue } - lastError = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error { + lastError = withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) defer cancel() diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 9d8267dee..f3f6e7b00 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "google.golang.org/grpc" "io" "net/http" "sort" @@ -69,12 +70,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) { return json.Marshal(cm) } -func (cm *ChunkManifest) DeleteChunks(master string) error { +func (cm *ChunkManifest) DeleteChunks(master string, grpcDialOption grpc.DialOption) error { var fileIds []string for _, ci := range cm.Chunks { fileIds = append(fileIds, ci.Fid) } - results, err := DeleteFiles(master, fileIds) + results, err := DeleteFiles(master, grpcDialOption, fileIds) if err != nil { glog.V(0).Infof("delete %+v: %v", fileIds, err) return fmt.Errorf("chunk delete: %v", err) diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 57fc0329e..1df95211e 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "google.golang.org/grpc" "net/http" "strings" "sync" @@ -28,17 +29,17 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) { } // DeleteFiles batch deletes a list of fileIds -func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { +func DeleteFiles(master string, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) { lookupFunc := func(vids []string) (map[string]LookupResult, error) { - return LookupVolumeIds(master, vids) + return LookupVolumeIds(master, grpcDialOption, vids) } - return DeleteFilesWithLookupVolumeId(fileIds, lookupFunc) + return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) } -func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) { +func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) { var ret []*volume_server_pb.DeleteResult @@ -92,7 +93,7 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin go func(server string, fidList []string) { defer wg.Done() - if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, fidList); deleteErr != nil { + if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList); deleteErr != nil { err = deleteErr } else { ret = append(ret, deleteResults...) @@ -106,9 +107,9 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin } // DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc -func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) { +func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) { - err = WithVolumeServerClient(volumeServer, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) defer cancel() diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index d0931a8d3..a02844657 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -18,7 +18,7 @@ var ( grpcClientsLock sync.Mutex ) -func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.VolumeServerClient) error) error { +func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { grpcAddress, err := toVolumeServerGrpcAddress(volumeServer) if err != nil { @@ -28,7 +28,7 @@ func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.Volume return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) - }, grpcAddress) + }, grpcAddress, grpcDialOption) } @@ -42,7 +42,7 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil } -func withMasterServerClient(masterServer string, fn func(masterClient master_pb.SeaweedClient) error) error { +func withMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer, 0) if parseErr != nil { @@ -52,6 +52,6 @@ func withMasterServerClient(masterServer string, fn func(masterClient master_pb. return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) - }, masterGrpcAddress) + }, masterGrpcAddress, grpcDialOption) } diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index 562a11580..c4040f3e7 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "google.golang.org/grpc" "math/rand" "net/url" "strings" @@ -78,7 +79,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) { } // LookupVolumeIds find volume locations by cache and actual lookup -func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) { +func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) { ret := make(map[string]LookupResult) var unknown_vids []string @@ -98,7 +99,7 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err //only query unknown_vids - err := withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error { + err := withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) defer cancel() diff --git a/weed/operation/stats.go b/weed/operation/stats.go index 364727272..9f7166864 100644 --- a/weed/operation/stats.go +++ b/weed/operation/stats.go @@ -2,14 +2,15 @@ package operation import ( "context" + "google.golang.org/grpc" "time" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" ) -func Statistics(server string, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) { +func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) { - err = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error { + err = withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) defer cancel() diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 374927495..bdf59d966 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -2,6 +2,7 @@ package operation import ( "bytes" + "google.golang.org/grpc" "io" "mime" "net/url" @@ -36,7 +37,7 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } -func SubmitFiles(master string, files []FilePart, +func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, maxMB int) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) for index, file := range files { @@ -49,7 +50,7 @@ func SubmitFiles(master string, files []FilePart, DataCenter: dataCenter, Ttl: ttl, } - ret, err := Assign(master, ar) + ret, err := Assign(master, grpcDialOption, ar) if err != nil { for index, _ := range files { results[index].Error = err.Error() @@ -65,7 +66,7 @@ func SubmitFiles(master string, files []FilePart, file.Replication = replication file.Collection = collection file.DataCenter = dataCenter - results[index].Size, err = file.Upload(maxMB, master, ret.Auth) + results[index].Size, err = file.Upload(maxMB, master, ret.Auth, grpcDialOption) if err != nil { results[index].Error = err.Error() } @@ -108,7 +109,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) { return ret, nil } -func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (retSize uint32, err error) { +func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) { fileUrl := "http://" + fi.Server + "/" + fi.Fid if fi.ModTime != 0 { fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) @@ -136,7 +137,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re Collection: fi.Collection, Ttl: fi.Ttl, } - ret, err = Assign(master, ar) + ret, err = Assign(master, grpcDialOption, ar) if err != nil { return } @@ -149,10 +150,10 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re Collection: fi.Collection, Ttl: fi.Ttl, } - ret, err = Assign(master, ar) + ret, err = Assign(master, grpcDialOption, ar) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master) + cm.DeleteChunks(master, grpcDialOption) return } id = ret.Fid @@ -170,7 +171,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re ret.Auth) if e != nil { // delete all uploaded chunks - cm.DeleteChunks(master) + cm.DeleteChunks(master, grpcDialOption) return 0, e } cm.Chunks = append(cm.Chunks, @@ -185,7 +186,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re err = upload_chunked_file_manifest(fileUrl, &cm, jwt) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master) + cm.DeleteChunks(master, grpcDialOption) } } else { ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt) diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go index e40c7de41..bf81415c9 100644 --- a/weed/operation/sync_volume.go +++ b/weed/operation/sync_volume.go @@ -3,6 +3,7 @@ package operation import ( "context" "fmt" + "google.golang.org/grpc" "io" "time" @@ -11,9 +12,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { +func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { - WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error { + WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second)) defer cancel() @@ -26,9 +27,9 @@ func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.Volu return } -func GetVolumeIdxEntries(server string, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error { +func GetVolumeIdxEntries(server string, grpcDialOption grpc.DialOption, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error { - return WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error { + return WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { stream, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{ VolumdId: vid, }) |
