diff options
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 29 | ||||
| -rw-r--r-- | weed/operation/chunked_file.go | 7 | ||||
| -rw-r--r-- | weed/operation/delete_content.go | 16 | ||||
| -rw-r--r-- | weed/operation/grpc_client.go | 49 | ||||
| -rw-r--r-- | weed/operation/lookup.go | 8 | ||||
| -rw-r--r-- | weed/operation/submit.go | 3 | ||||
| -rw-r--r-- | weed/operation/sync_volume.go | 3 | ||||
| -rw-r--r-- | weed/operation/tail_volume.go | 5 |
8 files changed, 46 insertions, 74 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 8dbdbbe57..9eac69631 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -3,6 +3,7 @@ package operation import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" "google.golang.org/grpc" @@ -22,18 +23,15 @@ type VolumeAssignRequest struct { WritableVolumeCount uint32 } -type AssignResultReplica struct { - Url string `json:"url,omitempty"` - PublicUrl string `json:"publicUrl,omitempty"` -} type AssignResult struct { - Fid string `json:"fid,omitempty"` - Url string `json:"url,omitempty"` - PublicUrl string `json:"publicUrl,omitempty"` - Count uint64 `json:"count,omitempty"` - Error string `json:"error,omitempty"` - Auth security.EncodedJwt `json:"auth,omitempty"` - Replicas []AssignResultReplica `json:"replicas,omitempty"` + Fid string `json:"fid,omitempty"` + Url string `json:"url,omitempty"` + PublicUrl string `json:"publicUrl,omitempty"` + GrpcPort int `json:"grpcPort,omitempty"` + Count uint64 `json:"count,omitempty"` + Error string `json:"error,omitempty"` + Auth security.EncodedJwt `json:"auth,omitempty"` + Replicas []Location `json:"replicas,omitempty"` } func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) { @@ -70,12 +68,13 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest ret.Count = resp.Count ret.Fid = resp.Fid - ret.Url = resp.Url - ret.PublicUrl = resp.PublicUrl + ret.Url = resp.Location.Url + ret.PublicUrl = resp.Location.PublicUrl + ret.GrpcPort = int(resp.Location.GrpcPort) ret.Error = resp.Error ret.Auth = security.EncodedJwt(resp.Auth) for _, r := range resp.Replicas { - ret.Replicas = append(ret.Replicas, AssignResultReplica{ + ret.Replicas = append(ret.Replicas, Location{ Url: r.Url, PublicUrl: r.PublicUrl, }) @@ -104,7 +103,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest return ret, lastError } -func LookupJwt(master string, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) { +func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) { WithMasterServerClient(master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go index 94939f1f3..0227db1bf 100644 --- a/weed/operation/chunked_file.go +++ b/weed/operation/chunked_file.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "io/ioutil" "net/http" @@ -42,7 +43,7 @@ type ChunkManifest struct { type ChunkedFileReader struct { totalSize int64 chunkList []*ChunkInfo - master string + master pb.ServerAddress pos int64 pr *io.PipeReader pw *io.PipeWriter @@ -127,7 +128,7 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri return io.Copy(w, resp.Body) } -func NewChunkedFileReader(chunkList []*ChunkInfo, master string, grpcDialOption grpc.DialOption) *ChunkedFileReader { +func NewChunkedFileReader(chunkList []*ChunkInfo, master pb.ServerAddress, grpcDialOption grpc.DialOption) *ChunkedFileReader { var totalSize int64 for _, chunk := range chunkList { totalSize += chunk.Size @@ -176,7 +177,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) { for ; chunkIndex < len(cf.chunkList); chunkIndex++ { ci := cf.chunkList[chunkIndex] // if we need read date from local volume server first? - fileUrl, jwt, lookupError := LookupFileId(func() string { + fileUrl, jwt, lookupError := LookupFileId(func() pb.ServerAddress { return cf.master }, cf.grpcDialOption, ci.Fid) if lookupError != nil { diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 15d07a52e..d762f51e1 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "google.golang.org/grpc" "net/http" "strings" @@ -74,7 +75,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str return ret, err } - server_to_fileIds := make(map[string][]string) + server_to_fileIds := make(map[pb.ServerAddress][]string) for vid, result := range lookupResults { if result.Error != "" { ret = append(ret, &volume_server_pb.DeleteResult{ @@ -85,11 +86,12 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str continue } for _, location := range result.Locations { - if _, ok := server_to_fileIds[location.Url]; !ok { - server_to_fileIds[location.Url] = make([]string, 0) + serverAddress := location.ServerAddress() + if _, ok := server_to_fileIds[serverAddress]; !ok { + server_to_fileIds[serverAddress] = make([]string, 0) } - server_to_fileIds[location.Url] = append( - server_to_fileIds[location.Url], vid_to_fileIds[vid]...) + server_to_fileIds[serverAddress] = append( + server_to_fileIds[serverAddress], vid_to_fileIds[vid]...) } } @@ -97,7 +99,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str var wg sync.WaitGroup for server, fidList := range server_to_fileIds { wg.Add(1) - go func(server string, fidList []string) { + go func(server pb.ServerAddress, fidList []string) { defer wg.Done() if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList, false); deleteErr != nil { @@ -119,7 +121,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str } // DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc -func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) { +func DeleteFilesAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) { err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index 39f70343a..743682203 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -1,68 +1,27 @@ package operation import ( - "fmt" - "github.com/chrislusf/seaweedfs/weed/util" - "strconv" - "strings" - "google.golang.org/grpc" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) -func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { - - grpcAddress, err := toVolumeServerGrpcAddress(volumeServer) - if err != nil { - return fmt.Errorf("failed to parse volume server %v: %v", volumeServer, err) - } +func WithVolumeServerClient(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) - }, grpcAddress, grpcDialOption) - -} + }, volumeServer.ToGrpcAddress(), grpcDialOption) -func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err error) { - sepIndex := strings.LastIndex(volumeServer, ":") - port, err := strconv.Atoi(volumeServer[sepIndex+1:]) - if err != nil { - glog.Errorf("failed to parse volume server address: %v", volumeServer) - return "", err - } - return util.JoinHostPort(volumeServer[0:sepIndex], port+10000), nil } -func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { - - masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(masterServer) - if parseErr != nil { - return fmt.Errorf("failed to parse master %v: %v", masterServer, parseErr) - } +func WithMasterServerClient(masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) - }, masterGrpcAddress, grpcDialOption) - -} - -func WithFilerServerClient(filerServer string, grpcDialOption grpc.DialOption, fn func(masterClient filer_pb.SeaweedFilerClient) error) error { - - filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(filerServer) - if parseErr != nil { - return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr) - } - - return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { - client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(client) - }, filerGrpcAddress, grpcDialOption) + }, masterServer.ToGrpcAddress(), grpcDialOption) } diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go index 8717f6b36..daf8cd152 100644 --- a/weed/operation/lookup.go +++ b/weed/operation/lookup.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "google.golang.org/grpc" "math/rand" "strings" @@ -15,7 +16,13 @@ import ( type Location struct { Url string `json:"url,omitempty"` PublicUrl string `json:"publicUrl,omitempty"` + GrpcPort int `json:"grpcPort,omitempty"` } + +func (l *Location) ServerAddress() pb.ServerAddress { + return pb.NewServerAddressWithGrpcPort(l.Url, l.GrpcPort) +} + type LookupResult struct { VolumeOrFileId string `json:"volumeOrFileId,omitempty"` Locations []Location `json:"locations,omitempty"` @@ -89,6 +96,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids locations = append(locations, Location{ Url: loc.Url, PublicUrl: loc.PublicUrl, + GrpcPort: int(loc.GrpcPort), }) } if vidLocations.Error != "" { diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 80bc6fcb4..648df174a 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -1,6 +1,7 @@ package operation import ( + "github.com/chrislusf/seaweedfs/weed/pb" "io" "mime" "net/url" @@ -39,7 +40,7 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } -type GetMasterFn func() string +type GetMasterFn func() pb.ServerAddress func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go index 5562f12ab..fdd22ac85 100644 --- a/weed/operation/sync_volume.go +++ b/weed/operation/sync_volume.go @@ -2,11 +2,12 @@ package operation import ( "context" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "google.golang.org/grpc" ) -func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { +func GetVolumeSyncStatus(server pb.ServerAddress, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) { WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index e3f2c0664..bedeeb3b5 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -3,6 +3,7 @@ package operation import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" "io" "google.golang.org/grpc" @@ -21,12 +22,12 @@ func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle return fmt.Errorf("unable to locate volume %d", vid) } - volumeServer := lookup.Locations[0].Url + volumeServer := lookup.Locations[0].ServerAddress() return TailVolumeFromSource(volumeServer, grpcDialOption, vid, sinceNs, timeoutSeconds, fn) } -func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error { +func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error { return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() |
