aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go29
-rw-r--r--weed/operation/chunked_file.go7
-rw-r--r--weed/operation/delete_content.go16
-rw-r--r--weed/operation/grpc_client.go49
-rw-r--r--weed/operation/lookup.go8
-rw-r--r--weed/operation/submit.go3
-rw-r--r--weed/operation/sync_volume.go3
-rw-r--r--weed/operation/tail_volume.go5
8 files changed, 46 insertions, 74 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index 8dbdbbe57..9eac69631 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -3,6 +3,7 @@ package operation
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"google.golang.org/grpc"
@@ -22,18 +23,15 @@ type VolumeAssignRequest struct {
WritableVolumeCount uint32
}
-type AssignResultReplica struct {
- Url string `json:"url,omitempty"`
- PublicUrl string `json:"publicUrl,omitempty"`
-}
type AssignResult struct {
- Fid string `json:"fid,omitempty"`
- Url string `json:"url,omitempty"`
- PublicUrl string `json:"publicUrl,omitempty"`
- Count uint64 `json:"count,omitempty"`
- Error string `json:"error,omitempty"`
- Auth security.EncodedJwt `json:"auth,omitempty"`
- Replicas []AssignResultReplica `json:"replicas,omitempty"`
+ Fid string `json:"fid,omitempty"`
+ Url string `json:"url,omitempty"`
+ PublicUrl string `json:"publicUrl,omitempty"`
+ GrpcPort int `json:"grpcPort,omitempty"`
+ Count uint64 `json:"count,omitempty"`
+ Error string `json:"error,omitempty"`
+ Auth security.EncodedJwt `json:"auth,omitempty"`
+ Replicas []Location `json:"replicas,omitempty"`
}
func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
@@ -70,12 +68,13 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
ret.Count = resp.Count
ret.Fid = resp.Fid
- ret.Url = resp.Url
- ret.PublicUrl = resp.PublicUrl
+ ret.Url = resp.Location.Url
+ ret.PublicUrl = resp.Location.PublicUrl
+ ret.GrpcPort = int(resp.Location.GrpcPort)
ret.Error = resp.Error
ret.Auth = security.EncodedJwt(resp.Auth)
for _, r := range resp.Replicas {
- ret.Replicas = append(ret.Replicas, AssignResultReplica{
+ ret.Replicas = append(ret.Replicas, Location{
Url: r.Url,
PublicUrl: r.PublicUrl,
})
@@ -104,7 +103,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
return ret, lastError
}
-func LookupJwt(master string, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
+func LookupJwt(master pb.ServerAddress, grpcDialOption grpc.DialOption, fileId string) (token security.EncodedJwt) {
WithMasterServerClient(master, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 94939f1f3..0227db1bf 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"io/ioutil"
"net/http"
@@ -42,7 +43,7 @@ type ChunkManifest struct {
type ChunkedFileReader struct {
totalSize int64
chunkList []*ChunkInfo
- master string
+ master pb.ServerAddress
pos int64
pr *io.PipeReader
pw *io.PipeWriter
@@ -127,7 +128,7 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64, jwt string) (wri
return io.Copy(w, resp.Body)
}
-func NewChunkedFileReader(chunkList []*ChunkInfo, master string, grpcDialOption grpc.DialOption) *ChunkedFileReader {
+func NewChunkedFileReader(chunkList []*ChunkInfo, master pb.ServerAddress, grpcDialOption grpc.DialOption) *ChunkedFileReader {
var totalSize int64
for _, chunk := range chunkList {
totalSize += chunk.Size
@@ -176,7 +177,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
ci := cf.chunkList[chunkIndex]
// if we need read date from local volume server first?
- fileUrl, jwt, lookupError := LookupFileId(func() string {
+ fileUrl, jwt, lookupError := LookupFileId(func() pb.ServerAddress {
return cf.master
}, cf.grpcDialOption, ci.Fid)
if lookupError != nil {
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 15d07a52e..d762f51e1 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"google.golang.org/grpc"
"net/http"
"strings"
@@ -74,7 +75,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
return ret, err
}
- server_to_fileIds := make(map[string][]string)
+ server_to_fileIds := make(map[pb.ServerAddress][]string)
for vid, result := range lookupResults {
if result.Error != "" {
ret = append(ret, &volume_server_pb.DeleteResult{
@@ -85,11 +86,12 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
continue
}
for _, location := range result.Locations {
- if _, ok := server_to_fileIds[location.Url]; !ok {
- server_to_fileIds[location.Url] = make([]string, 0)
+ serverAddress := location.ServerAddress()
+ if _, ok := server_to_fileIds[serverAddress]; !ok {
+ server_to_fileIds[serverAddress] = make([]string, 0)
}
- server_to_fileIds[location.Url] = append(
- server_to_fileIds[location.Url], vid_to_fileIds[vid]...)
+ server_to_fileIds[serverAddress] = append(
+ server_to_fileIds[serverAddress], vid_to_fileIds[vid]...)
}
}
@@ -97,7 +99,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
var wg sync.WaitGroup
for server, fidList := range server_to_fileIds {
wg.Add(1)
- go func(server string, fidList []string) {
+ go func(server pb.ServerAddress, fidList []string) {
defer wg.Done()
if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList, false); deleteErr != nil {
@@ -119,7 +121,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
}
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
-func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
+func DeleteFilesAtOneVolumeServer(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index 39f70343a..743682203 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -1,68 +1,27 @@
package operation
import (
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/util"
- "strconv"
- "strings"
-
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)
-func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
-
- grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
- if err != nil {
- return fmt.Errorf("failed to parse volume server %v: %v", volumeServer, err)
- }
+func WithVolumeServerClient(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
- }, grpcAddress, grpcDialOption)
-
-}
+ }, volumeServer.ToGrpcAddress(), grpcDialOption)
-func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err error) {
- sepIndex := strings.LastIndex(volumeServer, ":")
- port, err := strconv.Atoi(volumeServer[sepIndex+1:])
- if err != nil {
- glog.Errorf("failed to parse volume server address: %v", volumeServer)
- return "", err
- }
- return util.JoinHostPort(volumeServer[0:sepIndex], port+10000), nil
}
-func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
-
- masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(masterServer)
- if parseErr != nil {
- return fmt.Errorf("failed to parse master %v: %v", masterServer, parseErr)
- }
+func WithMasterServerClient(masterServer pb.ServerAddress, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
- }, masterGrpcAddress, grpcDialOption)
-
-}
-
-func WithFilerServerClient(filerServer string, grpcDialOption grpc.DialOption, fn func(masterClient filer_pb.SeaweedFilerClient) error) error {
-
- filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(filerServer)
- if parseErr != nil {
- return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr)
- }
-
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
- client := filer_pb.NewSeaweedFilerClient(grpcConnection)
- return fn(client)
- }, filerGrpcAddress, grpcDialOption)
+ }, masterServer.ToGrpcAddress(), grpcDialOption)
}
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index 8717f6b36..daf8cd152 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"google.golang.org/grpc"
"math/rand"
"strings"
@@ -15,7 +16,13 @@ import (
type Location struct {
Url string `json:"url,omitempty"`
PublicUrl string `json:"publicUrl,omitempty"`
+ GrpcPort int `json:"grpcPort,omitempty"`
}
+
+func (l *Location) ServerAddress() pb.ServerAddress {
+ return pb.NewServerAddressWithGrpcPort(l.Url, l.GrpcPort)
+}
+
type LookupResult struct {
VolumeOrFileId string `json:"volumeOrFileId,omitempty"`
Locations []Location `json:"locations,omitempty"`
@@ -89,6 +96,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids
locations = append(locations, Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
+ GrpcPort: int(loc.GrpcPort),
})
}
if vidLocations.Error != "" {
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 80bc6fcb4..648df174a 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -1,6 +1,7 @@
package operation
import (
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"mime"
"net/url"
@@ -39,7 +40,7 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-type GetMasterFn func() string
+type GetMasterFn func() pb.ServerAddress
func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go
index 5562f12ab..fdd22ac85 100644
--- a/weed/operation/sync_volume.go
+++ b/weed/operation/sync_volume.go
@@ -2,11 +2,12 @@ package operation
import (
"context"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"google.golang.org/grpc"
)
-func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
+func GetVolumeSyncStatus(server pb.ServerAddress, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
index e3f2c0664..bedeeb3b5 100644
--- a/weed/operation/tail_volume.go
+++ b/weed/operation/tail_volume.go
@@ -3,6 +3,7 @@ package operation
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"io"
"google.golang.org/grpc"
@@ -21,12 +22,12 @@ func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle
return fmt.Errorf("unable to locate volume %d", vid)
}
- volumeServer := lookup.Locations[0].Url
+ volumeServer := lookup.Locations[0].ServerAddress()
return TailVolumeFromSource(volumeServer, grpcDialOption, vid, sinceNs, timeoutSeconds, fn)
}
-func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
+func TailVolumeFromSource(volumeServer pb.ServerAddress, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()