diff options
| author | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-04-06 21:17:04 +0500 |
|---|---|---|
| committer | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2022-04-06 21:17:04 +0500 |
| commit | 14a2cc83bf35c2c30bb2306cca128ca8a1eb8286 (patch) | |
| tree | 171c09b2705b3899a3f7dbdf557524f3bc5d0985 /weed/server/master_server.go | |
| parent | 357aa818fe6b8165c2af7bbccbbb7ffa1cd22f3c (diff) | |
| download | seaweedfs-14a2cc83bf35c2c30bb2306cca128ca8a1eb8286.tar.xz seaweedfs-14a2cc83bf35c2c30bb2306cca128ca8a1eb8286.zip | |
raft update peers via OnPeerUpdate
Diffstat (limited to 'weed/server/master_server.go')
| -rw-r--r-- | weed/server/master_server.go | 46 |
1 files changed, 44 insertions, 2 deletions
diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 37e7f245c..2c626b511 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,6 +1,7 @@ package weed_server import ( + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/stats" "net/http" @@ -17,6 +18,7 @@ import ( "github.com/chrislusf/raft" "github.com/gorilla/mux" + hashicorpRaft "github.com/hashicorp/raft" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" @@ -30,8 +32,9 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + SequencerType = "master.sequencer.type" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + RaftServerRemovalTime = 72 * time.Hour ) type MasterOption struct { @@ -61,6 +64,7 @@ type MasterServer struct { vgCh chan *topology.VolumeGrowRequest boundedLeaderChan chan int + onPeerUpdatDoneCn chan string // notifying clients clientChansLock sync.RWMutex @@ -112,6 +116,8 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se Cluster: cluster.NewCluster(), } ms.boundedLeaderChan = make(chan int, 16) + ms.onPeerUpdatDoneCn = make(chan string) + ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate seq := ms.createSequencer(option) if nil == seq { @@ -323,3 +329,39 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer } return seq } + +func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { + glog.V(0).Infof("OnPeerUpdate: %+v", update) + if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil { + return + } + peerAddress := pb.ServerAddress(update.Address) + if update.IsAdd { + if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader { + glog.V(0).Infof("adding new raft server: %s", peerAddress.String()) + ms.Topo.HashicorpRaft.AddVoter( + hashicorpRaft.ServerID(peerAddress.String()), + hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) + } + ms.onPeerUpdatDoneCn <- string(peerAddress) + } else if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader { + go func(peerName string) { + for { + select { + case peerDone := <-ms.onPeerUpdatDoneCn: + if peerName == peerDone { + return + } + case <-time.After(RaftServerRemovalTime): + glog.V(0).Infof("removing old raft server: %s", peerName) + if _, err := ms.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ + Id: peerName, + }); err != nil { + glog.Warningf("failed removing old raft server: %v", err) + } + return + } + } + }(string(peerAddress)) + } +} |
