diff options
Diffstat (limited to 'weed/wdclient/masterclient.go')
| -rw-r--r-- | weed/wdclient/masterclient.go | 67 |
1 files changed, 47 insertions, 20 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 98442c1af..da46a440b 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -61,7 +61,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri if err == nil && len(fullUrls) > 0 { return } - err = pb.WithMasterClient(false, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + err = pb.WithMasterClient(false, mc.GetMaster(context.Background()), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ VolumeOrFileIds: []string{fileId}, }) @@ -103,31 +103,43 @@ func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) { mc.currentMasterLock.Unlock() } -func (mc *MasterClient) GetMaster() pb.ServerAddress { - mc.WaitUntilConnected() +func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress { + mc.WaitUntilConnected(ctx) return mc.getCurrentMaster() } -func (mc *MasterClient) GetMasters() []pb.ServerAddress { - mc.WaitUntilConnected() +func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress { + mc.WaitUntilConnected(ctx) return mc.masters.GetInstances() } -func (mc *MasterClient) WaitUntilConnected() { +func (mc *MasterClient) WaitUntilConnected(ctx context.Context) { for { - if mc.getCurrentMaster() != "" { + select { + case <-ctx.Done(): + glog.V(0).Infof("Connection wait stopped: %v", ctx.Err()) return + default: + if mc.getCurrentMaster() != "" { + return + } + time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond) + print(".") } - time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond) - print(".") } } -func (mc *MasterClient) KeepConnectedToMaster() { +func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) { glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters) for { - mc.tryAllMasters() - time.Sleep(time.Second) + select { + case <-ctx.Done(): + glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err()) + return + default: + mc.tryAllMasters(ctx) + time.Sleep(time.Second) + } } } @@ -157,23 +169,29 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres return } -func (mc *MasterClient) tryAllMasters() { +func (mc *MasterClient) tryAllMasters(ctx context.Context) { var nextHintedLeader pb.ServerAddress mc.masters.RefreshBySrvIfAvailable() for _, master := range mc.masters.GetInstances() { - nextHintedLeader = mc.tryConnectToMaster(master) + nextHintedLeader = mc.tryConnectToMaster(ctx, master) for nextHintedLeader != "" { - nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader) + select { + case <-ctx.Done(): + glog.V(0).Infof("Connection attempt to all masters stopped: %v", ctx.Err()) + return + default: + nextHintedLeader = mc.tryConnectToMaster(ctx, nextHintedLeader) + } } mc.setCurrentMaster("") } } -func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) { +func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) { glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master) stats.MasterClientConnectCounter.WithLabelValues("total").Inc() gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() stream, err := client.KeepConnected(ctx) @@ -229,8 +247,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL if resp.VolumeLocation != nil { // maybe the leader is changed - if resp.VolumeLocation.Leader != "" && string(mc.GetMaster()) != resp.VolumeLocation.Leader { - glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(), resp.VolumeLocation.Leader) + if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader { + glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader) nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc() return nil @@ -254,6 +272,10 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL } mc.OnPeerUpdateLock.RUnlock() } + if err := ctx.Err(); err != nil { + glog.V(0).Infof("Connection attempt to master stopped: %v", err) + return err + } } }) if gprcErr != nil { @@ -298,8 +320,13 @@ func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) { } func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { + getMasterF := func() pb.ServerAddress { return mc.GetMaster(context.Background()) } + return mc.WithClientCustomGetMaster(getMasterF, streamingMode, fn) +} + +func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAddress, streamingMode bool, fn func(client master_pb.SeaweedClient) error) error { return util.Retry("master grpc", func() error { - return pb.WithMasterClient(streamingMode, mc.GetMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, getMasterF(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { return fn(client) }) }) |
