diff options
Diffstat (limited to 'weed/server/master_server.go')
| -rw-r--r-- | weed/server/master_server.go | 218 |
1 files changed, 152 insertions, 66 deletions
diff --git a/weed/server/master_server.go b/weed/server/master_server.go index d2edeb6cb..9bf840f08 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,7 +1,9 @@ package weed_server import ( + "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/stats" "net/http" "net/http/httputil" "net/url" @@ -11,8 +13,12 @@ import ( "sync" "time" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/raft" "github.com/gorilla/mux" + hashicorpRaft "github.com/hashicorp/raft" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" @@ -26,14 +32,13 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls" - SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + SequencerType = "master.sequencer.type" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + RaftServerRemovalTime = 72 * time.Minute ) type MasterOption struct { - Host string - Port int + Master pb.ServerAddress MetaFolder string VolumeSizeLimitMB uint32 VolumePreallocate bool @@ -48,6 +53,7 @@ type MasterOption struct { } type MasterServer struct { + master_pb.UnimplementedSeaweedServer option *MasterOption guard *security.Guard @@ -59,18 +65,23 @@ type MasterServer struct { boundedLeaderChan chan int + onPeerUpdatDoneCn chan string + onPeerUpdatDoneCnExist bool + // notifying clients clientChansLock sync.RWMutex - clientChans map[string]chan *master_pb.VolumeLocation + clientChans map[string]chan *master_pb.KeepConnectedResponse grpcDialOption grpc.DialOption MasterClient *wdclient.MasterClient adminLocks *AdminLocks + + Cluster *cluster.Cluster } -func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer { +func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer { v := util.GetViper() signingKey := v.GetString("jwt.signing.key") @@ -100,12 +111,16 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste option: option, preallocateSize: preallocateSize, vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), - clientChans: make(map[string]chan *master_pb.VolumeLocation), + clientChans: make(map[string]chan *master_pb.KeepConnectedResponse), grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers), + MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", peers), adminLocks: NewAdminLocks(), + Cluster: cluster.NewCluster(), } ms.boundedLeaderChan = make(chan int, 16) + ms.onPeerUpdatDoneCn = make(chan string) + + ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate seq := ms.createSequencer(option) if nil == seq { @@ -154,18 +169,41 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste } func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { - ms.Topo.RaftServer = raftServer.raftServer - ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { - glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) - if ms.Topo.RaftServer.Leader() != "" { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") - } - }) + var raftServerName string + if raftServer.raftServer != nil { + ms.Topo.RaftServer = raftServer.raftServer + ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { + glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc() + if ms.Topo.RaftServer.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") + } + }) + raftServerName = ms.Topo.RaftServer.Name() + } else if raftServer.RaftHashicorp != nil { + ms.Topo.HashicorpRaft = raftServer.RaftHashicorp + leaderCh := raftServer.RaftHashicorp.LeaderCh() + prevLeader := ms.Topo.HashicorpRaft.Leader() + go func() { + for { + select { + case isLeader := <-leaderCh: + leader := ms.Topo.HashicorpRaft.Leader() + glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader) + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc() + prevLeader = leader + } + } + }() + raftServerName = ms.Topo.HashicorpRaft.String() + } if ms.Topo.IsLeader() { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") + glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!") } else { - if ms.Topo.RaftServer.Leader() != "" { + if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") + } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.") } } } @@ -174,71 +212,70 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { f(w, r) - } else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { - ms.boundedLeaderChan <- 1 - defer func() { <-ms.boundedLeaderChan }() - targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader()) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, - fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err)) - return - } - glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader()) - proxy := httputil.NewSingleHostReverseProxy(targetUrl) - director := proxy.Director - proxy.Director = func(req *http.Request) { - actualHost, err := security.GetActualRemoteHost(req) - if err == nil { - req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost) - } - director(req) - } - proxy.Transport = util.Transport - proxy.ServeHTTP(w, r) - } else { - // handle requests locally + return + } + var raftServerLeader string + if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { + raftServerLeader = ms.Topo.RaftServer.Leader() + } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" { + raftServerLeader = string(ms.Topo.HashicorpRaft.Leader()) + } + if raftServerLeader == "" { f(w, r) + return + } + ms.boundedLeaderChan <- 1 + defer func() { <-ms.boundedLeaderChan }() + targetUrl, err := url.Parse("http://" + raftServerLeader) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, + fmt.Errorf("Leader URL http://%s Parse Error: %v", raftServerLeader, err)) + return } + glog.V(4).Infoln("proxying to leader", raftServerLeader) + proxy := httputil.NewSingleHostReverseProxy(targetUrl) + director := proxy.Director + proxy.Director = func(req *http.Request) { + actualHost, err := security.GetActualRemoteHost(req) + if err == nil { + req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost) + } + director(req) + } + proxy.Transport = util.Transport + proxy.ServeHTTP(w, r) } } func (ms *MasterServer) startAdminScripts() { - var err error v := util.GetViper() adminScripts := v.GetString("master.maintenance.scripts") - glog.V(0).Infof("adminScripts:\n%v", adminScripts) if adminScripts == "" { return } + glog.V(0).Infof("adminScripts: %v", adminScripts) v.SetDefault("master.maintenance.sleep_minutes", 17) sleepMinutes := v.GetInt("master.maintenance.sleep_minutes") - v.SetDefault("master.filer.default", "localhost:8888") - filerHostPort := v.GetString("master.filer.default") - scriptLines := strings.Split(adminScripts, "\n") if !strings.Contains(adminScripts, "lock") { scriptLines = append(append([]string{}, "lock"), scriptLines...) scriptLines = append(scriptLines, "unlock") } - masterAddress := fmt.Sprintf("%s:%d", ms.option.Host, ms.option.Port) + masterAddress := string(ms.option.Master) var shellOptions shell.ShellOptions shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master") shellOptions.Masters = &masterAddress - shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(filerHostPort) - shellOptions.FilerAddress = filerHostPort shellOptions.Directory = "/" - if err != nil { - glog.V(0).Infof("failed to parse master.filer.default = %s : %v\n", filerHostPort, err) - return - } + emptyFilerGroup := "" + shellOptions.FilerGroup = &emptyFilerGroup - commandEnv := shell.NewCommandEnv(shellOptions) + commandEnv := shell.NewCommandEnv(&shellOptions) reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`) @@ -247,9 +284,13 @@ func (ms *MasterServer) startAdminScripts() { go func() { commandEnv.MasterClient.WaitUntilConnected() - c := time.Tick(time.Duration(sleepMinutes) * time.Minute) - for range c { + for { + time.Sleep(time.Duration(sleepMinutes) * time.Minute) if ms.Topo.IsLeader() { + shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroup(*shellOptions.FilerGroup)) + if shellOptions.FilerAddress == "" { + continue + } for _, line := range scriptLines { for _, c := range strings.Split(line, ";") { processEachCmd(reg, c, commandEnv) @@ -287,19 +328,10 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer seqType := strings.ToLower(v.GetString(SequencerType)) glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType) switch strings.ToLower(seqType) { - case "etcd": - var err error - urls := v.GetString(SequencerEtcdUrls) - glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls) - seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder) - if err != nil { - glog.Error(err) - seq = nil - } case "snowflake": var err error snowflakeId := v.GetInt(SequencerSnowflakeId) - seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port), snowflakeId) + seq, err = sequence.NewSnowflakeSequencer(string(option.Master), snowflakeId) if err != nil { glog.Error(err) seq = nil @@ -309,3 +341,57 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer } return seq } + +func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { + if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil { + return + } + glog.V(4).Infof("OnPeerUpdate: %+v", update) + + peerAddress := pb.ServerAddress(update.Address) + peerName := string(peerAddress) + isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader + if update.IsAdd { + if isLeader { + raftServerFound := false + for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerName { + raftServerFound = true + } + } + if !raftServerFound { + glog.V(0).Infof("adding new raft server: %s", peerName) + ms.Topo.HashicorpRaft.AddVoter( + hashicorpRaft.ServerID(peerName), + hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) + } + } + if ms.onPeerUpdatDoneCnExist { + ms.onPeerUpdatDoneCn <- peerName + } + } else if isLeader { + go func(peerName string) { + for { + select { + case <-time.After(RaftServerRemovalTime): + err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ + Id: peerName, + Force: false, + }) + return err + }) + if err != nil { + glog.Warningf("failed to removing old raft server %s: %v", peerName, err) + } + return + case peerDone := <-ms.onPeerUpdatDoneCn: + if peerName == peerDone { + return + } + } + } + }(peerName) + ms.onPeerUpdatDoneCnExist = true + } +} |
