aboutsummaryrefslogtreecommitdiff
path: root/weed/server/master_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/master_server.go')
-rw-r--r--weed/server/master_server.go218
1 files changed, 152 insertions, 66 deletions
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index d2edeb6cb..9bf840f08 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,7 +1,9 @@
package weed_server
import (
+ "context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"net/http"
"net/http/httputil"
"net/url"
@@ -11,8 +13,12 @@ import (
"sync"
"time"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+
"github.com/chrislusf/raft"
"github.com/gorilla/mux"
+ hashicorpRaft "github.com/hashicorp/raft"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -26,14 +32,13 @@ import (
)
const (
- SequencerType = "master.sequencer.type"
- SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls"
- SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
+ SequencerType = "master.sequencer.type"
+ SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
+ RaftServerRemovalTime = 72 * time.Minute
)
type MasterOption struct {
- Host string
- Port int
+ Master pb.ServerAddress
MetaFolder string
VolumeSizeLimitMB uint32
VolumePreallocate bool
@@ -48,6 +53,7 @@ type MasterOption struct {
}
type MasterServer struct {
+ master_pb.UnimplementedSeaweedServer
option *MasterOption
guard *security.Guard
@@ -59,18 +65,23 @@ type MasterServer struct {
boundedLeaderChan chan int
+ onPeerUpdatDoneCn chan string
+ onPeerUpdatDoneCnExist bool
+
// notifying clients
clientChansLock sync.RWMutex
- clientChans map[string]chan *master_pb.VolumeLocation
+ clientChans map[string]chan *master_pb.KeepConnectedResponse
grpcDialOption grpc.DialOption
MasterClient *wdclient.MasterClient
adminLocks *AdminLocks
+
+ Cluster *cluster.Cluster
}
-func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer {
v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
@@ -100,12 +111,16 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
option: option,
preallocateSize: preallocateSize,
vgCh: make(chan *topology.VolumeGrowRequest, 1<<6),
- clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ clientChans: make(map[string]chan *master_pb.KeepConnectedResponse),
grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", peers),
adminLocks: NewAdminLocks(),
+ Cluster: cluster.NewCluster(),
}
ms.boundedLeaderChan = make(chan int, 16)
+ ms.onPeerUpdatDoneCn = make(chan string)
+
+ ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
seq := ms.createSequencer(option)
if nil == seq {
@@ -154,18 +169,41 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
}
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
- ms.Topo.RaftServer = raftServer.raftServer
- ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
- glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
- if ms.Topo.RaftServer.Leader() != "" {
- glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
- }
- })
+ var raftServerName string
+ if raftServer.raftServer != nil {
+ ms.Topo.RaftServer = raftServer.raftServer
+ ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
+ glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
+ stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
+ if ms.Topo.RaftServer.Leader() != "" {
+ glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
+ }
+ })
+ raftServerName = ms.Topo.RaftServer.Name()
+ } else if raftServer.RaftHashicorp != nil {
+ ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
+ leaderCh := raftServer.RaftHashicorp.LeaderCh()
+ prevLeader := ms.Topo.HashicorpRaft.Leader()
+ go func() {
+ for {
+ select {
+ case isLeader := <-leaderCh:
+ leader := ms.Topo.HashicorpRaft.Leader()
+ glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
+ stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
+ prevLeader = leader
+ }
+ }
+ }()
+ raftServerName = ms.Topo.HashicorpRaft.String()
+ }
if ms.Topo.IsLeader() {
- glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
+ glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!")
} else {
- if ms.Topo.RaftServer.Leader() != "" {
+ if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
+ } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
+ glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.")
}
}
}
@@ -174,71 +212,70 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
f(w, r)
- } else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
- ms.boundedLeaderChan <- 1
- defer func() { <-ms.boundedLeaderChan }()
- targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
- if err != nil {
- writeJsonError(w, r, http.StatusInternalServerError,
- fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err))
- return
- }
- glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader())
- proxy := httputil.NewSingleHostReverseProxy(targetUrl)
- director := proxy.Director
- proxy.Director = func(req *http.Request) {
- actualHost, err := security.GetActualRemoteHost(req)
- if err == nil {
- req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost)
- }
- director(req)
- }
- proxy.Transport = util.Transport
- proxy.ServeHTTP(w, r)
- } else {
- // handle requests locally
+ return
+ }
+ var raftServerLeader string
+ if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
+ raftServerLeader = ms.Topo.RaftServer.Leader()
+ } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
+ raftServerLeader = string(ms.Topo.HashicorpRaft.Leader())
+ }
+ if raftServerLeader == "" {
f(w, r)
+ return
+ }
+ ms.boundedLeaderChan <- 1
+ defer func() { <-ms.boundedLeaderChan }()
+ targetUrl, err := url.Parse("http://" + raftServerLeader)
+ if err != nil {
+ writeJsonError(w, r, http.StatusInternalServerError,
+ fmt.Errorf("Leader URL http://%s Parse Error: %v", raftServerLeader, err))
+ return
}
+ glog.V(4).Infoln("proxying to leader", raftServerLeader)
+ proxy := httputil.NewSingleHostReverseProxy(targetUrl)
+ director := proxy.Director
+ proxy.Director = func(req *http.Request) {
+ actualHost, err := security.GetActualRemoteHost(req)
+ if err == nil {
+ req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost)
+ }
+ director(req)
+ }
+ proxy.Transport = util.Transport
+ proxy.ServeHTTP(w, r)
}
}
func (ms *MasterServer) startAdminScripts() {
- var err error
v := util.GetViper()
adminScripts := v.GetString("master.maintenance.scripts")
- glog.V(0).Infof("adminScripts:\n%v", adminScripts)
if adminScripts == "" {
return
}
+ glog.V(0).Infof("adminScripts: %v", adminScripts)
v.SetDefault("master.maintenance.sleep_minutes", 17)
sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
- v.SetDefault("master.filer.default", "localhost:8888")
- filerHostPort := v.GetString("master.filer.default")
-
scriptLines := strings.Split(adminScripts, "\n")
if !strings.Contains(adminScripts, "lock") {
scriptLines = append(append([]string{}, "lock"), scriptLines...)
scriptLines = append(scriptLines, "unlock")
}
- masterAddress := fmt.Sprintf("%s:%d", ms.option.Host, ms.option.Port)
+ masterAddress := string(ms.option.Master)
var shellOptions shell.ShellOptions
shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
shellOptions.Masters = &masterAddress
- shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(filerHostPort)
- shellOptions.FilerAddress = filerHostPort
shellOptions.Directory = "/"
- if err != nil {
- glog.V(0).Infof("failed to parse master.filer.default = %s : %v\n", filerHostPort, err)
- return
- }
+ emptyFilerGroup := ""
+ shellOptions.FilerGroup = &emptyFilerGroup
- commandEnv := shell.NewCommandEnv(shellOptions)
+ commandEnv := shell.NewCommandEnv(&shellOptions)
reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
@@ -247,9 +284,13 @@ func (ms *MasterServer) startAdminScripts() {
go func() {
commandEnv.MasterClient.WaitUntilConnected()
- c := time.Tick(time.Duration(sleepMinutes) * time.Minute)
- for range c {
+ for {
+ time.Sleep(time.Duration(sleepMinutes) * time.Minute)
if ms.Topo.IsLeader() {
+ shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroup(*shellOptions.FilerGroup))
+ if shellOptions.FilerAddress == "" {
+ continue
+ }
for _, line := range scriptLines {
for _, c := range strings.Split(line, ";") {
processEachCmd(reg, c, commandEnv)
@@ -287,19 +328,10 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
seqType := strings.ToLower(v.GetString(SequencerType))
glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
switch strings.ToLower(seqType) {
- case "etcd":
- var err error
- urls := v.GetString(SequencerEtcdUrls)
- glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls)
- seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder)
- if err != nil {
- glog.Error(err)
- seq = nil
- }
case "snowflake":
var err error
snowflakeId := v.GetInt(SequencerSnowflakeId)
- seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port), snowflakeId)
+ seq, err = sequence.NewSnowflakeSequencer(string(option.Master), snowflakeId)
if err != nil {
glog.Error(err)
seq = nil
@@ -309,3 +341,57 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
}
return seq
}
+
+func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
+ if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
+ return
+ }
+ glog.V(4).Infof("OnPeerUpdate: %+v", update)
+
+ peerAddress := pb.ServerAddress(update.Address)
+ peerName := string(peerAddress)
+ isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader
+ if update.IsAdd {
+ if isLeader {
+ raftServerFound := false
+ for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
+ if string(server.ID) == peerName {
+ raftServerFound = true
+ }
+ }
+ if !raftServerFound {
+ glog.V(0).Infof("adding new raft server: %s", peerName)
+ ms.Topo.HashicorpRaft.AddVoter(
+ hashicorpRaft.ServerID(peerName),
+ hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
+ }
+ }
+ if ms.onPeerUpdatDoneCnExist {
+ ms.onPeerUpdatDoneCn <- peerName
+ }
+ } else if isLeader {
+ go func(peerName string) {
+ for {
+ select {
+ case <-time.After(RaftServerRemovalTime):
+ err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
+ Id: peerName,
+ Force: false,
+ })
+ return err
+ })
+ if err != nil {
+ glog.Warningf("failed to removing old raft server %s: %v", peerName, err)
+ }
+ return
+ case peerDone := <-ms.onPeerUpdatDoneCn:
+ if peerName == peerDone {
+ return
+ }
+ }
+ }
+ }(peerName)
+ ms.onPeerUpdatDoneCnExist = true
+ }
+}