aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-04-06 21:17:04 +0500
committerKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2022-04-06 21:17:04 +0500
commit14a2cc83bf35c2c30bb2306cca128ca8a1eb8286 (patch)
tree171c09b2705b3899a3f7dbdf557524f3bc5d0985 /weed
parent357aa818fe6b8165c2af7bbccbbb7ffa1cd22f3c (diff)
downloadseaweedfs-14a2cc83bf35c2c30bb2306cca128ca8a1eb8286.tar.xz
seaweedfs-14a2cc83bf35c2c30bb2306cca128ca8a1eb8286.zip
raft update peers via OnPeerUpdate
Diffstat (limited to 'weed')
-rw-r--r--weed/cluster/cluster.go18
-rw-r--r--weed/server/master_server.go46
-rw-r--r--weed/server/raft_hashicorp.go9
3 files changed, 68 insertions, 5 deletions
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go
index e8752f4d9..3cff13724 100644
--- a/weed/cluster/cluster.go
+++ b/weed/cluster/cluster.go
@@ -81,6 +81,15 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress
},
}
case MasterType:
+ return []*master_pb.KeepConnectedResponse{
+ {
+ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
+ NodeType: nodeType,
+ Address: string(address),
+ IsAdd: true,
+ },
+ },
+ }
}
return nil
}
@@ -120,6 +129,15 @@ func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddr
}
}
case MasterType:
+ return []*master_pb.KeepConnectedResponse{
+ {
+ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
+ NodeType: nodeType,
+ Address: string(address),
+ IsAdd: false,
+ },
+ },
+ }
}
return nil
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 37e7f245c..2c626b511 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/stats"
"net/http"
@@ -17,6 +18,7 @@ import (
"github.com/chrislusf/raft"
"github.com/gorilla/mux"
+ hashicorpRaft "github.com/hashicorp/raft"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -30,8 +32,9 @@ import (
)
const (
- SequencerType = "master.sequencer.type"
- SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
+ SequencerType = "master.sequencer.type"
+ SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
+ RaftServerRemovalTime = 72 * time.Hour
)
type MasterOption struct {
@@ -61,6 +64,7 @@ type MasterServer struct {
vgCh chan *topology.VolumeGrowRequest
boundedLeaderChan chan int
+ onPeerUpdatDoneCn chan string
// notifying clients
clientChansLock sync.RWMutex
@@ -112,6 +116,8 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
Cluster: cluster.NewCluster(),
}
ms.boundedLeaderChan = make(chan int, 16)
+ ms.onPeerUpdatDoneCn = make(chan string)
+ ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
seq := ms.createSequencer(option)
if nil == seq {
@@ -323,3 +329,39 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
}
return seq
}
+
+func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
+ glog.V(0).Infof("OnPeerUpdate: %+v", update)
+ if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
+ return
+ }
+ peerAddress := pb.ServerAddress(update.Address)
+ if update.IsAdd {
+ if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
+ glog.V(0).Infof("adding new raft server: %s", peerAddress.String())
+ ms.Topo.HashicorpRaft.AddVoter(
+ hashicorpRaft.ServerID(peerAddress.String()),
+ hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
+ }
+ ms.onPeerUpdatDoneCn <- string(peerAddress)
+ } else if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
+ go func(peerName string) {
+ for {
+ select {
+ case peerDone := <-ms.onPeerUpdatDoneCn:
+ if peerName == peerDone {
+ return
+ }
+ case <-time.After(RaftServerRemovalTime):
+ glog.V(0).Infof("removing old raft server: %s", peerName)
+ if _, err := ms.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
+ Id: peerName,
+ }); err != nil {
+ glog.Warningf("failed removing old raft server: %v", err)
+ }
+ return
+ }
+ }
+ }(string(peerAddress))
+ }
+}
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go
index 3ce3ebcda..885ffdcc7 100644
--- a/weed/server/raft_hashicorp.go
+++ b/weed/server/raft_hashicorp.go
@@ -21,8 +21,9 @@ import (
)
const (
- ldbFile = "logs.dat"
- sdbFile = "stable.dat"
+ ldbFile = "logs.dat"
+ sdbFile = "stable.dat"
+ updatePeersTimeout = 15 * time.Minute
)
func getPeerIdx(self pb.ServerAddress, mapPeers map[string]pb.ServerAddress) int {
@@ -84,7 +85,9 @@ func (s *RaftServer) UpdatePeers() {
s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
}
}
- break
+ return
+ case <-time.After(updatePeersTimeout):
+ return
}
}
}