aboutsummaryrefslogtreecommitdiff
path: root/weed/wdclient/masterclient.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/wdclient/masterclient.go')
-rw-r--r--weed/wdclient/masterclient.go67
1 files changed, 47 insertions, 20 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 98442c1af..da46a440b 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -61,7 +61,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
if err == nil && len(fullUrls) > 0 {
return
}
- err = pb.WithMasterClient(false, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ err = pb.WithMasterClient(false, mc.GetMaster(context.Background()), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId},
})
@@ -103,31 +103,43 @@ func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
mc.currentMasterLock.Unlock()
}
-func (mc *MasterClient) GetMaster() pb.ServerAddress {
- mc.WaitUntilConnected()
+func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
+ mc.WaitUntilConnected(ctx)
return mc.getCurrentMaster()
}
-func (mc *MasterClient) GetMasters() []pb.ServerAddress {
- mc.WaitUntilConnected()
+func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
+ mc.WaitUntilConnected(ctx)
return mc.masters.GetInstances()
}
-func (mc *MasterClient) WaitUntilConnected() {
+func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
for {
- if mc.getCurrentMaster() != "" {
+ select {
+ case <-ctx.Done():
+ glog.V(0).Infof("Connection wait stopped: %v", ctx.Err())
return
+ default:
+ if mc.getCurrentMaster() != "" {
+ return
+ }
+ time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
+ print(".")
}
- time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
- print(".")
}
}
-func (mc *MasterClient) KeepConnectedToMaster() {
+func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
for {
- mc.tryAllMasters()
- time.Sleep(time.Second)
+ select {
+ case <-ctx.Done():
+ glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
+ return
+ default:
+ mc.tryAllMasters(ctx)
+ time.Sleep(time.Second)
+ }
}
}
@@ -157,23 +169,29 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres
return
}
-func (mc *MasterClient) tryAllMasters() {
+func (mc *MasterClient) tryAllMasters(ctx context.Context) {
var nextHintedLeader pb.ServerAddress
mc.masters.RefreshBySrvIfAvailable()
for _, master := range mc.masters.GetInstances() {
- nextHintedLeader = mc.tryConnectToMaster(master)
+ nextHintedLeader = mc.tryConnectToMaster(ctx, master)
for nextHintedLeader != "" {
- nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader)
+ select {
+ case <-ctx.Done():
+ glog.V(0).Infof("Connection attempt to all masters stopped: %v", ctx.Err())
+ return
+ default:
+ nextHintedLeader = mc.tryConnectToMaster(ctx, nextHintedLeader)
+ }
}
mc.setCurrentMaster("")
}
}
-func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
+func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) {
glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master)
stats.MasterClientConnectCounter.WithLabelValues("total").Inc()
gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
- ctx, cancel := context.WithCancel(context.Background())
+ ctx, cancel := context.WithCancel(ctx)
defer cancel()
stream, err := client.KeepConnected(ctx)
@@ -229,8 +247,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
if resp.VolumeLocation != nil {
// maybe the leader is changed
- if resp.VolumeLocation.Leader != "" && string(mc.GetMaster()) != resp.VolumeLocation.Leader {
- glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(), resp.VolumeLocation.Leader)
+ if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader {
+ glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc()
return nil
@@ -254,6 +272,10 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
}
mc.OnPeerUpdateLock.RUnlock()
}
+ if err := ctx.Err(); err != nil {
+ glog.V(0).Infof("Connection attempt to master stopped: %v", err)
+ return err
+ }
}
})
if gprcErr != nil {
@@ -298,8 +320,13 @@ func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
}
func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
+ getMasterF := func() pb.ServerAddress { return mc.GetMaster(context.Background()) }
+ return mc.WithClientCustomGetMaster(getMasterF, streamingMode, fn)
+}
+
+func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAddress, streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
return util.Retry("master grpc", func() error {
- return pb.WithMasterClient(streamingMode, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, getMasterF(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return fn(client)
})
})