aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/backup.go3
-rw-r--r--weed/command/benchmark.go8
-rw-r--r--weed/command/download.go3
-rw-r--r--weed/command/filer_copy.go2
-rw-r--r--weed/command/master.go3
-rw-r--r--weed/command/master_follower.go2
-rw-r--r--weed/command/upload.go4
-rw-r--r--weed/filer/filer.go12
-rw-r--r--weed/mq/broker/broker_server.go5
-rw-r--r--weed/operation/assign_file_id.go6
-rw-r--r--weed/operation/assign_file_id_test.go7
-rw-r--r--weed/operation/chunked_file.go3
-rw-r--r--weed/operation/lookup.go2
-rw-r--r--weed/operation/submit.go3
-rw-r--r--weed/server/filer_server.go4
-rw-r--r--weed/server/master_server.go4
-rw-r--r--weed/server/master_server_handlers_admin.go4
-rw-r--r--weed/server/volume_grpc_admin.go6
-rw-r--r--weed/server/volume_grpc_client_to_master.go2
-rw-r--r--weed/server/volume_grpc_copy.go6
-rw-r--r--weed/server/volume_server_handlers_read.go3
-rw-r--r--weed/shell/command_cluster_check.go2
-rw-r--r--weed/shell/shell_liner.go5
-rw-r--r--weed/wdclient/masterclient.go67
24 files changed, 102 insertions, 64 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index bbb6c6724..a8be4838e 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -1,6 +1,7 @@
package command
import (
+ "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -74,7 +75,7 @@ func runBackup(cmd *Command, args []string) bool {
vid := needle.VolumeId(*s.volumeId)
// find volume location, replication, ttl info
- lookup, err := operation.LookupVolumeId(func() pb.ServerAddress { return pb.ServerAddress(*s.master) }, grpcDialOption, vid.String())
+ lookup, err := operation.LookupVolumeId(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*s.master) }, grpcDialOption, vid.String())
if err != nil {
fmt.Printf("Error looking up volume %d: %v\n", vid, err)
return true
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index 7f9a23cf8..2a0db47c2 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -2,6 +2,7 @@ package command
import (
"bufio"
+ "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io"
@@ -128,8 +129,9 @@ func runBenchmark(cmd *Command, args []string) bool {
}
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", *pb.ServerAddresses(*b.masters).ToServiceDiscovery())
- go b.masterClient.KeepConnectedToMaster()
- b.masterClient.WaitUntilConnected()
+ ctx := context.Background()
+ go b.masterClient.KeepConnectedToMaster(ctx)
+ b.masterClient.WaitUntilConnected(ctx)
if *b.write {
benchWrite()
@@ -210,7 +212,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
}
var jwtAuthorization security.EncodedJwt
if isSecure {
- jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(), b.grpcDialOption, df.fp.Fid)
+ jwtAuthorization = operation.LookupJwt(b.masterClient.GetMaster(context.Background()), b.grpcDialOption, df.fp.Fid)
}
if e := util.Delete(fmt.Sprintf("http://%s/%s", df.fp.Server, df.fp.Fid), string(jwtAuthorization)); e == nil {
s.completed++
diff --git a/weed/command/download.go b/weed/command/download.go
index de33643fc..060be9f14 100644
--- a/weed/command/download.go
+++ b/weed/command/download.go
@@ -1,6 +1,7 @@
package command
import (
+ "context"
"fmt"
"io"
"net/http"
@@ -50,7 +51,7 @@ func runDownload(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
for _, fid := range args {
- if e := downloadToFile(func() pb.ServerAddress { return pb.ServerAddress(*d.server) }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil {
+ if e := downloadToFile(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*d.server) }, grpcDialOption, fid, util.ResolvePath(*d.dir)); e != nil {
fmt.Println("Download Error: ", fid, e)
}
}
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 83c5b167e..df5e002c5 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -472,7 +472,7 @@ func (worker *FileCopyWorker) uploadFileInChunks(task FileCopyTask, f *os.File,
for _, chunk := range chunks {
fileIds = append(fileIds, chunk.FileId)
}
- operation.DeleteFiles(func() pb.ServerAddress {
+ operation.DeleteFiles(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress(copy.masters[0])
}, false, worker.options.grpcDialOption, fileIds)
return uploadError
diff --git a/weed/command/master.go b/weed/command/master.go
index 6a32b8abe..f80d8faeb 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -1,6 +1,7 @@
package command
import (
+ "context"
"fmt"
"net/http"
"os"
@@ -218,7 +219,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) {
}()
}
- go ms.MasterClient.KeepConnectedToMaster()
+ go ms.MasterClient.KeepConnectedToMaster(context.Background())
// start http server
var (
diff --git a/weed/command/master_follower.go b/weed/command/master_follower.go
index 64583c602..7217aff0b 100644
--- a/weed/command/master_follower.go
+++ b/weed/command/master_follower.go
@@ -140,7 +140,7 @@ func startMasterFollower(masterOptions MasterOptions) {
}
go grpcS.Serve(grpcL)
- go ms.MasterClient.KeepConnectedToMaster()
+ go ms.MasterClient.KeepConnectedToMaster(context.Background())
// start http server
httpS := &http.Server{Handler: r}
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 1f03f7b5a..3e6b8f9a2 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -97,7 +97,7 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
- results, e := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
+ results, e := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@@ -119,7 +119,7 @@ func runUpload(cmd *Command, args []string) bool {
fmt.Println(e.Error())
return false
}
- results, err := operation.SubmitFiles(func() pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
+ results, err := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
if err != nil {
fmt.Println(err.Error())
return false
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 9be8d5259..016bfc8fa 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -143,8 +143,8 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*maste
}
-func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) {
- return cluster.ListExistingPeerUpdates(f.GetMaster(), f.GrpcDialOption, f.MasterClient.FilerGroup, cluster.FilerType)
+func (f *Filer) ListExistingPeerUpdates(ctx context.Context) (existingNodes []*master_pb.ClusterNodeUpdate) {
+ return cluster.ListExistingPeerUpdates(f.GetMaster(ctx), f.GrpcDialOption, f.MasterClient.FilerGroup, cluster.FilerType)
}
func (f *Filer) SetStore(store FilerStore) (isFresh bool) {
@@ -177,12 +177,12 @@ func (f *Filer) GetStore() (store FilerStore) {
return f.Store
}
-func (fs *Filer) GetMaster() pb.ServerAddress {
- return fs.MasterClient.GetMaster()
+func (fs *Filer) GetMaster(ctx context.Context) pb.ServerAddress {
+ return fs.MasterClient.GetMaster(ctx)
}
-func (fs *Filer) KeepMasterClientConnected() {
- fs.MasterClient.KeepConnectedToMaster()
+func (fs *Filer) KeepMasterClientConnected(ctx context.Context) {
+ fs.MasterClient.KeepConnectedToMaster(ctx)
}
func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go
index 9c321744b..9e62fe996 100644
--- a/weed/mq/broker/broker_server.go
+++ b/weed/mq/broker/broker_server.go
@@ -1,6 +1,7 @@
package broker
import (
+ "context"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/sub_coordinator"
@@ -68,9 +69,9 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
pub_broker_balancer.OnAddBroker = mqBroker.Coordinator.OnSubAddBroker
pub_broker_balancer.OnRemoveBroker = mqBroker.Coordinator.OnSubRemoveBroker
- go mqBroker.MasterClient.KeepConnectedToMaster()
+ go mqBroker.MasterClient.KeepConnectedToMaster(context.Background())
- existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(), grpcDialOption, option.FilerGroup, cluster.FilerType)
+ existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(context.Background()), grpcDialOption, option.FilerGroup, cluster.FilerType)
for _, newNode := range existingNodes {
mqBroker.OnBrokerUpdate(newNode, time.Now())
}
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index 1b7a0146d..cc8e87b21 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -47,9 +47,9 @@ func NewAssignProxy(masterFn GetMasterFn, grpcDialOption grpc.DialOption, concur
ap = &AssignProxy{
pool: make(chan *singleThreadAssignProxy, concurrency),
}
- ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn().ToGrpcAddress(), true, grpcDialOption)
+ ap.grpcConnection, err = pb.GrpcDial(context.Background(), masterFn(context.Background()).ToGrpcAddress(), true, grpcDialOption)
if err != nil {
- return nil, fmt.Errorf("fail to dial %s: %v", masterFn().ToGrpcAddress(), err)
+ return nil, fmt.Errorf("fail to dial %s: %v", masterFn(context.Background()).ToGrpcAddress(), err)
}
for i := 0; i < concurrency; i++ {
ap.pool <- &singleThreadAssignProxy{}
@@ -153,7 +153,7 @@ func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest
continue
}
- lastError = WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ lastError = WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: request.Count,
Replication: request.Replication,
diff --git a/weed/operation/assign_file_id_test.go b/weed/operation/assign_file_id_test.go
index f6362dceb..ac0f4eee6 100644
--- a/weed/operation/assign_file_id_test.go
+++ b/weed/operation/assign_file_id_test.go
@@ -1,6 +1,7 @@
package operation
import (
+ "context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc"
@@ -11,7 +12,7 @@ import (
func BenchmarkWithConcurrency(b *testing.B) {
concurrencyLevels := []int{1, 10, 100, 1000}
- ap, _ := NewAssignProxy(func() pb.ServerAddress {
+ ap, _ := NewAssignProxy(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), 16)
@@ -47,7 +48,7 @@ func BenchmarkWithConcurrency(b *testing.B) {
}
func BenchmarkStreamAssign(b *testing.B) {
- ap, _ := NewAssignProxy(func() pb.ServerAddress {
+ ap, _ := NewAssignProxy(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), 16)
for i := 0; i < b.N; i++ {
@@ -59,7 +60,7 @@ func BenchmarkStreamAssign(b *testing.B) {
func BenchmarkUnaryAssign(b *testing.B) {
for i := 0; i < b.N; i++ {
- Assign(func() pb.ServerAddress {
+ Assign(func(_ context.Context) pb.ServerAddress {
return pb.ServerAddress("localhost:9333")
}, grpc.WithInsecure(), &VolumeAssignRequest{
Count: 1,
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index eacf64112..c451420fe 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -1,6 +1,7 @@
package operation
import (
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -173,7 +174,7 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
ci := cf.chunkList[chunkIndex]
// if we need read date from local volume server first?
- fileUrl, jwt, lookupError := LookupFileId(func() pb.ServerAddress {
+ fileUrl, jwt, lookupError := LookupFileId(func(_ context.Context) pb.ServerAddress {
return cf.master
}, cf.grpcDialOption, ci.Fid)
if lookupError != nil {
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index fc4609a2d..6c89c17b1 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -80,7 +80,7 @@ func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids
//only query unknown_vids
- err := WithMasterServerClient(false, masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ err := WithMasterServerClient(false, masterFn(context.Background()), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupVolumeRequest{
VolumeOrFileIds: unknown_vids,
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 3eb38c31e..57bd81b14 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -1,6 +1,7 @@
package operation
import (
+ "context"
"github.com/seaweedfs/seaweedfs/weed/pb"
"io"
"mime"
@@ -40,7 +41,7 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-type GetMasterFn func() pb.ServerAddress
+type GetMasterFn func(ctx context.Context) pb.ServerAddress
func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 795cd3ccc..0b7254c0d 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -160,7 +160,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs.checkWithMaster()
go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
- go fs.filer.KeepMasterClientConnected()
+ go fs.filer.KeepMasterClientConnected(context.Background())
if !util.LoadConfiguration("filer", false) {
v.SetDefault("leveldb2.enabled", true)
@@ -196,7 +196,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
- existingNodes := fs.filer.ListExistingPeerUpdates()
+ existingNodes := fs.filer.ListExistingPeerUpdates(context.Background())
startFromTime := time.Now().Add(-filer.LogFlushInterval)
if option.JoinExistingFiler {
startFromTime = time.Time{}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 580cdfed2..3499a2e13 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -292,12 +292,12 @@ func (ms *MasterServer) startAdminScripts() {
reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
- go commandEnv.MasterClient.KeepConnectedToMaster()
+ go commandEnv.MasterClient.KeepConnectedToMaster(context.Background())
go func() {
for {
time.Sleep(time.Duration(sleepMinutes) * time.Minute)
- if ms.Topo.IsLeader() && ms.MasterClient.GetMaster() != "" {
+ if ms.Topo.IsLeader() && ms.MasterClient.GetMaster(context.Background()) != "" {
shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroupName(*shellOptions.FilerGroup))
if shellOptions.FilerAddress == "" {
continue
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 0d7131340..f40b819af 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -124,13 +124,13 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
- submitForClientHandler(w, r, func() pb.ServerAddress { return ms.option.Master }, ms.grpcDialOption)
+ submitForClientHandler(w, r, func(ctx context.Context) pb.ServerAddress { return ms.option.Master }, ms.grpcDialOption)
} else {
masterUrl, err := ms.Topo.Leader()
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
- submitForClientHandler(w, r, func() pb.ServerAddress { return masterUrl }, ms.grpcDialOption)
+ submitForClientHandler(w, r, func(ctx context.Context) pb.ServerAddress { return masterUrl }, ms.grpcDialOption)
}
}
}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index abd39b582..00a285406 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -181,7 +181,7 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
}
func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly bool) error {
- if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ if grpcErr := pb.WithMasterClient(false, vs.GetMaster(context.Background()), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
_, err := client.VolumeMarkReadonly(context.Background(), &master_pb.VolumeMarkReadonlyRequest{
Ip: vs.store.Ip,
Port: uint32(vs.store.Port),
@@ -197,8 +197,8 @@ func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly
}
return nil
}); grpcErr != nil {
- glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
- return fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(), grpcErr)
+ glog.V(0).Infof("connect to %s: %v", vs.GetMaster(context.Background()), grpcErr)
+ return fmt.Errorf("grpc VolumeMarkReadonly with master %s: %v", vs.GetMaster(context.Background()), grpcErr)
}
return nil
}
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index d2aa61a17..81bb87613 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -21,7 +21,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
-func (vs *VolumeServer) GetMaster() pb.ServerAddress {
+func (vs *VolumeServer) GetMaster(ctx context.Context) pb.ServerAddress {
return vs.currentMaster
}
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 51b61b225..6548b7c56 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -84,17 +84,17 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
}()
var preallocateSize int64
- if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ if grpcErr := pb.WithMasterClient(false, vs.GetMaster(context.Background()), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
- return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(), err)
+ return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(context.Background()), err)
}
if resp.VolumePreallocate {
preallocateSize = int64(resp.VolumeSizeLimitMB) * (1 << 20)
}
return nil
}); grpcErr != nil {
- glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
+ glog.V(0).Infof("connect to %s: %v", vs.GetMaster(context.Background()), grpcErr)
}
if preallocateSize > 0 && !hasRemoteDatFile {
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 08e536811..cc364513b 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -2,6 +2,7 @@ package weed_server
import (
"bytes"
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -291,7 +292,7 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string,
w.Header().Set("X-File-Store", "chunked")
- chunkedFileReader := operation.NewChunkedFileReader(chunkManifest.Chunks, vs.GetMaster(), vs.grpcDialOption)
+ chunkedFileReader := operation.NewChunkedFileReader(chunkManifest.Chunks, vs.GetMaster(context.Background()), vs.grpcDialOption)
defer chunkedFileReader.Close()
rs := conditionallyCropImages(chunkedFileReader, ext, r)
diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go
index 03acca5b2..5c9866d29 100644
--- a/weed/shell/command_cluster_check.go
+++ b/weed/shell/command_cluster_check.go
@@ -103,7 +103,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i
// collect all masters
var masters []pb.ServerAddress
- masters = append(masters, commandEnv.MasterClient.GetMasters()...)
+ masters = append(masters, commandEnv.MasterClient.GetMasters(context.Background())...)
// check from master to volume servers
for _, master := range masters {
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
index 20add302a..407dce006 100644
--- a/weed/shell/shell_liner.go
+++ b/weed/shell/shell_liner.go
@@ -46,8 +46,9 @@ func RunShell(options ShellOptions) {
commandEnv := NewCommandEnv(&options)
- go commandEnv.MasterClient.KeepConnectedToMaster()
- commandEnv.MasterClient.WaitUntilConnected()
+ ctx := context.Background()
+ go commandEnv.MasterClient.KeepConnectedToMaster(ctx)
+ commandEnv.MasterClient.WaitUntilConnected(ctx)
if commandEnv.option.FilerAddress == "" {
var filers []pb.ServerAddress
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 98442c1af..da46a440b 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -61,7 +61,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
if err == nil && len(fullUrls) > 0 {
return
}
- err = pb.WithMasterClient(false, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ err = pb.WithMasterClient(false, mc.GetMaster(context.Background()), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId},
})
@@ -103,31 +103,43 @@ func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
mc.currentMasterLock.Unlock()
}
-func (mc *MasterClient) GetMaster() pb.ServerAddress {
- mc.WaitUntilConnected()
+func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
+ mc.WaitUntilConnected(ctx)
return mc.getCurrentMaster()
}
-func (mc *MasterClient) GetMasters() []pb.ServerAddress {
- mc.WaitUntilConnected()
+func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
+ mc.WaitUntilConnected(ctx)
return mc.masters.GetInstances()
}
-func (mc *MasterClient) WaitUntilConnected() {
+func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
for {
- if mc.getCurrentMaster() != "" {
+ select {
+ case <-ctx.Done():
+ glog.V(0).Infof("Connection wait stopped: %v", ctx.Err())
return
+ default:
+ if mc.getCurrentMaster() != "" {
+ return
+ }
+ time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
+ print(".")
}
- time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
- print(".")
}
}
-func (mc *MasterClient) KeepConnectedToMaster() {
+func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
for {
- mc.tryAllMasters()
- time.Sleep(time.Second)
+ select {
+ case <-ctx.Done():
+ glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
+ return
+ default:
+ mc.tryAllMasters(ctx)
+ time.Sleep(time.Second)
+ }
}
}
@@ -157,23 +169,29 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres
return
}
-func (mc *MasterClient) tryAllMasters() {
+func (mc *MasterClient) tryAllMasters(ctx context.Context) {
var nextHintedLeader pb.ServerAddress
mc.masters.RefreshBySrvIfAvailable()
for _, master := range mc.masters.GetInstances() {
- nextHintedLeader = mc.tryConnectToMaster(master)
+ nextHintedLeader = mc.tryConnectToMaster(ctx, master)
for nextHintedLeader != "" {
- nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader)
+ select {
+ case <-ctx.Done():
+ glog.V(0).Infof("Connection attempt to all masters stopped: %v", ctx.Err())
+ return
+ default:
+ nextHintedLeader = mc.tryConnectToMaster(ctx, nextHintedLeader)
+ }
}
mc.setCurrentMaster("")
}
}
-func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
+func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master)
stats.MasterClientConnectCounter.WithLabelValues("total").Inc()
gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := client.KeepConnected(ctx)
@@ -229,8 +247,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
if resp.VolumeLocation != nil {
// maybe the leader is changed
- if resp.VolumeLocation.Leader != "" && string(mc.GetMaster()) != resp.VolumeLocation.Leader {
- glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(), resp.VolumeLocation.Leader)
+ if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader {
+ glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc()
return nil
@@ -254,6 +272,10 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
}
mc.OnPeerUpdateLock.RUnlock()
}
+ if err := ctx.Err(); err != nil {
+ glog.V(0).Infof("Connection attempt to master stopped: %v", err)
+ return err
+ }
}
})
if gprcErr != nil {
@@ -298,8 +320,13 @@ func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
}
func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
+ getMasterF := func() pb.ServerAddress { return mc.GetMaster(context.Background()) }
+ return mc.WithClientCustomGetMaster(getMasterF, streamingMode, fn)
+}
+
+func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAddress, streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
return util.Retry("master grpc", func() error {
- return pb.WithMasterClient(streamingMode, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, getMasterF(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return fn(client)
})
})