diff options
Diffstat (limited to 'weed/server')
23 files changed, 1132 insertions, 268 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 3f65660ee..67d4aaaaf 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -3,7 +3,6 @@ package weed_server import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/pb" "os" "path/filepath" "strconv" @@ -357,128 +356,3 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet return &filer_pb.DeleteCollectionResponse{}, err } - -func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) { - - var output *master_pb.StatisticsResponse - - err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error { - grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{ - Replication: req.Replication, - Collection: req.Collection, - Ttl: req.Ttl, - DiskType: req.DiskType, - }) - if grpcErr != nil { - return grpcErr - } - - output = grpcResponse - return nil - }) - - if err != nil { - return nil, err - } - - return &filer_pb.StatisticsResponse{ - TotalSize: output.TotalSize, - UsedSize: output.UsedSize, - FileCount: output.FileCount, - }, nil -} - -func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) { - - clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId")) - - t := &filer_pb.GetFilerConfigurationResponse{ - Masters: pb.ToAddressStrings(fs.option.Masters), - Collection: fs.option.Collection, - Replication: fs.option.DefaultReplication, - MaxMb: uint32(fs.option.MaxMB), - DirBuckets: fs.filer.DirBucketsPath, - Cipher: fs.filer.Cipher, - Signature: fs.filer.Signature, - MetricsAddress: fs.metricsAddress, - MetricsIntervalSec: int32(fs.metricsIntervalSec), - Version: util.Version(), - ClusterId: string(clusterId), - } - - glog.V(4).Infof("GetFilerConfiguration: %v", t) - - return t, nil -} - -func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error { - - req, err := stream.Recv() - if err != nil { - return err - } - - clientName := util.JoinHostPort(req.Name, int(req.GrpcPort)) - m := make(map[string]bool) - for _, tp := range req.Resources { - m[tp] = true - } - fs.brokersLock.Lock() - fs.brokers[clientName] = m - glog.V(0).Infof("+ broker %v", clientName) - fs.brokersLock.Unlock() - - defer func() { - fs.brokersLock.Lock() - delete(fs.brokers, clientName) - glog.V(0).Infof("- broker %v: %v", clientName, err) - fs.brokersLock.Unlock() - }() - - for { - if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil { - glog.V(0).Infof("send broker %v: %+v", clientName, err) - return err - } - // println("replied") - - if _, err := stream.Recv(); err != nil { - glog.V(0).Infof("recv broker %v: %v", clientName, err) - return err - } - // println("received") - } - -} - -func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) { - - resp = &filer_pb.LocateBrokerResponse{} - - fs.brokersLock.Lock() - defer fs.brokersLock.Unlock() - - var localBrokers []*filer_pb.LocateBrokerResponse_Resource - - for b, m := range fs.brokers { - if _, found := m[req.Resource]; found { - resp.Found = true - resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{ - { - GrpcAddresses: b, - ResourceCount: int32(len(m)), - }, - } - return - } - localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{ - GrpcAddresses: b, - ResourceCount: int32(len(m)), - }) - } - - resp.Resources = localBrokers - - return resp, nil - -} diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go new file mode 100644 index 000000000..5341fc52f --- /dev/null +++ b/weed/server/filer_grpc_server_admin.go @@ -0,0 +1,177 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "time" +) + +func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) { + + var output *master_pb.StatisticsResponse + + err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error { + grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{ + Replication: req.Replication, + Collection: req.Collection, + Ttl: req.Ttl, + DiskType: req.DiskType, + }) + if grpcErr != nil { + return grpcErr + } + + output = grpcResponse + return nil + }) + + if err != nil { + return nil, err + } + + return &filer_pb.StatisticsResponse{ + TotalSize: output.TotalSize, + UsedSize: output.UsedSize, + FileCount: output.FileCount, + }, nil +} + +func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (resp *filer_pb.PingResponse, pingErr error) { + resp = &filer_pb.PingResponse{ + StartTimeNs: time.Now().UnixNano(), + } + if req.TargetType == cluster.FilerType { + pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if req.TargetType == cluster.VolumeServerType { + pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if req.TargetType == cluster.MasterType { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if pingErr != nil { + pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr) + } + resp.StopTimeNs = time.Now().UnixNano() + return +} + +func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) { + + clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId")) + + t := &filer_pb.GetFilerConfigurationResponse{ + Masters: pb.ToAddressStringsFromMap(fs.option.Masters), + Collection: fs.option.Collection, + Replication: fs.option.DefaultReplication, + MaxMb: uint32(fs.option.MaxMB), + DirBuckets: fs.filer.DirBucketsPath, + Cipher: fs.filer.Cipher, + Signature: fs.filer.Signature, + MetricsAddress: fs.metricsAddress, + MetricsIntervalSec: int32(fs.metricsIntervalSec), + Version: util.Version(), + ClusterId: string(clusterId), + } + + glog.V(4).Infof("GetFilerConfiguration: %v", t) + + return t, nil +} + +func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error { + + req, err := stream.Recv() + if err != nil { + return err + } + + clientName := util.JoinHostPort(req.Name, int(req.GrpcPort)) + m := make(map[string]bool) + for _, tp := range req.Resources { + m[tp] = true + } + fs.brokersLock.Lock() + fs.brokers[clientName] = m + glog.V(0).Infof("+ broker %v", clientName) + fs.brokersLock.Unlock() + + defer func() { + fs.brokersLock.Lock() + delete(fs.brokers, clientName) + glog.V(0).Infof("- broker %v: %v", clientName, err) + fs.brokersLock.Unlock() + }() + + for { + if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil { + glog.V(0).Infof("send broker %v: %+v", clientName, err) + return err + } + // println("replied") + + if _, err := stream.Recv(); err != nil { + glog.V(0).Infof("recv broker %v: %v", clientName, err) + return err + } + // println("received") + } + +} + +func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) { + + resp = &filer_pb.LocateBrokerResponse{} + + fs.brokersLock.Lock() + defer fs.brokersLock.Unlock() + + var localBrokers []*filer_pb.LocateBrokerResponse_Resource + + for b, m := range fs.brokers { + if _, found := m[req.Resource]; found { + resp.Found = true + resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{ + { + GrpcAddresses: b, + ResourceCount: int32(len(m)), + }, + } + return + } + localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{ + GrpcAddresses: b, + ResourceCount: int32(len(m)), + }) + } + + resp.Resources = localBrokers + + return resp, nil + +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 497f59568..7edd5870f 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -48,7 +48,7 @@ import ( ) type FilerOption struct { - Masters []pb.ServerAddress + Masters map[string]pb.ServerAddress Collection string DefaultReplication string DisableDirListing bool diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index f67e90d38..8382cfc76 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -46,8 +46,10 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque path = "" } + emptyFolder := true if len(entries) > 0 { lastFileName = entries[len(entries)-1].Name() + emptyFolder = false } glog.V(4).Infof("listDirectory %s, last file %s, limit %d: %d items", path, lastFileName, limit, len(entries)) @@ -59,12 +61,14 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque Limit int LastFileName string ShouldDisplayLoadMore bool + EmptyFolder bool }{ path, entries, limit, lastFileName, shouldDisplayLoadMore, + emptyFolder, }) return } @@ -76,6 +80,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque Limit int LastFileName string ShouldDisplayLoadMore bool + EmptyFolder bool }{ path, ui.ToBreadcrumb(path), @@ -83,5 +88,6 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque limit, lastFileName, shouldDisplayLoadMore, + emptyFolder, }) } diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go index 1da7ff50f..ae2093947 100644 --- a/weed/server/filer_server_handlers_tagging.go +++ b/weed/server/filer_server_handlers_tagging.go @@ -82,7 +82,9 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque toDelete := strings.Split(r.URL.Query().Get("tagging"), ",") deletions := make(map[string]struct{}) for _, deletion := range toDelete { - deletions[deletion] = struct{}{} + if deletion != "" { + deletions[deletion] = struct{}{} + } } // delete all tags or specific tags diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 854b35f82..9bf2df6ef 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -164,6 +164,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } var entry *filer.Entry + var newChunks []*filer_pb.FileChunk var mergedChunks []*filer_pb.FileChunk isAppend := isAppend(r) @@ -186,7 +187,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } entry.FileSize += uint64(chunkOffset) } - mergedChunks = append(entry.Chunks, fileChunks...) + newChunks = append(entry.Chunks, fileChunks...) // TODO if len(entry.Content) > 0 { @@ -196,7 +197,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } else { glog.V(4).Infoln("saving", path) - mergedChunks = fileChunks + newChunks = fileChunks entry = &filer.Entry{ FullPath: util.FullPath(path), Attr: filer.Attr{ @@ -217,6 +218,13 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } } + // maybe concatenate small chunks into one whole chunk + mergedChunks, replyerr = fs.maybeMergeChunks(so, newChunks) + if replyerr != nil { + glog.V(0).Infof("merge chunks %s: %v", r.RequestURI, replyerr) + mergedChunks = newChunks + } + // maybe compact entry chunks mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks) if replyerr != nil { diff --git a/weed/server/filer_server_handlers_write_merge.go b/weed/server/filer_server_handlers_write_merge.go new file mode 100644 index 000000000..dadc6f726 --- /dev/null +++ b/weed/server/filer_server_handlers_write_merge.go @@ -0,0 +1,11 @@ +package weed_server + +import ( + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) { + //TODO merge consecutive smaller chunks into a large chunk to reduce number of chunks + return inputChunks, nil +} diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 6ee378819..fe3346402 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -4,10 +4,10 @@ import ( "bytes" "crypto/md5" "fmt" + "golang.org/x/exp/slices" "hash" "io" "net/http" - "sort" "strconv" "strings" "sync" @@ -130,11 +130,9 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque fs.filer.DeleteChunks(fileChunks) return nil, md5Hash, 0, uploadErr, nil } - - sort.Slice(fileChunks, func(i, j int) bool { - return fileChunks[i].Offset < fileChunks[j].Offset + slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) bool { + return a.Offset < b.Offset }) - return fileChunks, md5Hash, chunkOffset, nil, smallContent } diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go index 5016117a8..3201ff76c 100644 --- a/weed/server/filer_ui/breadcrumb.go +++ b/weed/server/filer_ui/breadcrumb.go @@ -15,8 +15,12 @@ func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) { parts := strings.Split(fullpath, "/") for i := 0; i < len(parts); i++ { + name := parts[i] + if name == "" { + name = "/" + } crumb := Breadcrumb{ - Name: parts[i] + " /", + Name: name, Link: "/" + util.Join(parts[0:i+1]...), } if !strings.HasSuffix(crumb.Link, "/") { diff --git a/weed/server/filer_ui/filer.html b/weed/server/filer_ui/filer.html index 6f57c25d8..785f82887 100644 --- a/weed/server/filer_ui/filer.html +++ b/weed/server/filer_ui/filer.html @@ -11,6 +11,7 @@ #drop-area { border: 1px transparent; + margin-top: 5px; } #drop-area.highlight { @@ -26,6 +27,12 @@ border-radius: 2px; border: 1px solid #ccc; float: right; + margin-left: 2px; + margin-bottom: 0; + } + + label { + font-weight: normal; } .button:hover { @@ -36,6 +43,38 @@ display: none; } + td, th { + vertical-align: bottom; + } + + .table-hover > tbody > tr:hover > * > div.operations { + display: block; + } + + .table > tbody > tr { + height: 39px; + } + + div.operations { + display: none; + } + + .footer { + position: absolute; + bottom: 0px; + right: 5%; + min-width: 25%; + border-left: 1px solid #ccc; + border-right: 1px solid #ccc; + } + + .add-files { + font-size: 46px; + text-align: center; + border: 1px dashed #999; + padding-bottom: 9px; + margin: 0 2px; + } </style> </head> <body> @@ -48,12 +87,21 @@ </div> <div class="row"> <div> + <div class="btn-group btn-group-sm pull-right" role="group" style="margin-top:3px;"> + <label class="btn btn-default" onclick="handleCreateDir()"> + <span class="glyphicon glyphicon-plus" aria-hidden="true"></span> New Folder + </label> + <label class="btn btn-default" for="fileElem"> + <span class="glyphicon glyphicon-cloud-upload" aria-hidden="true"></span> Upload + </label> + </div> + <ol class="breadcrumb"> {{ range $entry := .Breadcrumbs }} - <a href="{{ printpath $entry.Link }}"> + <li><a href="{{ printpath $entry.Link }}"> {{ $entry.Name }} - </a> + </li></a> {{ end }} - <label class="button" for="fileElem">Upload</label> + </ol> </div> </div> @@ -61,13 +109,18 @@ <form class="upload-form"> <input type="file" id="fileElem" multiple onchange="handleFiles(this.files)"> - <table width="90%"> + {{if .EmptyFolder}} + <div class="row add-files"> + + + </div> + {{else}} + <table width="100%" class="table table-hover"> {{$path := .Path }} {{ range $entry_index, $entry := .Entries }} <tr> <td> {{if $entry.IsDirectory}} - <img src="/seaweedfsstatic/images/folder.gif" width="20" height="23"> + <span class="glyphicon glyphicon-folder-open" aria-hidden="true"></span> <a href="{{ printpath $path "/" $entry.Name "/"}}" > {{ $entry.Name }} </a> @@ -89,13 +142,29 @@ {{ $entry.Size | humanizeBytes }} {{end}} </td> - <td nowrap> + <td align="right" nowrap> {{ $entry.Timestamp.Format "2006-01-02 15:04" }} </td> + <td style="width:75px"> + <div class="btn-group btn-group-xs pull-right operations" role="group"> + <label class="btn" onclick="handleRename('{{ $entry.Name }}', '{{ printpath $path "/" }}')"> + <span class="glyphicon glyphicon-edit" aria-hidden="true"></span> + </label> + {{if $entry.IsDirectory}} + <label class="btn" onclick="handleDelete('{{ printpath $path "/" $entry.Name "/" }}')"> + <span class="glyphicon glyphicon-trash" aria-hidden="true"></span> + </label> + {{else}} + <label class="btn" onclick="handleDelete('{{ printpath $path "/" $entry.Name }}')"> + <span class="glyphicon glyphicon-trash" aria-hidden="true"></span> + </label> + {{end}} + </div> + </td> </tr> {{ end }} - </table> + {{end}} </form> </div> @@ -109,65 +178,177 @@ <br/> <br/> - + <div id="progress-area" class="footer" style="display: none;"> + </div> </div> </body> <script type="text/javascript"> // ************************ Drag and drop ***************** // - let dropArea = document.getElementById("drop-area") + let dropArea = document.getElementById("drop-area"); + let progressArea = document.getElementById("progress-area"); // Prevent default drag behaviors ;['dragenter', 'dragover', 'dragleave', 'drop'].forEach(eventName => { - dropArea.addEventListener(eventName, preventDefaults, false) - document.body.addEventListener(eventName, preventDefaults, false) - }) + dropArea.addEventListener(eventName, preventDefaults, false); + document.body.addEventListener(eventName, preventDefaults, false); + }); // Highlight drop area when item is dragged over it ;['dragenter', 'dragover'].forEach(eventName => { - dropArea.addEventListener(eventName, highlight, false) - }) + dropArea.addEventListener(eventName, highlight, false); + }); ;['dragleave', 'drop'].forEach(eventName => { - dropArea.addEventListener(eventName, unhighlight, false) - }) + dropArea.addEventListener(eventName, unhighlight, false); + }); // Handle dropped files - dropArea.addEventListener('drop', handleDrop, false) + dropArea.addEventListener('drop', handleDrop, false); function preventDefaults(e) { - e.preventDefault() - e.stopPropagation() + e.preventDefault(); + e.stopPropagation(); } function highlight(e) { - dropArea.classList.add('highlight') + dropArea.classList.add('highlight'); } function unhighlight(e) { - dropArea.classList.remove('highlight') + dropArea.classList.remove('highlight'); } function handleDrop(e) { - var dt = e.dataTransfer - var files = dt.files + var dt = e.dataTransfer; + var files = dt.files; - handleFiles(files) + handleFiles(files); } + var uploadList = {}; + function handleFiles(files) { - files = [...files] - files.forEach(uploadFile) - window.location.reload() + files = [...files]; + files.forEach(startUpload); + renderProgress(); + files.forEach(uploadFile); + } + + function startUpload(file, i) { + uploadList[file.name] = {'name': file.name, 'percent': 0, 'finish': false}; + } + + function renderProgress() { + var values = Object.values(uploadList); + var html = '<table class="table">\n<tr><th>Uploading</th><\/tr>\n'; + for (let i of values) { + var progressBarClass = 'progress-bar-striped active'; + if (i.percent >= 100) { + progressBarClass = 'progress-bar-success'; + } + html += '<tr>\n<td>\n'; + html += '<div class="progress" style="margin-bottom: 2px;">\n'; + html += '<div class="progress-bar ' + progressBarClass + '" role="progressbar" aria-valuenow="' + '100" aria-valuemin="0" aria-valuemax="100" style="width:' + i.percent + '%;">'; + html += '<span style="margin-right: 10px;">' + i.name + '</span>' + i.percent + '%<\/div>'; + html += '<\/div>\n<\/td>\n<\/tr>\n'; + } + html += '<\/table>\n'; + progressArea.innerHTML = html; + if (values.length > 0) { + progressArea.attributes.style.value = ''; + } + } + + function reportProgress(file, percent) { + var item = uploadList[file] + item.percent = percent; + renderProgress(); + } + + function finishUpload(file) { + uploadList[file]['finish'] = true; + renderProgress(); + var allFinish = true; + for (let i of Object.values(uploadList)) { + if (!i.finish) { + allFinish = false; + break; + } + } + if (allFinish) { + console.log('All Finish'); + window.location.reload(); + } } function uploadFile(file, i) { - var url = window.location.href - var xhr = new XMLHttpRequest() - var formData = new FormData() - xhr.open('POST', url, false) + var url = window.location.href; + var xhr = new XMLHttpRequest(); + var fileName = file.name; + xhr.upload.addEventListener('progress', function(e) { + if (e.lengthComputable) { + var percent = Math.ceil((e.loaded / e.total) * 100); + reportProgress(fileName, percent) + } + }); + xhr.upload.addEventListener('loadend', function(e) { + finishUpload(fileName); + }); + var formData = new FormData(); + xhr.open('POST', url, true); + formData.append('file', file); + xhr.send(formData); + } + + function handleCreateDir() { + var dirName = prompt('Folder Name:', ''); + dirName = dirName.trim(); + if (dirName == null || dirName == '') { + return; + } + var baseUrl = window.location.href; + if (!baseUrl.endsWith('/')) { + baseUrl += '/'; + } + var url = baseUrl + dirName; + if (!url.endsWith('/')) { + url += '/'; + } + var xhr = new XMLHttpRequest(); + xhr.open('POST', url, false); + xhr.setRequestHeader('Content-Type', ''); + xhr.send(); + window.location.reload(); + } + + function handleRename(originName, basePath) { + var newName = prompt('New Name:', originName); + if (newName == null || newName == '') { + return; + } + var url = basePath + newName; + var originPath = basePath + originName; + url += '?mv.from=' + originPath; + var xhr = new XMLHttpRequest(); + xhr.open('POST', url, false); + xhr.setRequestHeader('Content-Type', ''); + xhr.send(); + window.location.reload(); + } + + function handleDelete(path) { + if (!confirm('Are you sure to delete ' + path + '?')) { + return; + } + var url = path; + if (url.endsWith('/')) { + url += '?recursive=true'; + } - formData.append('file', file) - xhr.send(formData) + var xhr = new XMLHttpRequest(); + xhr.open('DELETE', url, false); + xhr.send(); + window.location.reload(); } </script> </html> diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 50fcc0d62..83abdaaad 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -113,6 +113,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes { + dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) + ms.Topo.DataNodeRegistration(dcName, rackName, dn) + // process heartbeat.Volumes stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc() newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn) @@ -133,13 +136,13 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn) for _, s := range heartbeat.NewEcShards { - message.NewVids = append(message.NewVids, s.Id) + message.NewEcVids = append(message.NewEcVids, s.Id) } for _, s := range heartbeat.DeletedEcShards { - if dn.HasVolumesById(needle.VolumeId(s.Id)) { + if dn.HasEcShards(needle.VolumeId(s.Id)) { continue } - message.DeletedVids = append(message.DeletedVids, s.Id) + message.DeletedEcVids = append(message.DeletedEcVids, s.Id) } } @@ -151,17 +154,17 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ // broadcast the ec vid changes to master clients for _, s := range newShards { - message.NewVids = append(message.NewVids, uint32(s.VolumeId)) + message.NewEcVids = append(message.NewEcVids, uint32(s.VolumeId)) } for _, s := range deletedShards { if dn.HasVolumesById(s.VolumeId) { continue } - message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId)) + message.DeletedEcVids = append(message.DeletedEcVids, uint32(s.VolumeId)) } } - if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 { + if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 || len(message.NewEcVids) > 0 || len(message.DeletedEcVids) > 0 { ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message}) } diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go index 983606476..1f37e979a 100644 --- a/weed/server/master_grpc_server_admin.go +++ b/weed/server/master_grpc_server_admin.go @@ -3,7 +3,11 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "math/rand" "sync" "time" @@ -142,3 +146,41 @@ func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.Re } return resp, nil } + +func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (resp *master_pb.PingResponse, pingErr error) { + resp = &master_pb.PingResponse{ + StartTimeNs: time.Now().UnixNano(), + } + if req.TargetType == cluster.FilerType { + pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if req.TargetType == cluster.VolumeServerType { + pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if req.TargetType == cluster.MasterType { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if pingErr != nil { + pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr) + } + resp.StopTimeNs = time.Now().UnixNano() + return +} diff --git a/weed/server/master_grpc_server_raft.go b/weed/server/master_grpc_server_raft.go new file mode 100644 index 000000000..37491b3df --- /dev/null +++ b/weed/server/master_grpc_server_raft.go @@ -0,0 +1,66 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/hashicorp/raft" +) + +func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_pb.RaftListClusterServersRequest) (*master_pb.RaftListClusterServersResponse, error) { + resp := &master_pb.RaftListClusterServersResponse{} + + servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers + + for _, server := range servers { + resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{ + Id: string(server.ID), + Address: string(server.Address), + Suffrage: server.Suffrage.String(), + }) + } + return resp, nil +} + +func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) { + resp := &master_pb.RaftAddServerResponse{} + if ms.Topo.HashicorpRaft.State() != raft.Leader { + return nil, fmt.Errorf("raft add server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String()) + } + + var idxFuture raft.IndexFuture + if req.Voter { + idxFuture = ms.Topo.HashicorpRaft.AddVoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0) + } else { + idxFuture = ms.Topo.HashicorpRaft.AddNonvoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0) + } + + if err := idxFuture.Error(); err != nil { + return nil, err + } + return resp, nil +} + +func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) { + resp := &master_pb.RaftRemoveServerResponse{} + + if ms.Topo.HashicorpRaft.State() != raft.Leader { + return nil, fmt.Errorf("raft remove server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String()) + } + + if !req.Force { + ms.clientChansLock.RLock() + _, ok := ms.clientChans[fmt.Sprintf("%s@%s", cluster.MasterType, req.Id)] + ms.clientChansLock.RUnlock() + if ok { + return resp, fmt.Errorf("raft remove server %s failed: client connection to master exists", req.Id) + } + } + + idxFuture := ms.Topo.HashicorpRaft.RemoveServer(raft.ServerID(req.Id), 0, 0) + if err := idxFuture.Error(); err != nil { + return nil, err + } + return resp, nil +} diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 9389bceb8..bc92dd332 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -268,7 +268,7 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV resp := &master_pb.VacuumVolumeResponse{} - ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.preallocateSize) + ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), req.VolumeId, req.Collection, ms.preallocateSize) return resp, nil } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 671432d5c..9f29d4ba7 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,6 +1,7 @@ package weed_server import ( + "context" "fmt" "github.com/chrislusf/seaweedfs/weed/stats" "net/http" @@ -17,6 +18,7 @@ import ( "github.com/chrislusf/raft" "github.com/gorilla/mux" + hashicorpRaft "github.com/hashicorp/raft" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" @@ -30,8 +32,9 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + SequencerType = "master.sequencer.type" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + RaftServerRemovalTime = 72 * time.Minute ) type MasterOption struct { @@ -62,6 +65,9 @@ type MasterServer struct { boundedLeaderChan chan int + onPeerUpdatDoneCn chan string + onPeerUpdatDoneCnExist bool + // notifying clients clientChansLock sync.RWMutex clientChans map[string]chan *master_pb.KeepConnectedResponse @@ -75,7 +81,7 @@ type MasterServer struct { Cluster *cluster.Cluster } -func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer { +func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer { v := util.GetViper() signingKey := v.GetString("jwt.signing.key") @@ -112,6 +118,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre Cluster: cluster.NewCluster(), } ms.boundedLeaderChan = make(chan int, 16) + ms.onPeerUpdatDoneCn = make(chan string) + + ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate seq := ms.createSequencer(option) if nil == seq { @@ -160,19 +169,41 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre } func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { - ms.Topo.RaftServer = raftServer.raftServer - ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { - glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) - stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc() - if ms.Topo.RaftServer.Leader() != "" { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") - } - }) + var raftServerName string + if raftServer.raftServer != nil { + ms.Topo.RaftServer = raftServer.raftServer + ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { + glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc() + if ms.Topo.RaftServer.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") + } + }) + raftServerName = ms.Topo.RaftServer.Name() + } else if raftServer.RaftHashicorp != nil { + ms.Topo.HashicorpRaft = raftServer.RaftHashicorp + leaderCh := raftServer.RaftHashicorp.LeaderCh() + prevLeader := ms.Topo.HashicorpRaft.Leader() + go func() { + for { + select { + case isLeader := <-leaderCh: + leader := ms.Topo.HashicorpRaft.Leader() + glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader) + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc() + prevLeader = leader + } + } + }() + raftServerName = ms.Topo.HashicorpRaft.String() + } if ms.Topo.IsLeader() { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") + glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!") } else { - if ms.Topo.RaftServer.Leader() != "" { + if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") + } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.") } } } @@ -181,31 +212,38 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { f(w, r) - } else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { - ms.boundedLeaderChan <- 1 - defer func() { <-ms.boundedLeaderChan }() - targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader()) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, - fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err)) - return - } - glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader()) - proxy := httputil.NewSingleHostReverseProxy(targetUrl) - director := proxy.Director - proxy.Director = func(req *http.Request) { - actualHost, err := security.GetActualRemoteHost(req) - if err == nil { - req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost) - } - director(req) - } - proxy.Transport = util.Transport - proxy.ServeHTTP(w, r) - } else { - // handle requests locally + return + } + var raftServerLeader string + if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { + raftServerLeader = ms.Topo.RaftServer.Leader() + } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" { + raftServerLeader = string(ms.Topo.HashicorpRaft.Leader()) + } + if raftServerLeader == "" { f(w, r) + return + } + ms.boundedLeaderChan <- 1 + defer func() { <-ms.boundedLeaderChan }() + targetUrl, err := url.Parse("http://" + raftServerLeader) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, + fmt.Errorf("Leader URL http://%s Parse Error: %v", raftServerLeader, err)) + return + } + glog.V(4).Infoln("proxying to leader", raftServerLeader) + proxy := httputil.NewSingleHostReverseProxy(targetUrl) + director := proxy.Director + proxy.Director = func(req *http.Request) { + actualHost, err := security.GetActualRemoteHost(req) + if err == nil { + req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost) + } + director(req) } + proxy.Transport = util.Transport + proxy.ServeHTTP(w, r) } } @@ -301,3 +339,57 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer } return seq } + +func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { + if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil { + return + } + glog.V(4).Infof("OnPeerUpdate: %+v", update) + + peerAddress := pb.ServerAddress(update.Address) + peerName := string(peerAddress) + isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader + if update.IsAdd { + if isLeader { + raftServerFound := false + for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerName { + raftServerFound = true + } + } + if !raftServerFound { + glog.V(0).Infof("adding new raft server: %s", peerName) + ms.Topo.HashicorpRaft.AddVoter( + hashicorpRaft.ServerID(peerName), + hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) + } + } + if ms.onPeerUpdatDoneCnExist { + ms.onPeerUpdatDoneCn <- peerName + } + } else if isLeader { + go func(peerName string) { + for { + select { + case <-time.After(RaftServerRemovalTime): + err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ + Id: peerName, + Force: false, + }) + return err + }) + if err != nil { + glog.Warningf("failed to removing old raft server %s: %v", peerName, err) + } + return + case peerDone := <-ms.onPeerUpdatDoneCn: + if peerName == peerDone { + return + } + } + } + }(peerName) + ms.onPeerUpdatDoneCnExist = true + } +} diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 72d4e20d7..ade750ccc 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -64,7 +64,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque } } // glog.Infoln("garbageThreshold =", gcThreshold) - ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.preallocateSize) + ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, 0, "", ms.preallocateSize) ms.dirStatusHandler(w, r) } diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go index 015bfbd00..d8260d8d2 100644 --- a/weed/server/master_server_handlers_ui.go +++ b/weed/server/master_server_handlers_ui.go @@ -5,6 +5,8 @@ import ( "time" "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" + ui "github.com/chrislusf/seaweedfs/weed/server/master_ui" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" @@ -13,20 +15,40 @@ import ( func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) { infos := make(map[string]interface{}) infos["Up Time"] = time.Now().Sub(startTime).String() - args := struct { - Version string - Topology interface{} - RaftServer raft.Server - Stats map[string]interface{} - Counters *stats.ServerStats - VolumeSizeLimitMB uint32 - }{ - util.Version(), - ms.Topo.ToMap(), - ms.Topo.RaftServer, - infos, - serverStats, - ms.option.VolumeSizeLimitMB, + infos["Max Volume Id"] = ms.Topo.GetMaxVolumeId() + if ms.Topo.RaftServer != nil { + args := struct { + Version string + Topology interface{} + RaftServer raft.Server + Stats map[string]interface{} + Counters *stats.ServerStats + VolumeSizeLimitMB uint32 + }{ + util.Version(), + ms.Topo.ToMap(), + ms.Topo.RaftServer, + infos, + serverStats, + ms.option.VolumeSizeLimitMB, + } + ui.StatusTpl.Execute(w, args) + } else if ms.Topo.HashicorpRaft != nil { + args := struct { + Version string + Topology interface{} + RaftServer *hashicorpRaft.Raft + Stats map[string]interface{} + Counters *stats.ServerStats + VolumeSizeLimitMB uint32 + }{ + util.Version(), + ms.Topo.ToMap(), + ms.Topo.HashicorpRaft, + infos, + serverStats, + ms.option.VolumeSizeLimitMB, + } + ui.StatusNewRaftTpl.Execute(w, args) } - ui.StatusTpl.Execute(w, args) } diff --git a/weed/server/master_ui/masterNewRaft.html b/weed/server/master_ui/masterNewRaft.html new file mode 100644 index 000000000..32afdceac --- /dev/null +++ b/weed/server/master_ui/masterNewRaft.html @@ -0,0 +1,121 @@ +<!DOCTYPE html> +<html> +<head> + <title>SeaweedFS {{ .Version }}</title> + <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css"> +</head> +<body> +<div class="container"> + <div class="page-header"> + <h1> + <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a> + SeaweedFS <small>{{ .Version }}</small> + </h1> + </div> + + <div class="row"> + <div class="col-sm-6"> + <h2>Cluster status</h2> + <table class="table table-condensed table-striped"> + <tbody> + <tr> + <th>Volume Size Limit</th> + <td>{{ .VolumeSizeLimitMB }}MB</td> + </tr> + <tr> + <th>Free</th> + <td>{{ .Topology.Free }}</td> + </tr> + <tr> + <th>Max</th> + <td>{{ .Topology.Max }}</td> + </tr> + {{ with .RaftServer }} + <tr> + <th>Leader</th> + <td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td> + </tr> + <tr> + <th>Other Masters</th> + <td class="col-sm-5"> + <ul class="list-unstyled"> + {{ range $k, $p := .GetConfiguration.Configuration.Servers }} + <li><a href="http://{{ $p.ID }}/ui/index.html">{{ $p.ID }}</a></li> + {{ end }} + </ul> + </td> + </tr> + {{ end }} + </tbody> + </table> + </div> + + <div class="col-sm-6"> + <h2>System Stats</h2> + <table class="table table-condensed table-striped"> + <tr> + <th>Concurrent Connections</th> + <td>{{ .Counters.Connections.WeekCounter.Sum }}</td> + </tr> + {{ range $key, $val := .Stats }} + <tr> + <th>{{ $key }}</th> + <td>{{ $val }}</td> + </tr> + {{ end }} + </table> + <h2>Raft Stats</h2> + <table class="table table-condensed table-striped"> + <tr> + <th>applied_index</th> + <td>{{ .RaftServer.Stats.applied_index }}</td> + </tr> + <tr> + <th>last_log_term</th> + <td>{{ .RaftServer.Stats.last_log_term }}</td> + </tr> + </table> + </div> + </div> + + <div class="row"> + <h2>Topology</h2> + <table class="table table-striped"> + <thead> + <tr> + <th>Data Center</th> + <th>Rack</th> + <th>RemoteAddr</th> + <th>#Volumes</th> + <th>Volume Ids</th> + <th>#ErasureCodingShards</th> + <th>Max</th> + </tr> + </thead> + <tbody> + {{ range $dc_index, $dc := .Topology.DataCenters }} + {{ range $rack_index, $rack := $dc.Racks }} + {{ range $dn_index, $dn := $rack.DataNodes }} + <tr> + <td><code>{{ $dc.Id }}</code></td> + <td>{{ $rack.Id }}</td> + <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a> + {{ if ne $dn.PublicUrl $dn.Url }} + / <a href="http://{{ $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a> + {{ end }} + </td> + <td>{{ $dn.Volumes }}</td> + <td>{{ $dn.VolumeIds}}</td> + <td>{{ $dn.EcShards }}</td> + <td>{{ $dn.Max }}</td> + </tr> + {{ end }} + {{ end }} + {{ end }} + </tbody> + </table> + </div> + +</div> +</body> +</html> diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go index 415022b97..a6dcc57d7 100644 --- a/weed/server/master_ui/templates.go +++ b/weed/server/master_ui/templates.go @@ -8,4 +8,8 @@ import ( //go:embed master.html var masterHtml string +//go:embed masterNewRaft.html +var masterNewRaftHtml string + var StatusTpl = template.Must(template.New("status").Parse(masterHtml)) +var StatusNewRaftTpl = template.Must(template.New("status").Parse(masterNewRaftHtml)) diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go new file mode 100644 index 000000000..cc6578bf5 --- /dev/null +++ b/weed/server/raft_hashicorp.go @@ -0,0 +1,183 @@ +package weed_server + +// https://yusufs.medium.com/creating-distributed-kv-database-by-implementing-raft-consensus-using-golang-d0884eef2e28 +// https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18 + +import ( + "fmt" + transport "github.com/Jille/raft-grpc-transport" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/hashicorp/raft" + boltdb "github.com/hashicorp/raft-boltdb" + "google.golang.org/grpc" + "math/rand" + "os" + "path" + "path/filepath" + "sort" + "strings" + "time" +) + +const ( + ldbFile = "logs.dat" + sdbFile = "stable.dat" + updatePeersTimeout = 15 * time.Minute +) + +func getPeerIdx(self pb.ServerAddress, mapPeers map[string]pb.ServerAddress) int { + peers := make([]pb.ServerAddress, 0, len(mapPeers)) + for _, peer := range mapPeers { + peers = append(peers, peer) + } + sort.Slice(peers, func(i, j int) bool { + return strings.Compare(string(peers[i]), string(peers[j])) < 0 + }) + for i, peer := range peers { + if string(peer) == string(self) { + return i + } + } + return -1 +} + +func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) { + for _, peer := range s.peers { + cfg.Servers = append(cfg.Servers, raft.Server{ + Suffrage: raft.Voter, + ID: raft.ServerID(peer), + Address: raft.ServerAddress(peer.ToGrpcAddress()), + }) + } + return cfg +} + +func (s *RaftServer) UpdatePeers() { + for { + select { + case isLeader := <-s.RaftHashicorp.LeaderCh(): + if isLeader { + peerLeader := string(s.serverAddr) + existsPeerName := make(map[string]bool) + for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerLeader { + continue + } + existsPeerName[string(server.ID)] = true + } + for _, peer := range s.peers { + peerName := string(peer) + if peerName == peerLeader || existsPeerName[peerName] { + continue + } + glog.V(0).Infof("adding new peer: %s", peerName) + s.RaftHashicorp.AddVoter( + raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0) + } + for peer, _ := range existsPeerName { + if _, found := s.peers[peer]; !found { + glog.V(0).Infof("removing old peer: %s", peer) + s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0) + } + } + if _, found := s.peers[peerLeader]; !found { + glog.V(0).Infof("removing old leader peer: %s", peerLeader) + s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0) + } + } + return + case <-time.After(updatePeersTimeout): + return + } + } +} + +func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { + s := &RaftServer{ + peers: option.Peers, + serverAddr: option.ServerAddr, + dataDir: option.DataDir, + topo: option.Topo, + } + + c := raft.DefaultConfig() + c.LocalID = raft.ServerID(s.serverAddr) // TODO maybee the IP:port address will change + c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) + c.ElectionTimeout = option.ElectionTimeout + if c.LeaderLeaseTimeout > c.HeartbeatTimeout { + c.LeaderLeaseTimeout = c.HeartbeatTimeout + } + if glog.V(4) { + c.LogLevel = "Debug" + } else if glog.V(2) { + c.LogLevel = "Info" + } else if glog.V(1) { + c.LogLevel = "Warn" + } else if glog.V(0) { + c.LogLevel = "Error" + } + + if option.RaftBootstrap { + os.RemoveAll(path.Join(s.dataDir, ldbFile)) + os.RemoveAll(path.Join(s.dataDir, sdbFile)) + os.RemoveAll(path.Join(s.dataDir, "snapshot")) + } + baseDir := s.dataDir + + ldb, err := boltdb.NewBoltStore(filepath.Join(baseDir, ldbFile)) + if err != nil { + return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err) + } + + sdb, err := boltdb.NewBoltStore(filepath.Join(baseDir, sdbFile)) + if err != nil { + return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err) + } + + fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr) + if err != nil { + return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err) + } + + s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption}) + + stateMachine := StateMachine{topo: option.Topo} + s.RaftHashicorp, err = raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport()) + if err != nil { + return nil, fmt.Errorf("raft.NewRaft: %v", err) + } + if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 { + cfg := s.AddPeersConfiguration() + // Need to get lock, in case all servers do this at the same time. + peerIdx := getPeerIdx(s.serverAddr, s.peers) + timeSpeep := time.Duration(float64(c.LeaderLeaseTimeout) * (rand.Float64()*0.25 + 1) * float64(peerIdx)) + glog.V(0).Infof("Bootstrapping idx: %d sleep: %v new cluster: %+v", peerIdx, timeSpeep, cfg) + time.Sleep(timeSpeep) + f := s.RaftHashicorp.BootstrapCluster(cfg) + if err := f.Error(); err != nil { + return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) + } + } else { + go s.UpdatePeers() + } + + ticker := time.NewTicker(c.HeartbeatTimeout * 10) + if glog.V(4) { + go func() { + for { + select { + case <-ticker.C: + cfuture := s.RaftHashicorp.GetConfiguration() + if err = cfuture.Error(); err != nil { + glog.Fatalf("error getting config: %s", err) + } + configuration := cfuture.Configuration() + glog.V(4).Infof("Showing peers known by %s:\n%+v", s.RaftHashicorp.String(), configuration.Servers) + } + } + }() + } + + return s, nil +} diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index f22b7c45d..8c372f0cc 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,11 +2,12 @@ package weed_server import ( "encoding/json" + transport "github.com/Jille/raft-grpc-transport" + "io" + "io/ioutil" "math/rand" "os" "path" - "sort" - "strings" "time" "google.golang.org/grpc" @@ -14,6 +15,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/topology" @@ -21,21 +23,24 @@ import ( type RaftServerOption struct { GrpcDialOption grpc.DialOption - Peers []pb.ServerAddress + Peers map[string]pb.ServerAddress ServerAddr pb.ServerAddress DataDir string Topo *topology.Topology RaftResumeState bool HeartbeatInterval time.Duration ElectionTimeout time.Duration + RaftBootstrap bool } type RaftServer struct { - peers []pb.ServerAddress // initial peers to join with - raftServer raft.Server - dataDir string - serverAddr pb.ServerAddress - topo *topology.Topology + peers map[string]pb.ServerAddress // initial peers to join with + raftServer raft.Server + RaftHashicorp *hashicorpRaft.Raft + TransportManager *transport.Manager + dataDir string + serverAddr pb.ServerAddress + topo *topology.Topology *raft.GrpcServer } @@ -44,6 +49,8 @@ type StateMachine struct { topo *topology.Topology } +var _ hashicorpRaft.FSM = &StateMachine{} + func (s StateMachine) Save() ([]byte, error) { state := topology.MaxVolumeIdCommand{ MaxVolumeId: s.topo.GetMaxVolumeId(), @@ -63,6 +70,36 @@ func (s StateMachine) Recovery(data []byte) error { return nil } +func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} { + before := s.topo.GetMaxVolumeId() + state := topology.MaxVolumeIdCommand{} + err := json.Unmarshal(l.Data, &state) + if err != nil { + return err + } + s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId) + + glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId()) + return nil +} + +func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) { + return &topology.MaxVolumeIdCommand{ + MaxVolumeId: s.topo.GetMaxVolumeId(), + }, nil +} + +func (s *StateMachine) Restore(r io.ReadCloser) error { + b, err := ioutil.ReadAll(r) + if err != nil { + return err + } + if err := s.Recovery(b); err != nil { + return err + } + return nil +} + func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ peers: option.Peers, @@ -88,7 +125,7 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { os.RemoveAll(path.Join(s.dataDir, "conf")) os.RemoveAll(path.Join(s.dataDir, "snapshot")) } - if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil { + if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0700); err != nil { return nil, err } @@ -108,23 +145,15 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { return nil, err } - for _, peer := range s.peers { - if err := s.raftServer.AddPeer(string(peer), peer.ToGrpcAddress()); err != nil { + for name, peer := range s.peers { + if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil { return nil, err } } // Remove deleted peers for existsPeerName := range s.raftServer.Peers() { - exists := false - var existingPeer pb.ServerAddress - for _, peer := range s.peers { - if peer.String() == existsPeerName { - exists, existingPeer = true, peer - break - } - } - if !exists { + if existingPeer, found := s.peers[existsPeerName]; !found { if err := s.raftServer.RemovePeer(existsPeerName); err != nil { glog.V(0).Infoln(err) return nil, err @@ -136,36 +165,26 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s.GrpcServer = raft.NewGrpcServer(s.raftServer) - if s.raftServer.IsLogEmpty() && isTheFirstOne(option.ServerAddr, s.peers) { - // Initialize the server by joining itself. - // s.DoJoinCommand() - } - glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader()) return s, nil } func (s *RaftServer) Peers() (members []string) { - peers := s.raftServer.Peers() - - for _, p := range peers { - members = append(members, p.Name) + if s.raftServer != nil { + peers := s.raftServer.Peers() + for _, p := range peers { + members = append(members, p.Name) + } + } else if s.RaftHashicorp != nil { + cfg := s.RaftHashicorp.GetConfiguration() + for _, p := range cfg.Configuration().Servers { + members = append(members, string(p.ID)) + } } - return } -func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { - sort.Slice(peers, func(i, j int) bool { - return strings.Compare(string(peers[i]), string(peers[j])) < 0 - }) - if len(peers) <= 0 { - return true - } - return self == peers[0] -} - func (s *RaftServer) DoJoinCommand() { glog.V(0).Infoln("Initializing new cluster") diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go index 7e58f1e92..cc3e6e37f 100644 --- a/weed/server/raft_server_handlers.go +++ b/weed/server/raft_server_handlers.go @@ -25,3 +25,11 @@ func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) { } writeJsonQuiet(w, r, http.StatusOK, ret) } + +func (s *RaftServer) StatsRaftHandler(w http.ResponseWriter, r *http.Request) { + if s.RaftHashicorp == nil { + writeJsonQuiet(w, r, http.StatusNotFound, nil) + return + } + writeJsonQuiet(w, r, http.StatusOK, s.RaftHashicorp.Stats()) +} diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 898c3da12..476937847 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -3,7 +3,12 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "path/filepath" + "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -247,3 +252,41 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv return resp, nil } + +func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequest) (resp *volume_server_pb.PingResponse, pingErr error) { + resp = &volume_server_pb.PingResponse{ + StartTimeNs: time.Now().UnixNano(), + } + if req.TargetType == cluster.FilerType { + pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if req.TargetType == cluster.VolumeServerType { + pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if req.TargetType == cluster.MasterType { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if pingErr != nil { + pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr) + } + resp.StopTimeNs = time.Now().UnixNano() + return +} |
