diff options
| author | a <eddy@gfxlabs.io> | 2022-04-12 14:42:29 -0700 |
|---|---|---|
| committer | a <eddy@gfxlabs.io> | 2022-04-12 14:42:29 -0700 |
| commit | 846858fb436cc061c40c4f2565ed3682e3758596 (patch) | |
| tree | 28984dd19b8cdb4ddb41a4a4283cb4b6644b37a4 /weed/server | |
| parent | 41d396edc4a8cdd586e9e58cab7b725c070ca685 (diff) | |
| parent | 42fea7e7d6ce0aca8474c9beba27d33f15bd4f49 (diff) | |
| download | seaweedfs-846858fb436cc061c40c4f2565ed3682e3758596.tar.xz seaweedfs-846858fb436cc061c40c4f2565ed3682e3758596.zip | |
merge master
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_ui/breadcrumb.go | 6 | ||||
| -rw-r--r-- | weed/server/filer_ui/filer.html | 106 | ||||
| -rw-r--r-- | weed/server/master_grpc_server.go | 3 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_raft.go | 66 | ||||
| -rw-r--r-- | weed/server/master_server.go | 162 | ||||
| -rw-r--r-- | weed/server/master_server_handlers_ui.go | 52 | ||||
| -rw-r--r-- | weed/server/master_ui/masterNewRaft.html | 121 | ||||
| -rw-r--r-- | weed/server/master_ui/templates.go | 4 | ||||
| -rw-r--r-- | weed/server/raft_hashicorp.go | 183 | ||||
| -rw-r--r-- | weed/server/raft_server.go | 64 | ||||
| -rw-r--r-- | weed/server/raft_server_handlers.go | 8 |
11 files changed, 667 insertions, 108 deletions
diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go index 5016117a8..3201ff76c 100644 --- a/weed/server/filer_ui/breadcrumb.go +++ b/weed/server/filer_ui/breadcrumb.go @@ -15,8 +15,12 @@ func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) { parts := strings.Split(fullpath, "/") for i := 0; i < len(parts); i++ { + name := parts[i] + if name == "" { + name = "/" + } crumb := Breadcrumb{ - Name: parts[i] + " /", + Name: name, Link: "/" + util.Join(parts[0:i+1]...), } if !strings.HasSuffix(crumb.Link, "/") { diff --git a/weed/server/filer_ui/filer.html b/weed/server/filer_ui/filer.html index 593d115f0..785f82887 100644 --- a/weed/server/filer_ui/filer.html +++ b/weed/server/filer_ui/filer.html @@ -11,6 +11,7 @@ #drop-area { border: 1px transparent; + margin-top: 5px; } #drop-area.highlight { @@ -46,37 +47,25 @@ vertical-align: bottom; } - .danger { - color: red; - background: #fff; - border: 1px solid #fff; - border-radius: 2px; + .table-hover > tbody > tr:hover > * > div.operations { + display: block; } - .info { - background: #fff; - border: 1px solid #fff; - border-radius: 2px; + .table > tbody > tr { + height: 39px; } - .footer { - position: absolute; - bottom: 10px; - right: 10%; - min-width: 30%; - } - - .progress-table { - width: 100%; - } - - .progress-table-file-name { - text-align: right; + div.operations { + display: none; } - .progress-table-percent { - width: 60px; - text-align: right; + .footer { + position: absolute; + bottom: 0px; + right: 5%; + min-width: 25%; + border-left: 1px solid #ccc; + border-right: 1px solid #ccc; } .add-files { @@ -98,13 +87,21 @@ </div> <div class="row"> <div> + <div class="btn-group btn-group-sm pull-right" role="group" style="margin-top:3px;"> + <label class="btn btn-default" onclick="handleCreateDir()"> + <span class="glyphicon glyphicon-plus" aria-hidden="true"></span> New Folder + </label> + <label class="btn btn-default" for="fileElem"> + <span class="glyphicon glyphicon-cloud-upload" aria-hidden="true"></span> Upload + </label> + </div> + <ol class="breadcrumb"> {{ range $entry := .Breadcrumbs }} - <a href="{{ printpath $entry.Link }}"> + <li><a href="{{ printpath $entry.Link }}"> {{ $entry.Name }} - </a> + </li></a> {{ end }} - <label class="button" for="fileElem">Upload</label> - <label class="button" onclick="handleCreateDir()">New Folder</label> + </ol> </div> </div> @@ -112,13 +109,18 @@ <form class="upload-form"> <input type="file" id="fileElem" multiple onchange="handleFiles(this.files)"> - <table width="100%"> + {{if .EmptyFolder}} + <div class="row add-files"> + + + </div> + {{else}} + <table width="100%" class="table table-hover"> {{$path := .Path }} {{ range $entry_index, $entry := .Entries }} <tr> <td> {{if $entry.IsDirectory}} - <img src="/seaweedfsstatic/images/folder.gif" width="20" height="16"> + <span class="glyphicon glyphicon-folder-open" aria-hidden="true"></span> <a href="{{ printpath $path "/" $entry.Name "/"}}" > {{ $entry.Name }} </a> @@ -143,21 +145,25 @@ <td align="right" nowrap> {{ $entry.Timestamp.Format "2006-01-02 15:04" }} </td> - <td> - {{if $entry.IsDirectory}} - <label class="button danger" onclick="handleDelete('{{ printpath $path "/" $entry.Name "/" }}')">Delete</label> - {{else}} - <label class="button danger" onclick="handleDelete('{{ printpath $path "/" $entry.Name }}')">Delete</label> - {{end}} - <label class="button info" onclick="handleRename('{{ $entry.Name }}', '{{ printpath $path "/" }}')">Rename</label> + <td style="width:75px"> + <div class="btn-group btn-group-xs pull-right operations" role="group"> + <label class="btn" onclick="handleRename('{{ $entry.Name }}', '{{ printpath $path "/" }}')"> + <span class="glyphicon glyphicon-edit" aria-hidden="true"></span> + </label> + {{if $entry.IsDirectory}} + <label class="btn" onclick="handleDelete('{{ printpath $path "/" $entry.Name "/" }}')"> + <span class="glyphicon glyphicon-trash" aria-hidden="true"></span> + </label> + {{else}} + <label class="btn" onclick="handleDelete('{{ printpath $path "/" $entry.Name }}')"> + <span class="glyphicon glyphicon-trash" aria-hidden="true"></span> + </label> + {{end}} + </div> </td> </tr> {{ end }} </table> - {{if .EmptyFolder}} - <div class="row add-files"> - + - </div> {{end}} </form> </div> @@ -234,17 +240,23 @@ function renderProgress() { var values = Object.values(uploadList); - var html = '<table class="progress-table">\n'; + var html = '<table class="table">\n<tr><th>Uploading</th><\/tr>\n'; for (let i of values) { - html += '<tr>\n<td class="progress-table-file-name">' + i.name + '<\/td>\n'; - html += '<td class="progress-table-percent">' + i.percent + '% <\/td>\n<\/tr>\n'; + var progressBarClass = 'progress-bar-striped active'; + if (i.percent >= 100) { + progressBarClass = 'progress-bar-success'; + } + html += '<tr>\n<td>\n'; + html += '<div class="progress" style="margin-bottom: 2px;">\n'; + html += '<div class="progress-bar ' + progressBarClass + '" role="progressbar" aria-valuenow="' + '100" aria-valuemin="0" aria-valuemax="100" style="width:' + i.percent + '%;">'; + html += '<span style="margin-right: 10px;">' + i.name + '</span>' + i.percent + '%<\/div>'; + html += '<\/div>\n<\/td>\n<\/tr>\n'; } html += '<\/table>\n'; progressArea.innerHTML = html; if (values.length > 0) { progressArea.attributes.style.value = ''; } - console.log('Render Progress', values); } function reportProgress(file, percent) { @@ -289,7 +301,7 @@ } function handleCreateDir() { - var dirName = prompt('Directory Name:', ''); + var dirName = prompt('Folder Name:', ''); dirName = dirName.trim(); if (dirName == null || dirName == '') { return; diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 1ad8edf91..83abdaaad 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -113,6 +113,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes { + dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) + ms.Topo.DataNodeRegistration(dcName, rackName, dn) + // process heartbeat.Volumes stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc() newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn) diff --git a/weed/server/master_grpc_server_raft.go b/weed/server/master_grpc_server_raft.go new file mode 100644 index 000000000..37491b3df --- /dev/null +++ b/weed/server/master_grpc_server_raft.go @@ -0,0 +1,66 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/hashicorp/raft" +) + +func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_pb.RaftListClusterServersRequest) (*master_pb.RaftListClusterServersResponse, error) { + resp := &master_pb.RaftListClusterServersResponse{} + + servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers + + for _, server := range servers { + resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{ + Id: string(server.ID), + Address: string(server.Address), + Suffrage: server.Suffrage.String(), + }) + } + return resp, nil +} + +func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) { + resp := &master_pb.RaftAddServerResponse{} + if ms.Topo.HashicorpRaft.State() != raft.Leader { + return nil, fmt.Errorf("raft add server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String()) + } + + var idxFuture raft.IndexFuture + if req.Voter { + idxFuture = ms.Topo.HashicorpRaft.AddVoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0) + } else { + idxFuture = ms.Topo.HashicorpRaft.AddNonvoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0) + } + + if err := idxFuture.Error(); err != nil { + return nil, err + } + return resp, nil +} + +func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) { + resp := &master_pb.RaftRemoveServerResponse{} + + if ms.Topo.HashicorpRaft.State() != raft.Leader { + return nil, fmt.Errorf("raft remove server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String()) + } + + if !req.Force { + ms.clientChansLock.RLock() + _, ok := ms.clientChans[fmt.Sprintf("%s@%s", cluster.MasterType, req.Id)] + ms.clientChansLock.RUnlock() + if ok { + return resp, fmt.Errorf("raft remove server %s failed: client connection to master exists", req.Id) + } + } + + idxFuture := ms.Topo.HashicorpRaft.RemoveServer(raft.ServerID(req.Id), 0, 0) + if err := idxFuture.Error(); err != nil { + return nil, err + } + return resp, nil +} diff --git a/weed/server/master_server.go b/weed/server/master_server.go index b63e3a418..9f29d4ba7 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,6 +1,7 @@ package weed_server import ( + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/stats" "net/http" @@ -17,6 +18,7 @@ import ( "github.com/chrislusf/raft" "github.com/gorilla/mux" + hashicorpRaft "github.com/hashicorp/raft" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" @@ -30,8 +32,9 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + SequencerType = "master.sequencer.type" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + RaftServerRemovalTime = 72 * time.Minute ) type MasterOption struct { @@ -62,6 +65,9 @@ type MasterServer struct { boundedLeaderChan chan int + onPeerUpdatDoneCn chan string + onPeerUpdatDoneCnExist bool + // notifying clients clientChansLock sync.RWMutex clientChans map[string]chan *master_pb.KeepConnectedResponse @@ -112,6 +118,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se Cluster: cluster.NewCluster(), } ms.boundedLeaderChan = make(chan int, 16) + ms.onPeerUpdatDoneCn = make(chan string) + + ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate seq := ms.createSequencer(option) if nil == seq { @@ -160,19 +169,41 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se } func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { - ms.Topo.RaftServer = raftServer.raftServer - ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { - glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) - stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc() - if ms.Topo.RaftServer.Leader() != "" { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") - } - }) + var raftServerName string + if raftServer.raftServer != nil { + ms.Topo.RaftServer = raftServer.raftServer + ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { + glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc() + if ms.Topo.RaftServer.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") + } + }) + raftServerName = ms.Topo.RaftServer.Name() + } else if raftServer.RaftHashicorp != nil { + ms.Topo.HashicorpRaft = raftServer.RaftHashicorp + leaderCh := raftServer.RaftHashicorp.LeaderCh() + prevLeader := ms.Topo.HashicorpRaft.Leader() + go func() { + for { + select { + case isLeader := <-leaderCh: + leader := ms.Topo.HashicorpRaft.Leader() + glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader) + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc() + prevLeader = leader + } + } + }() + raftServerName = ms.Topo.HashicorpRaft.String() + } if ms.Topo.IsLeader() { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") + glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!") } else { - if ms.Topo.RaftServer.Leader() != "" { + if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") + } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.") } } } @@ -181,31 +212,38 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { f(w, r) - } else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { - ms.boundedLeaderChan <- 1 - defer func() { <-ms.boundedLeaderChan }() - targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader()) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, - fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err)) - return - } - glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader()) - proxy := httputil.NewSingleHostReverseProxy(targetUrl) - director := proxy.Director - proxy.Director = func(req *http.Request) { - actualHost, err := security.GetActualRemoteHost(req) - if err == nil { - req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost) - } - director(req) - } - proxy.Transport = util.Transport - proxy.ServeHTTP(w, r) - } else { - // handle requests locally + return + } + var raftServerLeader string + if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { + raftServerLeader = ms.Topo.RaftServer.Leader() + } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" { + raftServerLeader = string(ms.Topo.HashicorpRaft.Leader()) + } + if raftServerLeader == "" { f(w, r) + return + } + ms.boundedLeaderChan <- 1 + defer func() { <-ms.boundedLeaderChan }() + targetUrl, err := url.Parse("http://" + raftServerLeader) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, + fmt.Errorf("Leader URL http://%s Parse Error: %v", raftServerLeader, err)) + return + } + glog.V(4).Infoln("proxying to leader", raftServerLeader) + proxy := httputil.NewSingleHostReverseProxy(targetUrl) + director := proxy.Director + proxy.Director = func(req *http.Request) { + actualHost, err := security.GetActualRemoteHost(req) + if err == nil { + req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost) + } + director(req) } + proxy.Transport = util.Transport + proxy.ServeHTTP(w, r) } } @@ -301,3 +339,57 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer } return seq } + +func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { + if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil { + return + } + glog.V(4).Infof("OnPeerUpdate: %+v", update) + + peerAddress := pb.ServerAddress(update.Address) + peerName := string(peerAddress) + isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader + if update.IsAdd { + if isLeader { + raftServerFound := false + for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerName { + raftServerFound = true + } + } + if !raftServerFound { + glog.V(0).Infof("adding new raft server: %s", peerName) + ms.Topo.HashicorpRaft.AddVoter( + hashicorpRaft.ServerID(peerName), + hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) + } + } + if ms.onPeerUpdatDoneCnExist { + ms.onPeerUpdatDoneCn <- peerName + } + } else if isLeader { + go func(peerName string) { + for { + select { + case <-time.After(RaftServerRemovalTime): + err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ + Id: peerName, + Force: false, + }) + return err + }) + if err != nil { + glog.Warningf("failed to removing old raft server %s: %v", peerName, err) + } + return + case peerDone := <-ms.onPeerUpdatDoneCn: + if peerName == peerDone { + return + } + } + } + }(peerName) + ms.onPeerUpdatDoneCnExist = true + } +} diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go index 015bfbd00..d8260d8d2 100644 --- a/weed/server/master_server_handlers_ui.go +++ b/weed/server/master_server_handlers_ui.go @@ -5,6 +5,8 @@ import ( "time" "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" + ui "github.com/chrislusf/seaweedfs/weed/server/master_ui" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" @@ -13,20 +15,40 @@ import ( func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) { infos := make(map[string]interface{}) infos["Up Time"] = time.Now().Sub(startTime).String() - args := struct { - Version string - Topology interface{} - RaftServer raft.Server - Stats map[string]interface{} - Counters *stats.ServerStats - VolumeSizeLimitMB uint32 - }{ - util.Version(), - ms.Topo.ToMap(), - ms.Topo.RaftServer, - infos, - serverStats, - ms.option.VolumeSizeLimitMB, + infos["Max Volume Id"] = ms.Topo.GetMaxVolumeId() + if ms.Topo.RaftServer != nil { + args := struct { + Version string + Topology interface{} + RaftServer raft.Server + Stats map[string]interface{} + Counters *stats.ServerStats + VolumeSizeLimitMB uint32 + }{ + util.Version(), + ms.Topo.ToMap(), + ms.Topo.RaftServer, + infos, + serverStats, + ms.option.VolumeSizeLimitMB, + } + ui.StatusTpl.Execute(w, args) + } else if ms.Topo.HashicorpRaft != nil { + args := struct { + Version string + Topology interface{} + RaftServer *hashicorpRaft.Raft + Stats map[string]interface{} + Counters *stats.ServerStats + VolumeSizeLimitMB uint32 + }{ + util.Version(), + ms.Topo.ToMap(), + ms.Topo.HashicorpRaft, + infos, + serverStats, + ms.option.VolumeSizeLimitMB, + } + ui.StatusNewRaftTpl.Execute(w, args) } - ui.StatusTpl.Execute(w, args) } diff --git a/weed/server/master_ui/masterNewRaft.html b/weed/server/master_ui/masterNewRaft.html new file mode 100644 index 000000000..32afdceac --- /dev/null +++ b/weed/server/master_ui/masterNewRaft.html @@ -0,0 +1,121 @@ +<!DOCTYPE html> +<html> +<head> + <title>SeaweedFS {{ .Version }}</title> + <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css"> +</head> +<body> +<div class="container"> + <div class="page-header"> + <h1> + <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a> + SeaweedFS <small>{{ .Version }}</small> + </h1> + </div> + + <div class="row"> + <div class="col-sm-6"> + <h2>Cluster status</h2> + <table class="table table-condensed table-striped"> + <tbody> + <tr> + <th>Volume Size Limit</th> + <td>{{ .VolumeSizeLimitMB }}MB</td> + </tr> + <tr> + <th>Free</th> + <td>{{ .Topology.Free }}</td> + </tr> + <tr> + <th>Max</th> + <td>{{ .Topology.Max }}</td> + </tr> + {{ with .RaftServer }} + <tr> + <th>Leader</th> + <td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td> + </tr> + <tr> + <th>Other Masters</th> + <td class="col-sm-5"> + <ul class="list-unstyled"> + {{ range $k, $p := .GetConfiguration.Configuration.Servers }} + <li><a href="http://{{ $p.ID }}/ui/index.html">{{ $p.ID }}</a></li> + {{ end }} + </ul> + </td> + </tr> + {{ end }} + </tbody> + </table> + </div> + + <div class="col-sm-6"> + <h2>System Stats</h2> + <table class="table table-condensed table-striped"> + <tr> + <th>Concurrent Connections</th> + <td>{{ .Counters.Connections.WeekCounter.Sum }}</td> + </tr> + {{ range $key, $val := .Stats }} + <tr> + <th>{{ $key }}</th> + <td>{{ $val }}</td> + </tr> + {{ end }} + </table> + <h2>Raft Stats</h2> + <table class="table table-condensed table-striped"> + <tr> + <th>applied_index</th> + <td>{{ .RaftServer.Stats.applied_index }}</td> + </tr> + <tr> + <th>last_log_term</th> + <td>{{ .RaftServer.Stats.last_log_term }}</td> + </tr> + </table> + </div> + </div> + + <div class="row"> + <h2>Topology</h2> + <table class="table table-striped"> + <thead> + <tr> + <th>Data Center</th> + <th>Rack</th> + <th>RemoteAddr</th> + <th>#Volumes</th> + <th>Volume Ids</th> + <th>#ErasureCodingShards</th> + <th>Max</th> + </tr> + </thead> + <tbody> + {{ range $dc_index, $dc := .Topology.DataCenters }} + {{ range $rack_index, $rack := $dc.Racks }} + {{ range $dn_index, $dn := $rack.DataNodes }} + <tr> + <td><code>{{ $dc.Id }}</code></td> + <td>{{ $rack.Id }}</td> + <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a> + {{ if ne $dn.PublicUrl $dn.Url }} + / <a href="http://{{ $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a> + {{ end }} + </td> + <td>{{ $dn.Volumes }}</td> + <td>{{ $dn.VolumeIds}}</td> + <td>{{ $dn.EcShards }}</td> + <td>{{ $dn.Max }}</td> + </tr> + {{ end }} + {{ end }} + {{ end }} + </tbody> + </table> + </div> + +</div> +</body> +</html> diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go index 415022b97..a6dcc57d7 100644 --- a/weed/server/master_ui/templates.go +++ b/weed/server/master_ui/templates.go @@ -8,4 +8,8 @@ import ( //go:embed master.html var masterHtml string +//go:embed masterNewRaft.html +var masterNewRaftHtml string + var StatusTpl = template.Must(template.New("status").Parse(masterHtml)) +var StatusNewRaftTpl = template.Must(template.New("status").Parse(masterNewRaftHtml)) diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go new file mode 100644 index 000000000..cc6578bf5 --- /dev/null +++ b/weed/server/raft_hashicorp.go @@ -0,0 +1,183 @@ +package weed_server + +// https://yusufs.medium.com/creating-distributed-kv-database-by-implementing-raft-consensus-using-golang-d0884eef2e28 +// https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18 + +import ( + "fmt" + transport "github.com/Jille/raft-grpc-transport" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/hashicorp/raft" + boltdb "github.com/hashicorp/raft-boltdb" + "google.golang.org/grpc" + "math/rand" + "os" + "path" + "path/filepath" + "sort" + "strings" + "time" +) + +const ( + ldbFile = "logs.dat" + sdbFile = "stable.dat" + updatePeersTimeout = 15 * time.Minute +) + +func getPeerIdx(self pb.ServerAddress, mapPeers map[string]pb.ServerAddress) int { + peers := make([]pb.ServerAddress, 0, len(mapPeers)) + for _, peer := range mapPeers { + peers = append(peers, peer) + } + sort.Slice(peers, func(i, j int) bool { + return strings.Compare(string(peers[i]), string(peers[j])) < 0 + }) + for i, peer := range peers { + if string(peer) == string(self) { + return i + } + } + return -1 +} + +func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) { + for _, peer := range s.peers { + cfg.Servers = append(cfg.Servers, raft.Server{ + Suffrage: raft.Voter, + ID: raft.ServerID(peer), + Address: raft.ServerAddress(peer.ToGrpcAddress()), + }) + } + return cfg +} + +func (s *RaftServer) UpdatePeers() { + for { + select { + case isLeader := <-s.RaftHashicorp.LeaderCh(): + if isLeader { + peerLeader := string(s.serverAddr) + existsPeerName := make(map[string]bool) + for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerLeader { + continue + } + existsPeerName[string(server.ID)] = true + } + for _, peer := range s.peers { + peerName := string(peer) + if peerName == peerLeader || existsPeerName[peerName] { + continue + } + glog.V(0).Infof("adding new peer: %s", peerName) + s.RaftHashicorp.AddVoter( + raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0) + } + for peer, _ := range existsPeerName { + if _, found := s.peers[peer]; !found { + glog.V(0).Infof("removing old peer: %s", peer) + s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0) + } + } + if _, found := s.peers[peerLeader]; !found { + glog.V(0).Infof("removing old leader peer: %s", peerLeader) + s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0) + } + } + return + case <-time.After(updatePeersTimeout): + return + } + } +} + +func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { + s := &RaftServer{ + peers: option.Peers, + serverAddr: option.ServerAddr, + dataDir: option.DataDir, + topo: option.Topo, + } + + c := raft.DefaultConfig() + c.LocalID = raft.ServerID(s.serverAddr) // TODO maybee the IP:port address will change + c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) + c.ElectionTimeout = option.ElectionTimeout + if c.LeaderLeaseTimeout > c.HeartbeatTimeout { + c.LeaderLeaseTimeout = c.HeartbeatTimeout + } + if glog.V(4) { + c.LogLevel = "Debug" + } else if glog.V(2) { + c.LogLevel = "Info" + } else if glog.V(1) { + c.LogLevel = "Warn" + } else if glog.V(0) { + c.LogLevel = "Error" + } + + if option.RaftBootstrap { + os.RemoveAll(path.Join(s.dataDir, ldbFile)) + os.RemoveAll(path.Join(s.dataDir, sdbFile)) + os.RemoveAll(path.Join(s.dataDir, "snapshot")) + } + baseDir := s.dataDir + + ldb, err := boltdb.NewBoltStore(filepath.Join(baseDir, ldbFile)) + if err != nil { + return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err) + } + + sdb, err := boltdb.NewBoltStore(filepath.Join(baseDir, sdbFile)) + if err != nil { + return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err) + } + + fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr) + if err != nil { + return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err) + } + + s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption}) + + stateMachine := StateMachine{topo: option.Topo} + s.RaftHashicorp, err = raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport()) + if err != nil { + return nil, fmt.Errorf("raft.NewRaft: %v", err) + } + if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 { + cfg := s.AddPeersConfiguration() + // Need to get lock, in case all servers do this at the same time. + peerIdx := getPeerIdx(s.serverAddr, s.peers) + timeSpeep := time.Duration(float64(c.LeaderLeaseTimeout) * (rand.Float64()*0.25 + 1) * float64(peerIdx)) + glog.V(0).Infof("Bootstrapping idx: %d sleep: %v new cluster: %+v", peerIdx, timeSpeep, cfg) + time.Sleep(timeSpeep) + f := s.RaftHashicorp.BootstrapCluster(cfg) + if err := f.Error(); err != nil { + return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) + } + } else { + go s.UpdatePeers() + } + + ticker := time.NewTicker(c.HeartbeatTimeout * 10) + if glog.V(4) { + go func() { + for { + select { + case <-ticker.C: + cfuture := s.RaftHashicorp.GetConfiguration() + if err = cfuture.Error(); err != nil { + glog.Fatalf("error getting config: %s", err) + } + configuration := cfuture.Configuration() + glog.V(4).Infof("Showing peers known by %s:\n%+v", s.RaftHashicorp.String(), configuration.Servers) + } + } + }() + } + + return s, nil +} diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index d559cb691..8c372f0cc 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,6 +2,9 @@ package weed_server import ( "encoding/json" + transport "github.com/Jille/raft-grpc-transport" + "io" + "io/ioutil" "math/rand" "os" "path" @@ -12,6 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/topology" @@ -26,14 +30,17 @@ type RaftServerOption struct { RaftResumeState bool HeartbeatInterval time.Duration ElectionTimeout time.Duration + RaftBootstrap bool } type RaftServer struct { - peers map[string]pb.ServerAddress // initial peers to join with - raftServer raft.Server - dataDir string - serverAddr pb.ServerAddress - topo *topology.Topology + peers map[string]pb.ServerAddress // initial peers to join with + raftServer raft.Server + RaftHashicorp *hashicorpRaft.Raft + TransportManager *transport.Manager + dataDir string + serverAddr pb.ServerAddress + topo *topology.Topology *raft.GrpcServer } @@ -42,6 +49,8 @@ type StateMachine struct { topo *topology.Topology } +var _ hashicorpRaft.FSM = &StateMachine{} + func (s StateMachine) Save() ([]byte, error) { state := topology.MaxVolumeIdCommand{ MaxVolumeId: s.topo.GetMaxVolumeId(), @@ -61,6 +70,36 @@ func (s StateMachine) Recovery(data []byte) error { return nil } +func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} { + before := s.topo.GetMaxVolumeId() + state := topology.MaxVolumeIdCommand{} + err := json.Unmarshal(l.Data, &state) + if err != nil { + return err + } + s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId) + + glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId()) + return nil +} + +func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) { + return &topology.MaxVolumeIdCommand{ + MaxVolumeId: s.topo.GetMaxVolumeId(), + }, nil +} + +func (s *StateMachine) Restore(r io.ReadCloser) error { + b, err := ioutil.ReadAll(r) + if err != nil { + return err + } + if err := s.Recovery(b); err != nil { + return err + } + return nil +} + func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ peers: option.Peers, @@ -132,12 +171,17 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { } func (s *RaftServer) Peers() (members []string) { - peers := s.raftServer.Peers() - - for _, p := range peers { - members = append(members, p.Name) + if s.raftServer != nil { + peers := s.raftServer.Peers() + for _, p := range peers { + members = append(members, p.Name) + } + } else if s.RaftHashicorp != nil { + cfg := s.RaftHashicorp.GetConfiguration() + for _, p := range cfg.Configuration().Servers { + members = append(members, string(p.ID)) + } } - return } diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go index 7e58f1e92..cc3e6e37f 100644 --- a/weed/server/raft_server_handlers.go +++ b/weed/server/raft_server_handlers.go @@ -25,3 +25,11 @@ func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) { } writeJsonQuiet(w, r, http.StatusOK, ret) } + +func (s *RaftServer) StatsRaftHandler(w http.ResponseWriter, r *http.Request) { + if s.RaftHashicorp == nil { + writeJsonQuiet(w, r, http.StatusNotFound, nil) + return + } + writeJsonQuiet(w, r, http.StatusOK, s.RaftHashicorp.Stats()) +} |
