aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/master_server.go44
1 files changed, 32 insertions, 12 deletions
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 98fb6aab1..03a3f74d1 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -11,7 +11,6 @@ import (
"regexp"
"strings"
"sync"
- "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/cluster"
@@ -33,9 +32,10 @@ import (
)
const (
- SequencerType = "master.sequencer.type"
- SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
- RaftServerRemovalTime = 72 * time.Minute
+ SequencerType = "master.sequencer.type"
+ SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
+ RaftServerRemovalTime = 72 * time.Minute
+ ResetRaftServerRemovalTimeMsg = "ResetRaftServerRemovalTime"
)
type MasterOption struct {
@@ -66,8 +66,8 @@ type MasterServer struct {
boundedLeaderChan chan int
- onPeerUpdateDoneCn chan string
- onPeerUpdateGoroutineCount int32
+ onPeerUpdateDoneCns map[string]*chan string
+ onPeerUpdateLock sync.RWMutex
// notifying clients
clientChansLock sync.RWMutex
@@ -119,9 +119,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
Cluster: cluster.NewCluster(),
}
ms.boundedLeaderChan = make(chan int, 16)
- ms.onPeerUpdateDoneCn = make(chan string)
ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
+ ms.onPeerUpdateDoneCns = make(map[string]*chan string)
seq := ms.createSequencer(option)
if nil == seq {
@@ -367,16 +367,31 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
}
}
- if atomic.LoadInt32(&ms.onPeerUpdateGoroutineCount) > 0 {
- ms.onPeerUpdateDoneCn <- peerName
+ ms.onPeerUpdateLock.RLock()
+ if len(ms.onPeerUpdateDoneCns) > 0 {
+ for _, onPeerUpdateDoneCn := range ms.onPeerUpdateDoneCns {
+ *onPeerUpdateDoneCn <- peerName
+ }
}
+ ms.onPeerUpdateLock.RUnlock()
} else if isLeader {
+ if onPeerUpdateDoneCnPrev, ok := ms.onPeerUpdateDoneCns[peerName]; ok {
+ *onPeerUpdateDoneCnPrev <- ResetRaftServerRemovalTimeMsg
+ return
+ }
+ onPeerUpdateDoneCn := make(chan string)
+ ms.onPeerUpdateLock.Lock()
+ ms.onPeerUpdateDoneCns[peerName] = &onPeerUpdateDoneCn
+ ms.onPeerUpdateLock.Unlock()
+
go func(peerName string) {
raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime)
raftServerPingTicker := time.NewTicker(5 * time.Minute)
- atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, 1)
defer func() {
- atomic.AddInt32(&ms.onPeerUpdateGoroutineCount, -1)
+ ms.onPeerUpdateLock.Lock()
+ delete(ms.onPeerUpdateDoneCns, peerName)
+ ms.onPeerUpdateLock.Unlock()
+ close(onPeerUpdateDoneCn)
}()
for {
select {
@@ -408,11 +423,16 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
}
glog.V(0).Infof("old raft server %s removed", peerName)
return
- case peerDone := <-ms.onPeerUpdateDoneCn:
+ case peerDone := <-onPeerUpdateDoneCn:
if peerName == peerDone {
glog.V(0).Infof("raft server %s remove canceled on onPeerUpdate", peerName)
return
}
+ if peerDone == ResetRaftServerRemovalTimeMsg {
+ raftServerRemovalTimeAfter = time.After(RaftServerRemovalTime)
+ glog.V(0).Infof("rest wait %v for raft server %s activity, otherwise delete",
+ RaftServerRemovalTime, peerName)
+ }
}
}
}(peerName)