author     guosj <515878133@qq.com>  2022-04-19 09:25:32 +0800
committer  guosj <515878133@qq.com>  2022-04-19 09:25:32 +0800
commit     82ee31965dd7a1ad2d348c7e9dadb254744bf9b0 (patch)
tree       593eb933dffc877010c761b2c55ec6c73875e9a3 /weed/server
parent     5c9a3bb8cf68ed99acb53dd548c92b54744d7fd7 (diff)
parent     e6ebafc094dc0ce0e3b0a68d7735f52a544bc479 (diff)
Merge branch 'master' of https://github.com/chrislusf/seaweedfs into chrislusf-master
Diffstat (limited to 'weed/server')
-rw-r--r--  weed/server/filer_grpc_server.go                      | 126
-rw-r--r--  weed/server/filer_grpc_server_admin.go                | 177
-rw-r--r--  weed/server/filer_server.go                           |   2
-rw-r--r--  weed/server/filer_server_handlers_read_dir.go         |   6
-rw-r--r--  weed/server/filer_server_handlers_tagging.go          |   4
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go  |  12
-rw-r--r--  weed/server/filer_server_handlers_write_merge.go      |  11
-rw-r--r--  weed/server/filer_server_handlers_write_upload.go     |   8
-rw-r--r--  weed/server/filer_ui/breadcrumb.go                    |   6
-rw-r--r--  weed/server/filer_ui/filer.html                       | 247
-rw-r--r--  weed/server/master_grpc_server.go                     |  15
-rw-r--r--  weed/server/master_grpc_server_admin.go               |  42
-rw-r--r--  weed/server/master_grpc_server_raft.go                |  66
-rw-r--r--  weed/server/master_grpc_server_volume.go              |   2
-rw-r--r--  weed/server/master_server.go                          | 164
-rw-r--r--  weed/server/master_server_handlers_admin.go           |   2
-rw-r--r--  weed/server/master_server_handlers_ui.go              |  52
-rw-r--r--  weed/server/master_ui/masterNewRaft.html              | 121
-rw-r--r--  weed/server/master_ui/templates.go                    |   4
-rw-r--r--  weed/server/raft_hashicorp.go                         | 183
-rw-r--r--  weed/server/raft_server.go                            |  99
-rw-r--r--  weed/server/raft_server_handlers.go                   |   8
-rw-r--r--  weed/server/volume_grpc_admin.go                      |  43
23 files changed, 1132 insertions, 268 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 3f65660ee..67d4aaaaf 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -3,7 +3,6 @@ package weed_server
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/pb"
"os"
"path/filepath"
"strconv"
@@ -357,128 +356,3 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet
return &filer_pb.DeleteCollectionResponse{}, err
}
-
-func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
-
- var output *master_pb.StatisticsResponse
-
- err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error {
- grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{
- Replication: req.Replication,
- Collection: req.Collection,
- Ttl: req.Ttl,
- DiskType: req.DiskType,
- })
- if grpcErr != nil {
- return grpcErr
- }
-
- output = grpcResponse
- return nil
- })
-
- if err != nil {
- return nil, err
- }
-
- return &filer_pb.StatisticsResponse{
- TotalSize: output.TotalSize,
- UsedSize: output.UsedSize,
- FileCount: output.FileCount,
- }, nil
-}
-
-func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
-
- clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
-
- t := &filer_pb.GetFilerConfigurationResponse{
- Masters: pb.ToAddressStrings(fs.option.Masters),
- Collection: fs.option.Collection,
- Replication: fs.option.DefaultReplication,
- MaxMb: uint32(fs.option.MaxMB),
- DirBuckets: fs.filer.DirBucketsPath,
- Cipher: fs.filer.Cipher,
- Signature: fs.filer.Signature,
- MetricsAddress: fs.metricsAddress,
- MetricsIntervalSec: int32(fs.metricsIntervalSec),
- Version: util.Version(),
- ClusterId: string(clusterId),
- }
-
- glog.V(4).Infof("GetFilerConfiguration: %v", t)
-
- return t, nil
-}
-
-func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error {
-
- req, err := stream.Recv()
- if err != nil {
- return err
- }
-
- clientName := util.JoinHostPort(req.Name, int(req.GrpcPort))
- m := make(map[string]bool)
- for _, tp := range req.Resources {
- m[tp] = true
- }
- fs.brokersLock.Lock()
- fs.brokers[clientName] = m
- glog.V(0).Infof("+ broker %v", clientName)
- fs.brokersLock.Unlock()
-
- defer func() {
- fs.brokersLock.Lock()
- delete(fs.brokers, clientName)
- glog.V(0).Infof("- broker %v: %v", clientName, err)
- fs.brokersLock.Unlock()
- }()
-
- for {
- if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil {
- glog.V(0).Infof("send broker %v: %+v", clientName, err)
- return err
- }
- // println("replied")
-
- if _, err := stream.Recv(); err != nil {
- glog.V(0).Infof("recv broker %v: %v", clientName, err)
- return err
- }
- // println("received")
- }
-
-}
-
-func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) {
-
- resp = &filer_pb.LocateBrokerResponse{}
-
- fs.brokersLock.Lock()
- defer fs.brokersLock.Unlock()
-
- var localBrokers []*filer_pb.LocateBrokerResponse_Resource
-
- for b, m := range fs.brokers {
- if _, found := m[req.Resource]; found {
- resp.Found = true
- resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{
- {
- GrpcAddresses: b,
- ResourceCount: int32(len(m)),
- },
- }
- return
- }
- localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{
- GrpcAddresses: b,
- ResourceCount: int32(len(m)),
- })
- }
-
- resp.Resources = localBrokers
-
- return resp, nil
-
-}
diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go
new file mode 100644
index 000000000..5341fc52f
--- /dev/null
+++ b/weed/server/filer_grpc_server_admin.go
@@ -0,0 +1,177 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "time"
+)
+
+func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
+
+ var output *master_pb.StatisticsResponse
+
+ err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error {
+ grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{
+ Replication: req.Replication,
+ Collection: req.Collection,
+ Ttl: req.Ttl,
+ DiskType: req.DiskType,
+ })
+ if grpcErr != nil {
+ return grpcErr
+ }
+
+ output = grpcResponse
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ return &filer_pb.StatisticsResponse{
+ TotalSize: output.TotalSize,
+ UsedSize: output.UsedSize,
+ FileCount: output.FileCount,
+ }, nil
+}
+
+func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (resp *filer_pb.PingResponse, pingErr error) {
+ resp = &filer_pb.PingResponse{
+ StartTimeNs: time.Now().UnixNano(),
+ }
+ if req.TargetType == cluster.FilerType {
+ pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if req.TargetType == cluster.VolumeServerType {
+ pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if req.TargetType == cluster.MasterType {
+ pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if pingErr != nil {
+ pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
+ }
+ resp.StopTimeNs = time.Now().UnixNano()
+ return
+}
+
+func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
+
+ clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
+
+ t := &filer_pb.GetFilerConfigurationResponse{
+ Masters: pb.ToAddressStringsFromMap(fs.option.Masters),
+ Collection: fs.option.Collection,
+ Replication: fs.option.DefaultReplication,
+ MaxMb: uint32(fs.option.MaxMB),
+ DirBuckets: fs.filer.DirBucketsPath,
+ Cipher: fs.filer.Cipher,
+ Signature: fs.filer.Signature,
+ MetricsAddress: fs.metricsAddress,
+ MetricsIntervalSec: int32(fs.metricsIntervalSec),
+ Version: util.Version(),
+ ClusterId: string(clusterId),
+ }
+
+ glog.V(4).Infof("GetFilerConfiguration: %v", t)
+
+ return t, nil
+}
+
+func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error {
+
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+
+ clientName := util.JoinHostPort(req.Name, int(req.GrpcPort))
+ m := make(map[string]bool)
+ for _, tp := range req.Resources {
+ m[tp] = true
+ }
+ fs.brokersLock.Lock()
+ fs.brokers[clientName] = m
+ glog.V(0).Infof("+ broker %v", clientName)
+ fs.brokersLock.Unlock()
+
+ defer func() {
+ fs.brokersLock.Lock()
+ delete(fs.brokers, clientName)
+ glog.V(0).Infof("- broker %v: %v", clientName, err)
+ fs.brokersLock.Unlock()
+ }()
+
+ for {
+ if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil {
+ glog.V(0).Infof("send broker %v: %+v", clientName, err)
+ return err
+ }
+ // println("replied")
+
+ if _, err := stream.Recv(); err != nil {
+ glog.V(0).Infof("recv broker %v: %v", clientName, err)
+ return err
+ }
+ // println("received")
+ }
+
+}
+
+func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) {
+
+ resp = &filer_pb.LocateBrokerResponse{}
+
+ fs.brokersLock.Lock()
+ defer fs.brokersLock.Unlock()
+
+ var localBrokers []*filer_pb.LocateBrokerResponse_Resource
+
+ for b, m := range fs.brokers {
+ if _, found := m[req.Resource]; found {
+ resp.Found = true
+ resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{
+ {
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ },
+ }
+ return
+ }
+ localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ })
+ }
+
+ resp.Resources = localBrokers
+
+ return resp, nil
+
+}
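
Note: the new Ping RPC records three timestamps (StartTimeNs on entry, RemoteTimeNs from the optional onward hop, StopTimeNs on return), which is enough for a caller to estimate round-trip time and clock offset. A minimal client-side sketch, assuming a connected filer_pb.SeaweedFilerClient; an empty request makes the remote handler return only its own timestamps:

package sketch

import (
	"context"
	"fmt"
	"time"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

// probeFiler measures round-trip time and an NTP-style clock offset against
// a filer, using only the timestamp fields visible in the diff above.
func probeFiler(ctx context.Context, client filer_pb.SeaweedFilerClient) error {
	sent := time.Now().UnixNano()
	resp, err := client.Ping(ctx, &filer_pb.PingRequest{})
	if err != nil {
		return err
	}
	received := time.Now().UnixNano()
	rtt := time.Duration(received - sent)
	// offset = remote clock minus the midpoint of the local send/receive window
	offset := time.Duration(resp.StartTimeNs - (sent+received)/2)
	fmt.Printf("rtt=%v clock offset~%v\n", rtt, offset)
	return nil
}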
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 497f59568..7edd5870f 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -48,7 +48,7 @@ import (
)
type FilerOption struct {
- Masters []pb.ServerAddress
+ Masters map[string]pb.ServerAddress
Collection string
DefaultReplication string
DisableDirListing bool
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index f67e90d38..8382cfc76 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -46,8 +46,10 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
path = ""
}
+ emptyFolder := true
if len(entries) > 0 {
lastFileName = entries[len(entries)-1].Name()
+ emptyFolder = false
}
glog.V(4).Infof("listDirectory %s, last file %s, limit %d: %d items", path, lastFileName, limit, len(entries))
@@ -59,12 +61,14 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
Limit int
LastFileName string
ShouldDisplayLoadMore bool
+ EmptyFolder bool
}{
path,
entries,
limit,
lastFileName,
shouldDisplayLoadMore,
+ emptyFolder,
})
return
}
@@ -76,6 +80,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
Limit int
LastFileName string
ShouldDisplayLoadMore bool
+ EmptyFolder bool
}{
path,
ui.ToBreadcrumb(path),
@@ -83,5 +88,6 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
limit,
lastFileName,
shouldDisplayLoadMore,
+ emptyFolder,
})
}
diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go
index 1da7ff50f..ae2093947 100644
--- a/weed/server/filer_server_handlers_tagging.go
+++ b/weed/server/filer_server_handlers_tagging.go
@@ -82,7 +82,9 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque
toDelete := strings.Split(r.URL.Query().Get("tagging"), ",")
deletions := make(map[string]struct{})
for _, deletion := range toDelete {
- deletions[deletion] = struct{}{}
+ if deletion != "" {
+ deletions[deletion] = struct{}{}
+ }
}
// delete all tags or specific tags
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 854b35f82..9bf2df6ef 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -164,6 +164,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
var entry *filer.Entry
+ var newChunks []*filer_pb.FileChunk
var mergedChunks []*filer_pb.FileChunk
isAppend := isAppend(r)
@@ -186,7 +187,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
entry.FileSize += uint64(chunkOffset)
}
- mergedChunks = append(entry.Chunks, fileChunks...)
+ newChunks = append(entry.Chunks, fileChunks...)
// TODO
if len(entry.Content) > 0 {
@@ -196,7 +197,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
} else {
glog.V(4).Infoln("saving", path)
- mergedChunks = fileChunks
+ newChunks = fileChunks
entry = &filer.Entry{
FullPath: util.FullPath(path),
Attr: filer.Attr{
@@ -217,6 +218,13 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
}
+ // maybe concatenate small chunks into one whole chunk
+ mergedChunks, replyerr = fs.maybeMergeChunks(so, newChunks)
+ if replyerr != nil {
+ glog.V(0).Infof("merge chunks %s: %v", r.RequestURI, replyerr)
+ mergedChunks = newChunks
+ }
+
// maybe compact entry chunks
mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks)
if replyerr != nil {
diff --git a/weed/server/filer_server_handlers_write_merge.go b/weed/server/filer_server_handlers_write_merge.go
new file mode 100644
index 000000000..dadc6f726
--- /dev/null
+++ b/weed/server/filer_server_handlers_write_merge.go
@@ -0,0 +1,11 @@
+package weed_server
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
+ //TODO merge consecutive smaller chunks into a large chunk to reduce number of chunks
+ return inputChunks, nil
+}
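
Note: maybeMergeChunks is wired into the autochunk write path above but is still a pass-through stub. As a purely hypothetical illustration of the TODO (not the project's implementation), a pre-pass could first identify runs of adjacent small chunks worth coalescing; the 1 MiB threshold below is an invented knob, and real merging would additionally re-read and re-upload the data under the given StorageOption:

package sketch

import (
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

const smallChunkLimit = 1 << 20 // illustrative threshold only

// mergeCandidates returns runs of adjacent small chunks that a future
// maybeMergeChunks could coalesce into one larger chunk.
func mergeCandidates(chunks []*filer_pb.FileChunk) (runs [][]*filer_pb.FileChunk) {
	var run []*filer_pb.FileChunk
	flush := func() {
		if len(run) > 1 { // only runs of 2+ chunks are worth merging
			runs = append(runs, run)
		}
		run = nil
	}
	for _, c := range chunks {
		small := c.Size < smallChunkLimit
		contiguous := len(run) > 0 && run[len(run)-1].Offset+int64(run[len(run)-1].Size) == c.Offset
		switch {
		case small && (len(run) == 0 || contiguous):
			run = append(run, c)
		case small:
			flush()
			run = []*filer_pb.FileChunk{c}
		default:
			flush()
		}
	}
	flush()
	return runs
}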
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 6ee378819..fe3346402 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -4,10 +4,10 @@ import (
"bytes"
"crypto/md5"
"fmt"
+ "golang.org/x/exp/slices"
"hash"
"io"
"net/http"
- "sort"
"strconv"
"strings"
"sync"
@@ -130,11 +130,9 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
fs.filer.DeleteChunks(fileChunks)
return nil, md5Hash, 0, uploadErr, nil
}
-
- sort.Slice(fileChunks, func(i, j int) bool {
- return fileChunks[i].Offset < fileChunks[j].Offset
+ slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) bool {
+ return a.Offset < b.Offset
})
-
return fileChunks, md5Hash, chunkOffset, nil, smallContent
}
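
Note: this swaps reflection-based sort.Slice for the generic slices.SortFunc. The 2022-era golang.org/x/exp/slices comparator returns a bool ("less"), matching the call above; later versions of the package switched to a three-way int comparator. A standalone sketch of the same migration:

package main

import (
	"fmt"

	"golang.org/x/exp/slices" // 2022-era API: less func(a, b E) bool
)

type chunk struct{ Offset int64 }

func main() {
	chunks := []*chunk{{30}, {0}, {10}}
	// Generic, no reflection: same ordering as the sort.Slice call it replaces.
	slices.SortFunc(chunks, func(a, b *chunk) bool {
		return a.Offset < b.Offset
	})
	fmt.Println(chunks[0].Offset, chunks[1].Offset, chunks[2].Offset) // 0 10 30
}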
diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go
index 5016117a8..3201ff76c 100644
--- a/weed/server/filer_ui/breadcrumb.go
+++ b/weed/server/filer_ui/breadcrumb.go
@@ -15,8 +15,12 @@ func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) {
parts := strings.Split(fullpath, "/")
for i := 0; i < len(parts); i++ {
+ name := parts[i]
+ if name == "" {
+ name = "/"
+ }
crumb := Breadcrumb{
- Name: parts[i] + " /",
+ Name: name,
Link: "/" + util.Join(parts[0:i+1]...),
}
if !strings.HasSuffix(crumb.Link, "/") {
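
Note: with this change the root segment renders as "/" instead of an empty name, and the literal " /" suffix is dropped from each crumb. A simplified standalone version of the loop (Breadcrumb fields as in the diff; the standard library's path.Join stands in for util.Join, and the trailing-slash normalization continues past the end of the hunk):

package main

import (
	"fmt"
	"path"
	"strings"
)

type Breadcrumb struct{ Name, Link string }

func toBreadcrumb(fullpath string) (crumbs []Breadcrumb) {
	parts := strings.Split(fullpath, "/")
	for i := 0; i < len(parts); i++ {
		name := parts[i]
		if name == "" {
			name = "/" // root segment now shows as "/"
		}
		crumb := Breadcrumb{
			Name: name,
			Link: "/" + path.Join(parts[0:i+1]...),
		}
		if !strings.HasSuffix(crumb.Link, "/") {
			crumb.Link += "/"
		}
		crumbs = append(crumbs, crumb)
	}
	return
}

func main() {
	fmt.Println(toBreadcrumb("/a/b")) // [{/ /} {a /a/} {b /a/b/}]
}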
diff --git a/weed/server/filer_ui/filer.html b/weed/server/filer_ui/filer.html
index 6f57c25d8..785f82887 100644
--- a/weed/server/filer_ui/filer.html
+++ b/weed/server/filer_ui/filer.html
@@ -11,6 +11,7 @@
#drop-area {
border: 1px transparent;
+ margin-top: 5px;
}
#drop-area.highlight {
@@ -26,6 +27,12 @@
border-radius: 2px;
border: 1px solid #ccc;
float: right;
+ margin-left: 2px;
+ margin-bottom: 0;
+ }
+
+ label {
+ font-weight: normal;
}
.button:hover {
@@ -36,6 +43,38 @@
display: none;
}
+ td, th {
+ vertical-align: bottom;
+ }
+
+ .table-hover > tbody > tr:hover > * > div.operations {
+ display: block;
+ }
+
+ .table > tbody > tr {
+ height: 39px;
+ }
+
+ div.operations {
+ display: none;
+ }
+
+ .footer {
+ position: absolute;
+ bottom: 0px;
+ right: 5%;
+ min-width: 25%;
+ border-left: 1px solid #ccc;
+ border-right: 1px solid #ccc;
+ }
+
+ .add-files {
+ font-size: 46px;
+ text-align: center;
+ border: 1px dashed #999;
+ padding-bottom: 9px;
+ margin: 0 2px;
+ }
</style>
</head>
<body>
@@ -48,12 +87,21 @@
</div>
<div class="row">
<div>
+ <div class="btn-group btn-group-sm pull-right" role="group" style="margin-top:3px;">
+ <label class="btn btn-default" onclick="handleCreateDir()">
+ <span class="glyphicon glyphicon-plus" aria-hidden="true"></span> New Folder
+ </label>
+ <label class="btn btn-default" for="fileElem">
+ <span class="glyphicon glyphicon-cloud-upload" aria-hidden="true"></span> Upload
+ </label>
+ </div>
+ <ol class="breadcrumb">
{{ range $entry := .Breadcrumbs }}
- <a href="{{ printpath $entry.Link }}">
+ <li><a href="{{ printpath $entry.Link }}">
{{ $entry.Name }}
- </a>
+ </a></li>
{{ end }}
- <label class="button" for="fileElem">Upload</label>
+ </ol>
</div>
</div>
@@ -61,13 +109,18 @@
<form class="upload-form">
<input type="file" id="fileElem" multiple onchange="handleFiles(this.files)">
- <table width="90%">
+ {{if .EmptyFolder}}
+ <div class="row add-files">
+ +
+ </div>
+ {{else}}
+ <table width="100%" class="table table-hover">
{{$path := .Path }}
{{ range $entry_index, $entry := .Entries }}
<tr>
<td>
{{if $entry.IsDirectory}}
- <img src="/seaweedfsstatic/images/folder.gif" width="20" height="23">
+ <span class="glyphicon glyphicon-folder-open" aria-hidden="true"></span>&nbsp;
<a href="{{ printpath $path "/" $entry.Name "/"}}" >
{{ $entry.Name }}
</a>
@@ -89,13 +142,29 @@
{{ $entry.Size | humanizeBytes }}&nbsp;
{{end}}
</td>
- <td nowrap>
+ <td align="right" nowrap>
{{ $entry.Timestamp.Format "2006-01-02 15:04" }}
</td>
+ <td style="width:75px">
+ <div class="btn-group btn-group-xs pull-right operations" role="group">
+ <label class="btn" onclick="handleRename('{{ $entry.Name }}', '{{ printpath $path "/" }}')">
+ <span class="glyphicon glyphicon-edit" aria-hidden="true"></span>
+ </label>
+ {{if $entry.IsDirectory}}
+ <label class="btn" onclick="handleDelete('{{ printpath $path "/" $entry.Name "/" }}')">
+ <span class="glyphicon glyphicon-trash" aria-hidden="true"></span>
+ </label>
+ {{else}}
+ <label class="btn" onclick="handleDelete('{{ printpath $path "/" $entry.Name }}')">
+ <span class="glyphicon glyphicon-trash" aria-hidden="true"></span>
+ </label>
+ {{end}}
+ </div>
+ </td>
</tr>
{{ end }}
-
</table>
+ {{end}}
</form>
</div>
@@ -109,65 +178,177 @@
<br/>
<br/>
-
+ <div id="progress-area" class="footer" style="display: none;">
+ </div>
</div>
</body>
<script type="text/javascript">
// ************************ Drag and drop ***************** //
- let dropArea = document.getElementById("drop-area")
+ let dropArea = document.getElementById("drop-area");
+ let progressArea = document.getElementById("progress-area");
// Prevent default drag behaviors
;['dragenter', 'dragover', 'dragleave', 'drop'].forEach(eventName => {
- dropArea.addEventListener(eventName, preventDefaults, false)
- document.body.addEventListener(eventName, preventDefaults, false)
- })
+ dropArea.addEventListener(eventName, preventDefaults, false);
+ document.body.addEventListener(eventName, preventDefaults, false);
+ });
// Highlight drop area when item is dragged over it
;['dragenter', 'dragover'].forEach(eventName => {
- dropArea.addEventListener(eventName, highlight, false)
- })
+ dropArea.addEventListener(eventName, highlight, false);
+ });
;['dragleave', 'drop'].forEach(eventName => {
- dropArea.addEventListener(eventName, unhighlight, false)
- })
+ dropArea.addEventListener(eventName, unhighlight, false);
+ });
// Handle dropped files
- dropArea.addEventListener('drop', handleDrop, false)
+ dropArea.addEventListener('drop', handleDrop, false);
function preventDefaults(e) {
- e.preventDefault()
- e.stopPropagation()
+ e.preventDefault();
+ e.stopPropagation();
}
function highlight(e) {
- dropArea.classList.add('highlight')
+ dropArea.classList.add('highlight');
}
function unhighlight(e) {
- dropArea.classList.remove('highlight')
+ dropArea.classList.remove('highlight');
}
function handleDrop(e) {
- var dt = e.dataTransfer
- var files = dt.files
+ var dt = e.dataTransfer;
+ var files = dt.files;
- handleFiles(files)
+ handleFiles(files);
}
+ var uploadList = {};
+
function handleFiles(files) {
- files = [...files]
- files.forEach(uploadFile)
- window.location.reload()
+ files = [...files];
+ files.forEach(startUpload);
+ renderProgress();
+ files.forEach(uploadFile);
+ }
+
+ function startUpload(file, i) {
+ uploadList[file.name] = {'name': file.name, 'percent': 0, 'finish': false};
+ }
+
+ function renderProgress() {
+ var values = Object.values(uploadList);
+ var html = '<table class="table">\n<tr><th>Uploading</th><\/tr>\n';
+ for (let i of values) {
+ var progressBarClass = 'progress-bar-striped active';
+ if (i.percent >= 100) {
+ progressBarClass = 'progress-bar-success';
+ }
+ html += '<tr>\n<td>\n';
+ html += '<div class="progress" style="margin-bottom: 2px;">\n';
+ html += '<div class="progress-bar ' + progressBarClass + '" role="progressbar" aria-valuenow="' + '100" aria-valuemin="0" aria-valuemax="100" style="width:' + i.percent + '%;">';
+ html += '<span style="margin-right: 10px;">' + i.name + '</span>' + i.percent + '%<\/div>';
+ html += '<\/div>\n<\/td>\n<\/tr>\n';
+ }
+ html += '<\/table>\n';
+ progressArea.innerHTML = html;
+ if (values.length > 0) {
+ progressArea.attributes.style.value = '';
+ }
+ }
+
+ function reportProgress(file, percent) {
+ var item = uploadList[file]
+ item.percent = percent;
+ renderProgress();
+ }
+
+ function finishUpload(file) {
+ uploadList[file]['finish'] = true;
+ renderProgress();
+ var allFinish = true;
+ for (let i of Object.values(uploadList)) {
+ if (!i.finish) {
+ allFinish = false;
+ break;
+ }
+ }
+ if (allFinish) {
+ console.log('All Finish');
+ window.location.reload();
+ }
}
function uploadFile(file, i) {
- var url = window.location.href
- var xhr = new XMLHttpRequest()
- var formData = new FormData()
- xhr.open('POST', url, false)
+ var url = window.location.href;
+ var xhr = new XMLHttpRequest();
+ var fileName = file.name;
+ xhr.upload.addEventListener('progress', function(e) {
+ if (e.lengthComputable) {
+ var percent = Math.ceil((e.loaded / e.total) * 100);
+ reportProgress(fileName, percent)
+ }
+ });
+ xhr.upload.addEventListener('loadend', function(e) {
+ finishUpload(fileName);
+ });
+ var formData = new FormData();
+ xhr.open('POST', url, true);
+ formData.append('file', file);
+ xhr.send(formData);
+ }
+
+ function handleCreateDir() {
+ var dirName = prompt('Folder Name:', '');
+ dirName = dirName.trim();
+ if (dirName == null || dirName == '') {
+ return;
+ }
+ var baseUrl = window.location.href;
+ if (!baseUrl.endsWith('/')) {
+ baseUrl += '/';
+ }
+ var url = baseUrl + dirName;
+ if (!url.endsWith('/')) {
+ url += '/';
+ }
+ var xhr = new XMLHttpRequest();
+ xhr.open('POST', url, false);
+ xhr.setRequestHeader('Content-Type', '');
+ xhr.send();
+ window.location.reload();
+ }
+
+ function handleRename(originName, basePath) {
+ var newName = prompt('New Name:', originName);
+ if (newName == null || newName == '') {
+ return;
+ }
+ var url = basePath + newName;
+ var originPath = basePath + originName;
+ url += '?mv.from=' + originPath;
+ var xhr = new XMLHttpRequest();
+ xhr.open('POST', url, false);
+ xhr.setRequestHeader('Content-Type', '');
+ xhr.send();
+ window.location.reload();
+ }
+
+ function handleDelete(path) {
+ if (!confirm('Are you sure to delete ' + path + '?')) {
+ return;
+ }
+ var url = path;
+ if (url.endsWith('/')) {
+ url += '?recursive=true';
+ }
- formData.append('file', file)
- xhr.send(formData)
+ var xhr = new XMLHttpRequest();
+ xhr.open('DELETE', url, false);
+ xhr.send();
+ window.location.reload();
}
</script>
</html>
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 50fcc0d62..83abdaaad 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -113,6 +113,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
+ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+ ms.Topo.DataNodeRegistration(dcName, rackName, dn)
+
// process heartbeat.Volumes
stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc()
newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)
@@ -133,13 +136,13 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
for _, s := range heartbeat.NewEcShards {
- message.NewVids = append(message.NewVids, s.Id)
+ message.NewEcVids = append(message.NewEcVids, s.Id)
}
for _, s := range heartbeat.DeletedEcShards {
- if dn.HasVolumesById(needle.VolumeId(s.Id)) {
+ if dn.HasEcShards(needle.VolumeId(s.Id)) {
continue
}
- message.DeletedVids = append(message.DeletedVids, s.Id)
+ message.DeletedEcVids = append(message.DeletedEcVids, s.Id)
}
}
@@ -151,17 +154,17 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
// broadcast the ec vid changes to master clients
for _, s := range newShards {
- message.NewVids = append(message.NewVids, uint32(s.VolumeId))
+ message.NewEcVids = append(message.NewEcVids, uint32(s.VolumeId))
}
for _, s := range deletedShards {
if dn.HasVolumesById(s.VolumeId) {
continue
}
- message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId))
+ message.DeletedEcVids = append(message.DeletedEcVids, uint32(s.VolumeId))
}
}
- if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
+ if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 || len(message.NewEcVids) > 0 || len(message.DeletedEcVids) > 0 {
ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
}
diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go
index 983606476..1f37e979a 100644
--- a/weed/server/master_grpc_server_admin.go
+++ b/weed/server/master_grpc_server_admin.go
@@ -3,7 +3,11 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"math/rand"
"sync"
"time"
@@ -142,3 +146,41 @@ func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.Re
}
return resp, nil
}
+
+func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (resp *master_pb.PingResponse, pingErr error) {
+ resp = &master_pb.PingResponse{
+ StartTimeNs: time.Now().UnixNano(),
+ }
+ if req.TargetType == cluster.FilerType {
+ pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if req.TargetType == cluster.VolumeServerType {
+ pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if req.TargetType == cluster.MasterType {
+ pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if pingErr != nil {
+ pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
+ }
+ resp.StopTimeNs = time.Now().UnixNano()
+ return
+}
diff --git a/weed/server/master_grpc_server_raft.go b/weed/server/master_grpc_server_raft.go
new file mode 100644
index 000000000..37491b3df
--- /dev/null
+++ b/weed/server/master_grpc_server_raft.go
@@ -0,0 +1,66 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/hashicorp/raft"
+)
+
+func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_pb.RaftListClusterServersRequest) (*master_pb.RaftListClusterServersResponse, error) {
+ resp := &master_pb.RaftListClusterServersResponse{}
+
+ servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers
+
+ for _, server := range servers {
+ resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{
+ Id: string(server.ID),
+ Address: string(server.Address),
+ Suffrage: server.Suffrage.String(),
+ })
+ }
+ return resp, nil
+}
+
+func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) {
+ resp := &master_pb.RaftAddServerResponse{}
+ if ms.Topo.HashicorpRaft.State() != raft.Leader {
+ return nil, fmt.Errorf("raft add server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
+ }
+
+ var idxFuture raft.IndexFuture
+ if req.Voter {
+ idxFuture = ms.Topo.HashicorpRaft.AddVoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0)
+ } else {
+ idxFuture = ms.Topo.HashicorpRaft.AddNonvoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0)
+ }
+
+ if err := idxFuture.Error(); err != nil {
+ return nil, err
+ }
+ return resp, nil
+}
+
+func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) {
+ resp := &master_pb.RaftRemoveServerResponse{}
+
+ if ms.Topo.HashicorpRaft.State() != raft.Leader {
+ return nil, fmt.Errorf("raft remove server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
+ }
+
+ if !req.Force {
+ ms.clientChansLock.RLock()
+ _, ok := ms.clientChans[fmt.Sprintf("%s@%s", cluster.MasterType, req.Id)]
+ ms.clientChansLock.RUnlock()
+ if ok {
+ return resp, fmt.Errorf("raft remove server %s failed: client connection to master exists", req.Id)
+ }
+ }
+
+ idxFuture := ms.Topo.HashicorpRaft.RemoveServer(raft.ServerID(req.Id), 0, 0)
+ if err := idxFuture.Error(); err != nil {
+ return nil, err
+ }
+ return resp, nil
+}
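
Note: both RaftAddServer and RaftRemoveServer refuse to run on a non-leader, so callers must target the current leader. A sketch of the client side, with placeholder id/address values, using the pb.WithMasterClient helper seen elsewhere in this diff:

package sketch

import (
	"context"

	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)

// addRaftVoter asks the current raft leader to add a new master as a voter,
// using the request fields shown in the diff above.
func addRaftVoter(leader pb.ServerAddress, dialOpt grpc.DialOption) error {
	return pb.WithMasterClient(false, leader, dialOpt, func(client master_pb.SeaweedClient) error {
		_, err := client.RaftAddServer(context.Background(), &master_pb.RaftAddServerRequest{
			Id:      "master2:9333",  // placeholder raft ServerID (the master's host:port)
			Address: "master2:19333", // placeholder raft transport (gRPC) address
			Voter:   true,
		})
		return err
	})
}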
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 9389bceb8..bc92dd332 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -268,7 +268,7 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV
resp := &master_pb.VacuumVolumeResponse{}
- ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.preallocateSize)
+ ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), req.VolumeId, req.Collection, ms.preallocateSize)
return resp, nil
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 671432d5c..9f29d4ba7 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/stats"
"net/http"
@@ -17,6 +18,7 @@ import (
"github.com/chrislusf/raft"
"github.com/gorilla/mux"
+ hashicorpRaft "github.com/hashicorp/raft"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -30,8 +32,9 @@ import (
)
const (
- SequencerType = "master.sequencer.type"
- SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
+ SequencerType = "master.sequencer.type"
+ SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
+ RaftServerRemovalTime = 72 * time.Minute
)
type MasterOption struct {
@@ -62,6 +65,9 @@ type MasterServer struct {
boundedLeaderChan chan int
+ onPeerUpdatDoneCn chan string
+ onPeerUpdatDoneCnExist bool
+
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.KeepConnectedResponse
@@ -75,7 +81,7 @@ type MasterServer struct {
Cluster *cluster.Cluster
}
-func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer {
v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
@@ -112,6 +118,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre
Cluster: cluster.NewCluster(),
}
ms.boundedLeaderChan = make(chan int, 16)
+ ms.onPeerUpdatDoneCn = make(chan string)
+
+ ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
seq := ms.createSequencer(option)
if nil == seq {
@@ -160,19 +169,41 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre
}
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
- ms.Topo.RaftServer = raftServer.raftServer
- ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
- glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
- stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
- if ms.Topo.RaftServer.Leader() != "" {
- glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
- }
- })
+ var raftServerName string
+ if raftServer.raftServer != nil {
+ ms.Topo.RaftServer = raftServer.raftServer
+ ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
+ glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
+ stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
+ if ms.Topo.RaftServer.Leader() != "" {
+ glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
+ }
+ })
+ raftServerName = ms.Topo.RaftServer.Name()
+ } else if raftServer.RaftHashicorp != nil {
+ ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
+ leaderCh := raftServer.RaftHashicorp.LeaderCh()
+ prevLeader := ms.Topo.HashicorpRaft.Leader()
+ go func() {
+ for {
+ select {
+ case isLeader := <-leaderCh:
+ leader := ms.Topo.HashicorpRaft.Leader()
+ glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
+ stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
+ prevLeader = leader
+ }
+ }
+ }()
+ raftServerName = ms.Topo.HashicorpRaft.String()
+ }
if ms.Topo.IsLeader() {
- glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
+ glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!")
} else {
- if ms.Topo.RaftServer.Leader() != "" {
+ if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
+ } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
+ glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.")
}
}
}
@@ -181,31 +212,38 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
f(w, r)
- } else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
- ms.boundedLeaderChan <- 1
- defer func() { <-ms.boundedLeaderChan }()
- targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
- if err != nil {
- writeJsonError(w, r, http.StatusInternalServerError,
- fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err))
- return
- }
- glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader())
- proxy := httputil.NewSingleHostReverseProxy(targetUrl)
- director := proxy.Director
- proxy.Director = func(req *http.Request) {
- actualHost, err := security.GetActualRemoteHost(req)
- if err == nil {
- req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost)
- }
- director(req)
- }
- proxy.Transport = util.Transport
- proxy.ServeHTTP(w, r)
- } else {
- // handle requests locally
+ return
+ }
+ var raftServerLeader string
+ if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
+ raftServerLeader = ms.Topo.RaftServer.Leader()
+ } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
+ raftServerLeader = string(ms.Topo.HashicorpRaft.Leader())
+ }
+ if raftServerLeader == "" {
f(w, r)
+ return
+ }
+ ms.boundedLeaderChan <- 1
+ defer func() { <-ms.boundedLeaderChan }()
+ targetUrl, err := url.Parse("http://" + raftServerLeader)
+ if err != nil {
+ writeJsonError(w, r, http.StatusInternalServerError,
+ fmt.Errorf("Leader URL http://%s Parse Error: %v", raftServerLeader, err))
+ return
+ }
+ glog.V(4).Infoln("proxying to leader", raftServerLeader)
+ proxy := httputil.NewSingleHostReverseProxy(targetUrl)
+ director := proxy.Director
+ proxy.Director = func(req *http.Request) {
+ actualHost, err := security.GetActualRemoteHost(req)
+ if err == nil {
+ req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost)
+ }
+ director(req)
}
+ proxy.Transport = util.Transport
+ proxy.ServeHTTP(w, r)
}
}
@@ -301,3 +339,57 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
}
return seq
}
+
+func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) {
+ if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
+ return
+ }
+ glog.V(4).Infof("OnPeerUpdate: %+v", update)
+
+ peerAddress := pb.ServerAddress(update.Address)
+ peerName := string(peerAddress)
+ isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader
+ if update.IsAdd {
+ if isLeader {
+ raftServerFound := false
+ for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
+ if string(server.ID) == peerName {
+ raftServerFound = true
+ }
+ }
+ if !raftServerFound {
+ glog.V(0).Infof("adding new raft server: %s", peerName)
+ ms.Topo.HashicorpRaft.AddVoter(
+ hashicorpRaft.ServerID(peerName),
+ hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
+ }
+ }
+ if ms.onPeerUpdatDoneCnExist {
+ ms.onPeerUpdatDoneCn <- peerName
+ }
+ } else if isLeader {
+ go func(peerName string) {
+ for {
+ select {
+ case <-time.After(RaftServerRemovalTime):
+ err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
+ Id: peerName,
+ Force: false,
+ })
+ return err
+ })
+ if err != nil {
+ glog.Warningf("failed to removing old raft server %s: %v", peerName, err)
+ }
+ return
+ case peerDone := <-ms.onPeerUpdatDoneCn:
+ if peerName == peerDone {
+ return
+ }
+ }
+ }
+ }(peerName)
+ ms.onPeerUpdatDoneCnExist = true
+ }
+}
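
Note: OnPeerUpdate schedules removal of a vanished master after RaftServerRemovalTime and cancels the countdown when the peer re-registers via onPeerUpdatDoneCn. The same cancel-on-return idiom, isolated from the server (a sketch, not the server's code), can be written with time.AfterFunc:

package main

import (
	"fmt"
	"sync"
	"time"
)

// removalTracker schedules a delayed removal per peer and cancels it
// if the peer re-registers before the timer fires.
type removalTracker struct {
	mu       sync.Mutex
	removals map[string]*time.Timer
}

func (rt *removalTracker) peerDown(name string, after time.Duration, remove func(string)) {
	rt.mu.Lock()
	defer rt.mu.Unlock()
	rt.removals[name] = time.AfterFunc(after, func() { remove(name) })
}

func (rt *removalTracker) peerBack(name string) {
	rt.mu.Lock()
	defer rt.mu.Unlock()
	if t, ok := rt.removals[name]; ok {
		t.Stop() // countdown cancelled; removal never runs
		delete(rt.removals, name)
	}
}

func main() {
	rt := &removalTracker{removals: map[string]*time.Timer{}}
	rt.peerDown("master2:9333", 100*time.Millisecond, func(n string) { fmt.Println("removing", n) })
	rt.peerBack("master2:9333") // peer came back before the timer fired
	time.Sleep(200 * time.Millisecond)
}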
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 72d4e20d7..ade750ccc 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -64,7 +64,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
}
}
// glog.Infoln("garbageThreshold =", gcThreshold)
- ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.preallocateSize)
+ ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, 0, "", ms.preallocateSize)
ms.dirStatusHandler(w, r)
}
diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go
index 015bfbd00..d8260d8d2 100644
--- a/weed/server/master_server_handlers_ui.go
+++ b/weed/server/master_server_handlers_ui.go
@@ -5,6 +5,8 @@ import (
"time"
"github.com/chrislusf/raft"
+ hashicorpRaft "github.com/hashicorp/raft"
+
ui "github.com/chrislusf/seaweedfs/weed/server/master_ui"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -13,20 +15,40 @@ import (
func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) {
infos := make(map[string]interface{})
infos["Up Time"] = time.Now().Sub(startTime).String()
- args := struct {
- Version string
- Topology interface{}
- RaftServer raft.Server
- Stats map[string]interface{}
- Counters *stats.ServerStats
- VolumeSizeLimitMB uint32
- }{
- util.Version(),
- ms.Topo.ToMap(),
- ms.Topo.RaftServer,
- infos,
- serverStats,
- ms.option.VolumeSizeLimitMB,
+ infos["Max Volume Id"] = ms.Topo.GetMaxVolumeId()
+ if ms.Topo.RaftServer != nil {
+ args := struct {
+ Version string
+ Topology interface{}
+ RaftServer raft.Server
+ Stats map[string]interface{}
+ Counters *stats.ServerStats
+ VolumeSizeLimitMB uint32
+ }{
+ util.Version(),
+ ms.Topo.ToMap(),
+ ms.Topo.RaftServer,
+ infos,
+ serverStats,
+ ms.option.VolumeSizeLimitMB,
+ }
+ ui.StatusTpl.Execute(w, args)
+ } else if ms.Topo.HashicorpRaft != nil {
+ args := struct {
+ Version string
+ Topology interface{}
+ RaftServer *hashicorpRaft.Raft
+ Stats map[string]interface{}
+ Counters *stats.ServerStats
+ VolumeSizeLimitMB uint32
+ }{
+ util.Version(),
+ ms.Topo.ToMap(),
+ ms.Topo.HashicorpRaft,
+ infos,
+ serverStats,
+ ms.option.VolumeSizeLimitMB,
+ }
+ ui.StatusNewRaftTpl.Execute(w, args)
}
- ui.StatusTpl.Execute(w, args)
}
diff --git a/weed/server/master_ui/masterNewRaft.html b/weed/server/master_ui/masterNewRaft.html
new file mode 100644
index 000000000..32afdceac
--- /dev/null
+++ b/weed/server/master_ui/masterNewRaft.html
@@ -0,0 +1,121 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>SeaweedFS {{ .Version }}</title>
+ <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
+</head>
+<body>
+<div class="container">
+ <div class="page-header">
+ <h1>
+ <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a>
+ SeaweedFS <small>{{ .Version }}</small>
+ </h1>
+ </div>
+
+ <div class="row">
+ <div class="col-sm-6">
+ <h2>Cluster status</h2>
+ <table class="table table-condensed table-striped">
+ <tbody>
+ <tr>
+ <th>Volume Size Limit</th>
+ <td>{{ .VolumeSizeLimitMB }}MB</td>
+ </tr>
+ <tr>
+ <th>Free</th>
+ <td>{{ .Topology.Free }}</td>
+ </tr>
+ <tr>
+ <th>Max</th>
+ <td>{{ .Topology.Max }}</td>
+ </tr>
+ {{ with .RaftServer }}
+ <tr>
+ <th>Leader</th>
+ <td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td>
+ </tr>
+ <tr>
+ <th>Other Masters</th>
+ <td class="col-sm-5">
+ <ul class="list-unstyled">
+ {{ range $k, $p := .GetConfiguration.Configuration.Servers }}
+ <li><a href="http://{{ $p.ID }}/ui/index.html">{{ $p.ID }}</a></li>
+ {{ end }}
+ </ul>
+ </td>
+ </tr>
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+
+ <div class="col-sm-6">
+ <h2>System Stats</h2>
+ <table class="table table-condensed table-striped">
+ <tr>
+ <th>Concurrent Connections</th>
+ <td>{{ .Counters.Connections.WeekCounter.Sum }}</td>
+ </tr>
+ {{ range $key, $val := .Stats }}
+ <tr>
+ <th>{{ $key }}</th>
+ <td>{{ $val }}</td>
+ </tr>
+ {{ end }}
+ </table>
+ <h2>Raft Stats</h2>
+ <table class="table table-condensed table-striped">
+ <tr>
+ <th>applied_index</th>
+ <td>{{ .RaftServer.Stats.applied_index }}</td>
+ </tr>
+ <tr>
+ <th>last_log_term</th>
+ <td>{{ .RaftServer.Stats.last_log_term }}</td>
+ </tr>
+ </table>
+ </div>
+ </div>
+
+ <div class="row">
+ <h2>Topology</h2>
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Data Center</th>
+ <th>Rack</th>
+ <th>RemoteAddr</th>
+ <th>#Volumes</th>
+ <th>Volume Ids</th>
+ <th>#ErasureCodingShards</th>
+ <th>Max</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{ range $dc_index, $dc := .Topology.DataCenters }}
+ {{ range $rack_index, $rack := $dc.Racks }}
+ {{ range $dn_index, $dn := $rack.DataNodes }}
+ <tr>
+ <td><code>{{ $dc.Id }}</code></td>
+ <td>{{ $rack.Id }}</td>
+ <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a>
+ {{ if ne $dn.PublicUrl $dn.Url }}
+ / <a href="http://{{ $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a>
+ {{ end }}
+ </td>
+ <td>{{ $dn.Volumes }}</td>
+ <td>{{ $dn.VolumeIds}}</td>
+ <td>{{ $dn.EcShards }}</td>
+ <td>{{ $dn.Max }}</td>
+ </tr>
+ {{ end }}
+ {{ end }}
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+
+</div>
+</body>
+</html>
diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go
index 415022b97..a6dcc57d7 100644
--- a/weed/server/master_ui/templates.go
+++ b/weed/server/master_ui/templates.go
@@ -8,4 +8,8 @@ import (
//go:embed master.html
var masterHtml string
+//go:embed masterNewRaft.html
+var masterNewRaftHtml string
+
var StatusTpl = template.Must(template.New("status").Parse(masterHtml))
+var StatusNewRaftTpl = template.Must(template.New("status").Parse(masterNewRaftHtml))
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go
new file mode 100644
index 000000000..cc6578bf5
--- /dev/null
+++ b/weed/server/raft_hashicorp.go
@@ -0,0 +1,183 @@
+package weed_server
+
+// https://yusufs.medium.com/creating-distributed-kv-database-by-implementing-raft-consensus-using-golang-d0884eef2e28
+// https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18
+
+import (
+ "fmt"
+ transport "github.com/Jille/raft-grpc-transport"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/hashicorp/raft"
+ boltdb "github.com/hashicorp/raft-boltdb"
+ "google.golang.org/grpc"
+ "math/rand"
+ "os"
+ "path"
+ "path/filepath"
+ "sort"
+ "strings"
+ "time"
+)
+
+const (
+ ldbFile = "logs.dat"
+ sdbFile = "stable.dat"
+ updatePeersTimeout = 15 * time.Minute
+)
+
+func getPeerIdx(self pb.ServerAddress, mapPeers map[string]pb.ServerAddress) int {
+ peers := make([]pb.ServerAddress, 0, len(mapPeers))
+ for _, peer := range mapPeers {
+ peers = append(peers, peer)
+ }
+ sort.Slice(peers, func(i, j int) bool {
+ return strings.Compare(string(peers[i]), string(peers[j])) < 0
+ })
+ for i, peer := range peers {
+ if string(peer) == string(self) {
+ return i
+ }
+ }
+ return -1
+}
+
+func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) {
+ for _, peer := range s.peers {
+ cfg.Servers = append(cfg.Servers, raft.Server{
+ Suffrage: raft.Voter,
+ ID: raft.ServerID(peer),
+ Address: raft.ServerAddress(peer.ToGrpcAddress()),
+ })
+ }
+ return cfg
+}
+
+func (s *RaftServer) UpdatePeers() {
+ for {
+ select {
+ case isLeader := <-s.RaftHashicorp.LeaderCh():
+ if isLeader {
+ peerLeader := string(s.serverAddr)
+ existsPeerName := make(map[string]bool)
+ for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers {
+ if string(server.ID) == peerLeader {
+ continue
+ }
+ existsPeerName[string(server.ID)] = true
+ }
+ for _, peer := range s.peers {
+ peerName := string(peer)
+ if peerName == peerLeader || existsPeerName[peerName] {
+ continue
+ }
+ glog.V(0).Infof("adding new peer: %s", peerName)
+ s.RaftHashicorp.AddVoter(
+ raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
+ }
+ for peer, _ := range existsPeerName {
+ if _, found := s.peers[peer]; !found {
+ glog.V(0).Infof("removing old peer: %s", peer)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)
+ }
+ }
+ if _, found := s.peers[peerLeader]; !found {
+ glog.V(0).Infof("removing old leader peer: %s", peerLeader)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
+ }
+ }
+ return
+ case <-time.After(updatePeersTimeout):
+ return
+ }
+ }
+}
+
+func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
+ s := &RaftServer{
+ peers: option.Peers,
+ serverAddr: option.ServerAddr,
+ dataDir: option.DataDir,
+ topo: option.Topo,
+ }
+
+ c := raft.DefaultConfig()
+ c.LocalID = raft.ServerID(s.serverAddr) // TODO: the IP:port address may change
+ c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
+ c.ElectionTimeout = option.ElectionTimeout
+ if c.LeaderLeaseTimeout > c.HeartbeatTimeout {
+ c.LeaderLeaseTimeout = c.HeartbeatTimeout
+ }
+ if glog.V(4) {
+ c.LogLevel = "Debug"
+ } else if glog.V(2) {
+ c.LogLevel = "Info"
+ } else if glog.V(1) {
+ c.LogLevel = "Warn"
+ } else if glog.V(0) {
+ c.LogLevel = "Error"
+ }
+
+ if option.RaftBootstrap {
+ os.RemoveAll(path.Join(s.dataDir, ldbFile))
+ os.RemoveAll(path.Join(s.dataDir, sdbFile))
+ os.RemoveAll(path.Join(s.dataDir, "snapshot"))
+ }
+ baseDir := s.dataDir
+
+ ldb, err := boltdb.NewBoltStore(filepath.Join(baseDir, ldbFile))
+ if err != nil {
+ return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err)
+ }
+
+ sdb, err := boltdb.NewBoltStore(filepath.Join(baseDir, sdbFile))
+ if err != nil {
+ return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err)
+ }
+
+ fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr)
+ if err != nil {
+ return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
+ }
+
+ s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption})
+
+ stateMachine := StateMachine{topo: option.Topo}
+ s.RaftHashicorp, err = raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport())
+ if err != nil {
+ return nil, fmt.Errorf("raft.NewRaft: %v", err)
+ }
+ if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 {
+ cfg := s.AddPeersConfiguration()
+ // Need to get lock, in case all servers do this at the same time.
+ peerIdx := getPeerIdx(s.serverAddr, s.peers)
+ timeSleep := time.Duration(float64(c.LeaderLeaseTimeout) * (rand.Float64()*0.25 + 1) * float64(peerIdx))
+ glog.V(0).Infof("Bootstrapping idx: %d sleep: %v new cluster: %+v", peerIdx, timeSleep, cfg)
+ time.Sleep(timeSleep)
+ f := s.RaftHashicorp.BootstrapCluster(cfg)
+ if err := f.Error(); err != nil {
+ return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
+ }
+ } else {
+ go s.UpdatePeers()
+ }
+
+ ticker := time.NewTicker(c.HeartbeatTimeout * 10)
+ if glog.V(4) {
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ cfuture := s.RaftHashicorp.GetConfiguration()
+ if err = cfuture.Error(); err != nil {
+ glog.Fatalf("error getting config: %s", err)
+ }
+ configuration := cfuture.Configuration()
+ glog.V(4).Infof("Showing peers known by %s:\n%+v", s.RaftHashicorp.String(), configuration.Servers)
+ }
+ }
+ }()
+ }
+
+ return s, nil
+}
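
Note: bootstrap races between masters are avoided by sleeping an amount proportional to each peer's rank in the sorted peer list, so the lowest-ranked node bootstraps first and the others typically join its cluster rather than forming their own. A standalone sketch of the stagger computation; the 500ms lease value mirrors hashicorp/raft's DefaultConfig, and the 0-25% jitter factor comes from the code above:

package main

import (
	"fmt"
	"math/rand"
	"sort"
	"time"
)

// getPeerIdx as in the diff: rank of self among the sorted peer addresses.
func getPeerIdx(self string, peers []string) int {
	sort.Strings(peers)
	for i, p := range peers {
		if p == self {
			return i
		}
	}
	return -1
}

func main() {
	peers := []string{"m3:9333", "m1:9333", "m2:9333"}
	leaderLease := 500 * time.Millisecond // assumed raft.DefaultConfig value
	idx := getPeerIdx("m2:9333", peers)
	// Higher-ranked peers sleep longer, so at most one node bootstraps first.
	sleep := time.Duration(float64(leaderLease) * (rand.Float64()*0.25 + 1) * float64(idx))
	fmt.Printf("peer idx %d sleeps %v before BootstrapCluster\n", idx, sleep)
}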
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index f22b7c45d..8c372f0cc 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,11 +2,12 @@ package weed_server
import (
"encoding/json"
+ transport "github.com/Jille/raft-grpc-transport"
+ "io"
+ "io/ioutil"
"math/rand"
"os"
"path"
- "sort"
- "strings"
"time"
"google.golang.org/grpc"
@@ -14,6 +15,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/raft"
+ hashicorpRaft "github.com/hashicorp/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
@@ -21,21 +23,24 @@ import (
type RaftServerOption struct {
GrpcDialOption grpc.DialOption
- Peers []pb.ServerAddress
+ Peers map[string]pb.ServerAddress
ServerAddr pb.ServerAddress
DataDir string
Topo *topology.Topology
RaftResumeState bool
HeartbeatInterval time.Duration
ElectionTimeout time.Duration
+ RaftBootstrap bool
}
type RaftServer struct {
- peers []pb.ServerAddress // initial peers to join with
- raftServer raft.Server
- dataDir string
- serverAddr pb.ServerAddress
- topo *topology.Topology
+ peers map[string]pb.ServerAddress // initial peers to join with
+ raftServer raft.Server
+ RaftHashicorp *hashicorpRaft.Raft
+ TransportManager *transport.Manager
+ dataDir string
+ serverAddr pb.ServerAddress
+ topo *topology.Topology
*raft.GrpcServer
}
@@ -44,6 +49,8 @@ type StateMachine struct {
topo *topology.Topology
}
+var _ hashicorpRaft.FSM = &StateMachine{}
+
func (s StateMachine) Save() ([]byte, error) {
state := topology.MaxVolumeIdCommand{
MaxVolumeId: s.topo.GetMaxVolumeId(),
@@ -63,6 +70,36 @@ func (s StateMachine) Recovery(data []byte) error {
return nil
}
+func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} {
+ before := s.topo.GetMaxVolumeId()
+ state := topology.MaxVolumeIdCommand{}
+ err := json.Unmarshal(l.Data, &state)
+ if err != nil {
+ return err
+ }
+ s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
+
+ glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId())
+ return nil
+}
+
+func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) {
+ return &topology.MaxVolumeIdCommand{
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
+ }, nil
+}
+
+func (s *StateMachine) Restore(r io.ReadCloser) error {
+ b, err := ioutil.ReadAll(r)
+ if err != nil {
+ return err
+ }
+ if err := s.Recovery(b); err != nil {
+ return err
+ }
+ return nil
+}
+
func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
peers: option.Peers,
@@ -88,7 +125,7 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
os.RemoveAll(path.Join(s.dataDir, "conf"))
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
}
- if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil {
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0700); err != nil {
return nil, err
}
@@ -108,23 +145,15 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
return nil, err
}
- for _, peer := range s.peers {
- if err := s.raftServer.AddPeer(string(peer), peer.ToGrpcAddress()); err != nil {
+ for name, peer := range s.peers {
+ if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
return nil, err
}
}
// Remove deleted peers
for existsPeerName := range s.raftServer.Peers() {
- exists := false
- var existingPeer pb.ServerAddress
- for _, peer := range s.peers {
- if peer.String() == existsPeerName {
- exists, existingPeer = true, peer
- break
- }
- }
- if !exists {
+ if existingPeer, found := s.peers[existsPeerName]; !found {
if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
glog.V(0).Infoln(err)
return nil, err
@@ -136,36 +165,26 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
- if s.raftServer.IsLogEmpty() && isTheFirstOne(option.ServerAddr, s.peers) {
- // Initialize the server by joining itself.
- // s.DoJoinCommand()
- }
-
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
return s, nil
}
func (s *RaftServer) Peers() (members []string) {
- peers := s.raftServer.Peers()
-
- for _, p := range peers {
- members = append(members, p.Name)
+ if s.raftServer != nil {
+ peers := s.raftServer.Peers()
+ for _, p := range peers {
+ members = append(members, p.Name)
+ }
+ } else if s.RaftHashicorp != nil {
+ cfg := s.RaftHashicorp.GetConfiguration()
+ for _, p := range cfg.Configuration().Servers {
+ members = append(members, string(p.ID))
+ }
}
-
return
}
-func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool {
- sort.Slice(peers, func(i, j int) bool {
- return strings.Compare(string(peers[i]), string(peers[j])) < 0
- })
- if len(peers) <= 0 {
- return true
- }
- return self == peers[0]
-}
-
func (s *RaftServer) DoJoinCommand() {
glog.V(0).Infoln("Initializing new cluster")
diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go
index 7e58f1e92..cc3e6e37f 100644
--- a/weed/server/raft_server_handlers.go
+++ b/weed/server/raft_server_handlers.go
@@ -25,3 +25,11 @@ func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
}
writeJsonQuiet(w, r, http.StatusOK, ret)
}
+
+func (s *RaftServer) StatsRaftHandler(w http.ResponseWriter, r *http.Request) {
+ if s.RaftHashicorp == nil {
+ writeJsonQuiet(w, r, http.StatusNotFound, nil)
+ return
+ }
+ writeJsonQuiet(w, r, http.StatusOK, s.RaftHashicorp.Stats())
+}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index 898c3da12..476937847 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -3,7 +3,12 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"path/filepath"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@@ -247,3 +252,41 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
return resp, nil
}
+
+func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequest) (resp *volume_server_pb.PingResponse, pingErr error) {
+ resp = &volume_server_pb.PingResponse{
+ StartTimeNs: time.Now().UnixNano(),
+ }
+ if req.TargetType == cluster.FilerType {
+ pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if req.TargetType == cluster.VolumeServerType {
+ pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if req.TargetType == cluster.MasterType {
+ pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if pingErr != nil {
+ pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
+ }
+ resp.StopTimeNs = time.Now().UnixNano()
+ return
+}