aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-08-30 12:05:38 +0500
committerGitHub <noreply@github.com>2022-08-30 00:05:38 -0700
commite16dda88e48d37ac46a4858450ca2c2563910bdc (patch)
tree9263a935b70b34dcd0e5bf3836ed10e80fccc472
parent105702ebe0e1cf7b36ddb075d4cde22b4b3f1bbe (diff)
downloadseaweedfs-e16dda88e48d37ac46a4858450ca2c2563910bdc.tar.xz
seaweedfs-e16dda88e48d37ac46a4858450ca2c2563910bdc.zip
avoid race conditions access to MasterClient.currentMaster (#3538)
https://github.com/seaweedfs/seaweedfs/issues/3510
-rw-r--r--weed/wdclient/masterclient.go35
1 files changed, 23 insertions, 12 deletions
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index d330e9ec6..08bee0f73 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -27,10 +27,10 @@ type MasterClient struct {
grpcDialOption grpc.DialOption
vidMap
- vidMapCacheSize int
-
+ vidMapCacheSize int
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex
+ accessLock sync.RWMutex
}
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters map[string]pb.ServerAddress) *MasterClient {
@@ -61,7 +61,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
if err == nil && len(fullUrls) > 0 {
return
}
- err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ err = pb.WithMasterClient(false, mc.getCurrentMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId},
})
@@ -91,9 +91,21 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri
return
}
+func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
+ mc.accessLock.RLock()
+ defer mc.accessLock.RUnlock()
+ return mc.currentMaster
+}
+
+func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
+ mc.accessLock.Lock()
+ mc.currentMaster = master
+ mc.accessLock.Unlock()
+}
+
func (mc *MasterClient) GetMaster() pb.ServerAddress {
mc.WaitUntilConnected()
- return mc.currentMaster
+ return mc.getCurrentMaster()
}
func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress {
@@ -103,7 +115,7 @@ func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress {
func (mc *MasterClient) WaitUntilConnected() {
for {
- if mc.currentMaster != "" {
+ if mc.getCurrentMaster() != "" {
return
}
time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
@@ -151,8 +163,7 @@ func (mc *MasterClient) tryAllMasters() {
for nextHintedLeader != "" {
nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader)
}
-
- mc.currentMaster = ""
+ mc.setCurrentMaster("")
}
}
@@ -204,7 +215,7 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
} else {
mc.resetVidMap()
}
- mc.currentMaster = master
+ mc.setCurrentMaster(master)
for {
resp, err := stream.Recv()
@@ -216,8 +227,8 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
if resp.VolumeLocation != nil {
// maybe the leader is changed
- if resp.VolumeLocation.Leader != "" && string(mc.currentMaster) != resp.VolumeLocation.Leader {
- glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.currentMaster, resp.VolumeLocation.Leader)
+ if resp.VolumeLocation.Leader != "" && string(mc.getCurrentMaster()) != resp.VolumeLocation.Leader {
+ glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.getCurrentMaster(), resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
stats.MasterClientConnectCounter.WithLabelValues(stats.RedirectedToLeader).Inc()
return nil
@@ -279,10 +290,10 @@ func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb.SeaweedClient) error) error {
return util.Retry("master grpc", func() error {
- for mc.currentMaster == "" {
+ for mc.getCurrentMaster() == "" {
time.Sleep(3 * time.Second)
}
- return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ return pb.WithMasterClient(streamingMode, mc.getCurrentMaster(), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return fn(client)
})
})