aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go5
-rw-r--r--weed/operation/chunked_file.go5
-rw-r--r--weed/operation/delete_content.go15
-rw-r--r--weed/operation/grpc_client.go8
-rw-r--r--weed/operation/lookup.go5
-rw-r--r--weed/operation/stats.go5
-rw-r--r--weed/operation/submit.go19
-rw-r--r--weed/operation/sync_volume.go9
8 files changed, 39 insertions, 32 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index acadc88c8..7e7a9059d 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -3,6 +3,7 @@ package operation
import (
"context"
"fmt"
+ "google.golang.org/grpc"
"strings"
"time"
@@ -30,7 +31,7 @@ type AssignResult struct {
Auth security.EncodedJwt `json:"auth,omitempty"`
}
-func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
+func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
@@ -44,7 +45,7 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque
continue
}
- lastError = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
+ lastError = withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 9d8267dee..f3f6e7b00 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "google.golang.org/grpc"
"io"
"net/http"
"sort"
@@ -69,12 +70,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
return json.Marshal(cm)
}
-func (cm *ChunkManifest) DeleteChunks(master string) error {
+func (cm *ChunkManifest) DeleteChunks(master string, grpcDialOption grpc.DialOption) error {
var fileIds []string
for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid)
}
- results, err := DeleteFiles(master, fileIds)
+ results, err := DeleteFiles(master, grpcDialOption, fileIds)
if err != nil {
glog.V(0).Infof("delete %+v: %v", fileIds, err)
return fmt.Errorf("chunk delete: %v", err)
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 57fc0329e..1df95211e 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ "google.golang.org/grpc"
"net/http"
"strings"
"sync"
@@ -28,17 +29,17 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
}
// DeleteFiles batch deletes a list of fileIds
-func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
+func DeleteFiles(master string, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
lookupFunc := func(vids []string) (map[string]LookupResult, error) {
- return LookupVolumeIds(master, vids)
+ return LookupVolumeIds(master, grpcDialOption, vids)
}
- return DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
+ return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
}
-func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
+func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
var ret []*volume_server_pb.DeleteResult
@@ -92,7 +93,7 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin
go func(server string, fidList []string) {
defer wg.Done()
- if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, fidList); deleteErr != nil {
+ if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList); deleteErr != nil {
err = deleteErr
} else {
ret = append(ret, deleteResults...)
@@ -106,9 +107,9 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin
}
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
-func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
+func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
- err = WithVolumeServerClient(volumeServer, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index d0931a8d3..a02844657 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -18,7 +18,7 @@ var (
grpcClientsLock sync.Mutex
)
-func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.VolumeServerClient) error) error {
+func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
if err != nil {
@@ -28,7 +28,7 @@ func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.Volume
return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
- }, grpcAddress)
+ }, grpcAddress, grpcDialOption)
}
@@ -42,7 +42,7 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
}
-func withMasterServerClient(masterServer string, fn func(masterClient master_pb.SeaweedClient) error) error {
+func withMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer, 0)
if parseErr != nil {
@@ -52,6 +52,6 @@ func withMasterServerClient(masterServer string, fn func(masterClient master_pb.
return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
- }, masterGrpcAddress)
+ }, masterGrpcAddress, grpcDialOption)
}
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index 562a11580..c4040f3e7 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "google.golang.org/grpc"
"math/rand"
"net/url"
"strings"
@@ -78,7 +79,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) {
}
// LookupVolumeIds find volume locations by cache and actual lookup
-func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) {
+func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
ret := make(map[string]LookupResult)
var unknown_vids []string
@@ -98,7 +99,7 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err
//only query unknown_vids
- err := withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
+ err := withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
diff --git a/weed/operation/stats.go b/weed/operation/stats.go
index 364727272..9f7166864 100644
--- a/weed/operation/stats.go
+++ b/weed/operation/stats.go
@@ -2,14 +2,15 @@ package operation
import (
"context"
+ "google.golang.org/grpc"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)
-func Statistics(server string, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
+func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
- err = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
+ err = withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 374927495..bdf59d966 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -2,6 +2,7 @@ package operation
import (
"bytes"
+ "google.golang.org/grpc"
"io"
"mime"
"net/url"
@@ -36,7 +37,7 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-func SubmitFiles(master string, files []FilePart,
+func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart,
replication string, collection string, dataCenter string, ttl string, maxMB int) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
@@ -49,7 +50,7 @@ func SubmitFiles(master string, files []FilePart,
DataCenter: dataCenter,
Ttl: ttl,
}
- ret, err := Assign(master, ar)
+ ret, err := Assign(master, grpcDialOption, ar)
if err != nil {
for index, _ := range files {
results[index].Error = err.Error()
@@ -65,7 +66,7 @@ func SubmitFiles(master string, files []FilePart,
file.Replication = replication
file.Collection = collection
file.DataCenter = dataCenter
- results[index].Size, err = file.Upload(maxMB, master, ret.Auth)
+ results[index].Size, err = file.Upload(maxMB, master, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@@ -108,7 +109,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
-func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (retSize uint32, err error) {
+func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@@ -136,7 +137,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re
Collection: fi.Collection,
Ttl: fi.Ttl,
}
- ret, err = Assign(master, ar)
+ ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
return
}
@@ -149,10 +150,10 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re
Collection: fi.Collection,
Ttl: fi.Ttl,
}
- ret, err = Assign(master, ar)
+ ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
return
}
id = ret.Fid
@@ -170,7 +171,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re
ret.Auth)
if e != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
return 0, e
}
cm.Chunks = append(cm.Chunks,
@@ -185,7 +186,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt) (re
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
}
} else {
ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt)
diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go
index e40c7de41..bf81415c9 100644
--- a/weed/operation/sync_volume.go
+++ b/weed/operation/sync_volume.go
@@ -3,6 +3,7 @@ package operation
import (
"context"
"fmt"
+ "google.golang.org/grpc"
"io"
"time"
@@ -11,9 +12,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
+func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
- WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
+ WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
@@ -26,9 +27,9 @@ func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.Volu
return
}
-func GetVolumeIdxEntries(server string, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
+func GetVolumeIdxEntries(server string, grpcDialOption grpc.DialOption, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
- return WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
+ return WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{
VolumdId: vid,
})