aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-08-01 12:51:41 +0500
committerKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-08-01 12:51:41 +0500
commita98f6d66a38b8dd7231191361a4333b46182a407 (patch)
treea3253fd78c9c46a6c929ffa92bdef41a4bf68e8a
parentc88ea31f62315bca22cb0457dca92376b032f8b2 (diff)
downloadseaweedfs-a98f6d66a38b8dd7231191361a4333b46182a407.tar.xz
seaweedfs-a98f6d66a38b8dd7231191361a4333b46182a407.zip
roll back the OnPeerUpdate implementation of automatic clean-up of failed servers in favor of synchronous ping
-rw-r--r--weed/filer/meta_aggregator.go2
-rw-r--r--weed/server/master_server.go110
2 files changed, 13 insertions, 99 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 5799e247e..c78dcac95 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -102,7 +102,7 @@ func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddres
if err != nil {
errLvl := glog.Level(0)
if strings.Contains(err.Error(), "duplicated local subscription detected") {
- errLvl = glog.Level(1)
+ errLvl = glog.Level(4)
}
glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err)
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index d2e98c2a2..1c623388c 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,7 +1,6 @@
package weed_server
import (
- "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/stats"
"net/http"
@@ -32,10 +31,8 @@ import (
)
const (
- SequencerType = "master.sequencer.type"
- SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
- RaftServerRemovalTime = 72 * time.Minute
- ResetRaftServerRemovalTimeMsg = "ResetRaftServerRemovalTime"
+ SequencerType = "master.sequencer.type"
+ SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
)
type MasterOption struct {
@@ -66,9 +63,6 @@ type MasterServer struct {
boundedLeaderChan chan int
- onPeerUpdateDoneCns map[string]*chan string
- onPeerUpdateLock sync.RWMutex
-
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.KeepConnectedResponse
@@ -121,7 +115,6 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
ms.boundedLeaderChan = make(chan int, 16)
ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
- ms.onPeerUpdateDoneCns = make(map[string]*chan string)
seq := ms.createSequencer(option)
if nil == seq {
@@ -352,97 +345,18 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
peerAddress := pb.ServerAddress(update.Address)
peerName := string(peerAddress)
isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader
- if update.IsAdd {
- if isLeader {
- raftServerFound := false
- for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
- if string(server.ID) == peerName {
- raftServerFound = true
- }
- }
- if !raftServerFound {
- glog.V(0).Infof("adding new raft server: %s", peerName)
- ms.Topo.HashicorpRaft.AddVoter(
- hashicorpRaft.ServerID(peerName),
- hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
- }
- }
- ms.onPeerUpdateLock.RLock()
- isGtZero := len(ms.onPeerUpdateDoneCns) > 0
- ms.onPeerUpdateLock.RUnlock()
- if isGtZero {
- var chanPtrs []*chan string
- ms.onPeerUpdateLock.RLock()
- for _, cn := range ms.onPeerUpdateDoneCns {
- chanPtrs = append(chanPtrs, cn)
- }
- ms.onPeerUpdateLock.RUnlock()
- for _, onPeerUpdateDoneCn := range chanPtrs {
- *onPeerUpdateDoneCn <- peerName
+ if update.IsAdd && isLeader {
+ raftServerFound := false
+ for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
+ if string(server.ID) == peerName {
+ raftServerFound = true
}
}
- } else if isLeader {
- if onPeerUpdateDoneCnPrev, ok := ms.onPeerUpdateDoneCns[peerName]; ok {
- *onPeerUpdateDoneCnPrev <- ResetRaftServerRemovalTimeMsg
- return
+ if !raftServerFound {
+ glog.V(0).Infof("adding new raft server: %s", peerName)
+ ms.Topo.HashicorpRaft.AddVoter(
+ hashicorpRaft.ServerID(peerName),
+ hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
}
- onPeerUpdateDoneCn := make(chan string)
- ms.onPeerUpdateLock.Lock()
- ms.onPeerUpdateDoneCns[peerName] = &onPeerUpdateDoneCn
- ms.onPeerUpdateLock.Unlock()
-
- go func(peerName string) {
- raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime)
- raftServerPingTicker := time.NewTicker(5 * time.Minute)
- defer func() {
- ms.onPeerUpdateLock.Lock()
- delete(ms.onPeerUpdateDoneCns, peerName)
- ms.onPeerUpdateLock.Unlock()
- close(onPeerUpdateDoneCn)
- }()
- for {
- select {
- case <-raftServerPingTicker.C:
- err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
- _, err := client.Ping(context.Background(), &master_pb.PingRequest{
- Target: peerName,
- TargetType: cluster.MasterType,
- })
- return err
- })
- if err != nil {
- glog.Warningf("raft server %s ping failed %+v", peerName, err)
- } else {
- glog.V(0).Infof("raft server %s remove canceled on ping success", peerName)
- return
- }
- case <-raftServerRemovalTimeAfter:
- err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
- _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
- Id: peerName,
- Force: false,
- })
- return err
- })
- if err != nil {
- glog.Warningf("failed to removing old raft server %s: %v", peerName, err)
- return
- }
- glog.V(0).Infof("old raft server %s removed", peerName)
- return
- case peerDone := <-onPeerUpdateDoneCn:
- if peerName == peerDone {
- glog.V(0).Infof("raft server %s remove canceled on onPeerUpdate", peerName)
- return
- }
- if peerDone == ResetRaftServerRemovalTimeMsg {
- raftServerRemovalTimeAfter = time.After(RaftServerRemovalTime)
- glog.V(0).Infof("rest wait %v for raft server %s activity, otherwise delete",
- RaftServerRemovalTime, peerName)
- }
- }
- }
- }(peerName)
- glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName)
}
}