aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- weed/server/master_server.go 31
1 files changed, 26 insertions, 5 deletions
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 0fdc3944f..98fb6aab1 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -11,6 +11,7 @@ import (
"regexp"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/cluster"
@@ -65,8 +66,8 @@ type MasterServer struct {
boundedLeaderChan chan int
- onPeerUpdateDoneCn chan string
- onPeerUpdateDoneCnExist bool
+ onPeerUpdateDoneCn chan string
+ onPeerUpdateGoroutineCount int32
// notifying clients
clientChansLock sync.RWMutex
@@ -366,14 +367,33 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
}
}
- if ms.onPeerUpdateDoneCnExist {
+ if atomic.LoadInt32(&ms.onPeerUpdateGoroutineCount) > 0 {
ms.onPeerUpdateDoneCn <- peerName
}
} else if isLeader {
go func(peerName string) {
raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime)
+ raftServerPingTicker := time.NewTicker(5 * time.Minute)
+ atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, 1)
+ defer func() {
+ atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, -1)
+ }()
for {
select {
+ case <-raftServerPingTicker.C:
+ err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ _, err := client.Ping(context.Background(), &master_pb.PingRequest{
+ Target: peerName,
+ TargetType: cluster.MasterType,
+ })
+ return err
+ })
+ if err != nil {
+ glog.Warningf("raft server %s ping failed %+v", peerName, err)
+ } else {
+ glog.V(0).Infof("raft server %s remove canceled on ping success", peerName)
+ return
+ }
case <-raftServerRemovalTimeAfter:
err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
@@ -384,17 +404,18 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
})
if err != nil {
glog.Warningf("failed to removing old raft server %s: %v", peerName, err)
+ return
}
glog.V(0).Infof("old raft server %s removed", peerName)
return
case peerDone := <-ms.onPeerUpdateDoneCn:
if peerName == peerDone {
- glog.V(0).Infof("raft server %s remove canceled", peerName)
+ glog.V(0).Infof("raft server %s remove canceled on onPeerUpdate", peerName)
return
}
}
}
}(peerName)
- ms.onPeerUpdateDoneCnExist = true
+ glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName)
}
}