aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/master_grpc_server_raft.go18
-rw-r--r--weed/server/master_server.go49
2 files changed, 52 insertions, 15 deletions
diff --git a/weed/server/master_grpc_server_raft.go b/weed/server/master_grpc_server_raft.go
index 71ab6a1da..37491b3df 100644
--- a/weed/server/master_grpc_server_raft.go
+++ b/weed/server/master_grpc_server_raft.go
@@ -2,6 +2,8 @@ package weed_server
import (
"context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/hashicorp/raft"
)
@@ -23,6 +25,9 @@ func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_
func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) {
resp := &master_pb.RaftAddServerResponse{}
+ if ms.Topo.HashicorpRaft.State() != raft.Leader {
+ return nil, fmt.Errorf("raft add server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
+ }
var idxFuture raft.IndexFuture
if req.Voter {
@@ -40,6 +45,19 @@ func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAd
func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) {
resp := &master_pb.RaftRemoveServerResponse{}
+ if ms.Topo.HashicorpRaft.State() != raft.Leader {
+ return nil, fmt.Errorf("raft remove server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
+ }
+
+ if !req.Force {
+ ms.clientChansLock.RLock()
+ _, ok := ms.clientChans[fmt.Sprintf("%s@%s", cluster.MasterType, req.Id)]
+ ms.clientChansLock.RUnlock()
+ if ok {
+ return resp, fmt.Errorf("raft remove server %s failed: client connection to master exists", req.Id)
+ }
+ }
+
idxFuture := ms.Topo.HashicorpRaft.RemoveServer(raft.ServerID(req.Id), 0, 0)
if err := idxFuture.Error(); err != nil {
return nil, err
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 2c626b511..0aba7b957 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -34,7 +34,7 @@ import (
const (
SequencerType = "master.sequencer.type"
SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
- RaftServerRemovalTime = 72 * time.Hour
+ RaftServerRemovalTime = 72 * time.Minute
)
type MasterOption struct {
@@ -64,7 +64,9 @@ type MasterServer struct {
vgCh chan *topology.VolumeGrowRequest
boundedLeaderChan chan int
- onPeerUpdatDoneCn chan string
+
+ onPeerUpdatDoneCn chan string
+ onPeerUpdatDoneCnExist bool
// notifying clients
clientChansLock sync.RWMutex
@@ -117,6 +119,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
}
ms.boundedLeaderChan = make(chan int, 16)
ms.onPeerUpdatDoneCn = make(chan string)
+
ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
seq := ms.createSequencer(option)
@@ -336,32 +339,48 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
return
}
peerAddress := pb.ServerAddress(update.Address)
+ peerName := string(peerAddress)
if update.IsAdd {
if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
- glog.V(0).Infof("adding new raft server: %s", peerAddress.String())
- ms.Topo.HashicorpRaft.AddVoter(
- hashicorpRaft.ServerID(peerAddress.String()),
- hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
+ raftServerFound := false
+ for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
+ if string(server.ID) == peerName {
+ raftServerFound = true
+ }
+ }
+ if !raftServerFound {
+ glog.V(0).Infof("adding new raft server: %s", peerAddress.String())
+ ms.Topo.HashicorpRaft.AddVoter(
+ hashicorpRaft.ServerID(peerName),
+ hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
+ }
+ }
+ if ms.onPeerUpdatDoneCnExist {
+ ms.onPeerUpdatDoneCn <- string(peerAddress)
}
- ms.onPeerUpdatDoneCn <- string(peerAddress)
} else if ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader {
go func(peerName string) {
for {
select {
- case peerDone := <-ms.onPeerUpdatDoneCn:
- if peerName == peerDone {
- return
- }
case <-time.After(RaftServerRemovalTime):
- glog.V(0).Infof("removing old raft server: %s", peerName)
- if _, err := ms.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
- Id: peerName,
- }); err != nil {
+ err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
+ Id: peerName,
+ Force: false,
+ })
+ return err
+ })
+ if err != nil {
glog.Warningf("failed removing old raft server: %v", err)
}
return
+ case peerDone := <-ms.onPeerUpdatDoneCn:
+ if peerName == peerDone {
+ return
+ }
}
}
}(string(peerAddress))
+ ms.onPeerUpdatDoneCnExist = true
}
}