diff options
| author | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-04-07 15:31:37 +0500 |
|---|---|---|
| committer | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-04-07 15:31:37 +0500 |
| commit | 85d80fd36d1b6a70922c0d39dc3ebc6b1197ac7c (patch) | |
| tree | 60c3ca3391f9eeea0e91d3c8e296b07b8cb9d9a7 /weed/server | |
| parent | 14a2cc83bf35c2c30bb2306cca128ca8a1eb8286 (diff) | |
| download | seaweedfs-85d80fd36d1b6a70922c0d39dc3ebc6b1197ac7c.tar.xz seaweedfs-85d80fd36d1b6a70922c0d39dc3ebc6b1197ac7c.zip | |
fix removing old raft server
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/master_grpc_server_raft.go | 18 | ||||
| -rw-r--r-- | weed/server/master_server.go | 49 |
2 files changed, 52 insertions, 15 deletions
diff --git a/weed/server/master_grpc_server_raft.go b/weed/server/master_grpc_server_raft.go index 71ab6a1da..37491b3df 100644 --- a/weed/server/master_grpc_server_raft.go +++ b/weed/server/master_grpc_server_raft.go @@ -2,6 +2,8 @@ package weed_server import ( "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/hashicorp/raft" ) @@ -23,6 +25,9 @@ func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_ func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) { resp := &master_pb.RaftAddServerResponse{} + if ms.Topo.HashicorpRaft.State() != raft.Leader { + return nil, fmt.Errorf("raft add server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String()) + } var idxFuture raft.IndexFuture if req.Voter { @@ -40,6 +45,19 @@ func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAd func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) { resp := &master_pb.RaftRemoveServerResponse{} + if ms.Topo.HashicorpRaft.State() != raft.Leader { + return nil, fmt.Errorf("raft remove server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String()) + } + + if !req.Force { + ms.clientChansLock.RLock() + _, ok := ms.clientChans[fmt.Sprintf("%s@%s", cluster.MasterType, req.Id)] + ms.clientChansLock.RUnlock() + if ok { + return resp, fmt.Errorf("raft remove server %s failed: client connection to master exists", req.Id) + } + } + idxFuture := ms.Topo.HashicorpRaft.RemoveServer(raft.ServerID(req.Id), 0, 0) if err := idxFuture.Error(); err != nil { return nil, err diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 2c626b511..0aba7b957 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -34,7 +34,7 @@ import ( const ( SequencerType = "master.sequencer.type" SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" - RaftServerRemovalTime = 72 * time.Hour + RaftServerRemovalTime = 72 * time.Minute ) type MasterOption struct { @@ -64,7 +64,9 @@ type MasterServer struct { vgCh chan *topology.VolumeGrowRequest boundedLeaderChan chan int - onPeerUpdatDoneCn chan string + + onPeerUpdatDoneCn chan string + onPeerUpdatDoneCnExist bool // notifying clients clientChansLock sync.RWMutex @@ -117,6 +119,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se } ms.boundedLeaderChan = make(chan int, 16) ms.onPeerUpdatDoneCn = make(chan string) + ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate seq := ms.createSequencer(option) @@ -336,32 +339,48 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { return } peerAddress := pb.ServerAddress(update.Address) + peerName := string(peerAddress) if update.IsAdd { if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader { - glog.V(0).Infof("adding new raft server: %s", peerAddress.String()) - ms.Topo.HashicorpRaft.AddVoter( - hashicorpRaft.ServerID(peerAddress.String()), - hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) + raftServerFound := false + for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerName { + raftServerFound = true + } + } + if !raftServerFound { + glog.V(0).Infof("adding new raft server: %s", peerAddress.String()) + ms.Topo.HashicorpRaft.AddVoter( + hashicorpRaft.ServerID(peerName), + hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) + } + } + if ms.onPeerUpdatDoneCnExist { + ms.onPeerUpdatDoneCn <- string(peerAddress) } - ms.onPeerUpdatDoneCn <- string(peerAddress) } else if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader { go func(peerName string) { for { select { - case peerDone := <-ms.onPeerUpdatDoneCn: - if peerName == peerDone { - return - } case <-time.After(RaftServerRemovalTime): - glog.V(0).Infof("removing old raft server: %s", peerName) - if _, err := ms.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ - Id: peerName, - }); err != nil { + err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ + Id: peerName, + Force: false, + }) + return err + }) + if err != nil { glog.Warningf("failed removing old raft server: %v", err) } return + case peerDone := <-ms.onPeerUpdatDoneCn: + if peerName == peerDone { + return + } } } }(string(peerAddress)) + ms.onPeerUpdatDoneCnExist = true } } |
