path: root/weed/server
author     yourchanges <yourchanges@gmail.com>  2020-07-10 09:44:32 +0800
committer  GitHub <noreply@github.com>  2020-07-10 09:44:32 +0800
commit     e67096656b0fcdc313c7d8983b6ce36a54d794a3 (patch)
tree       4d6cfd722cf6e19b5aa8253e477ddc596ea5e193 /weed/server
parent     2b3cef7780a5e91d2072a33411926f9b30c88ee2 (diff)
parent     1b680c06c1de27e6a3899c089ec354a9eb08ea44 (diff)
download   seaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.tar.xz
           seaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.zip
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'weed/server')
-rw-r--r--  weed/server/common.go                                156
-rw-r--r--  weed/server/common_test.go                            31
-rw-r--r--  weed/server/filer_grpc_server.go                     301
-rw-r--r--  weed/server/filer_grpc_server_rename.go              141
-rw-r--r--  weed/server/filer_grpc_server_sub_meta.go            136
-rw-r--r--  weed/server/filer_server.go                          157
-rw-r--r--  weed/server/filer_server_handlers.go                  19
-rw-r--r--  weed/server/filer_server_handlers_read.go            250
-rw-r--r--  weed/server/filer_server_handlers_read_dir.go          9
-rw-r--r--  weed/server/filer_server_handlers_write.go           331
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go 138
-rw-r--r--  weed/server/filer_server_handlers_write_cipher.go     90
-rw-r--r--  weed/server/filer_ui/breadcrumb.go                    15
-rw-r--r--  weed/server/filer_ui/templates.go                     15
-rw-r--r--  weed/server/master_grpc_server.go                    213
-rw-r--r--  weed/server/master_grpc_server_admin.go              138
-rw-r--r--  weed/server/master_grpc_server_collection.go          95
-rw-r--r--  weed/server/master_grpc_server_volume.go              96
-rw-r--r--  weed/server/master_server.go                         254
-rw-r--r--  weed/server/master_server_handlers.go                107
-rw-r--r--  weed/server/master_server_handlers_admin.go          100
-rw-r--r--  weed/server/master_server_handlers_ui.go               5
-rw-r--r--  weed/server/master_ui/templates.go                     6
-rw-r--r--  weed/server/raft_server.go                            64
-rw-r--r--  weed/server/raft_server_handlers.go                   11
-rw-r--r--  weed/server/volume_grpc_admin.go                      86
-rw-r--r--  weed/server/volume_grpc_batch_delete.go               55
-rw-r--r--  weed/server/volume_grpc_client_to_master.go          126
-rw-r--r--  weed/server/volume_grpc_copy.go                      286
-rw-r--r--  weed/server/volume_grpc_copy_incremental.go           66
-rw-r--r--  weed/server/volume_grpc_erasure_coding.go            391
-rw-r--r--  weed/server/volume_grpc_query.go                      69
-rw-r--r--  weed/server/volume_grpc_sync.go                      101
-rw-r--r--  weed/server/volume_grpc_tail.go                      136
-rw-r--r--  weed/server/volume_grpc_tier_download.go              85
-rw-r--r--  weed/server/volume_grpc_tier_upload.go               100
-rw-r--r--  weed/server/volume_grpc_vacuum.go                     29
-rw-r--r--  weed/server/volume_server.go                          94
-rw-r--r--  weed/server/volume_server_handlers.go                 47
-rw-r--r--  weed/server/volume_server_handlers_admin.go           13
-rw-r--r--  weed/server/volume_server_handlers_read.go           208
-rw-r--r--  weed/server/volume_server_handlers_ui.go              32
-rw-r--r--  weed/server/volume_server_handlers_write.go           87
-rw-r--r--  weed/server/volume_server_ui/templates.go             92
-rw-r--r--  weed/server/webdav_server.go                         573
45 files changed, 4477 insertions(+), 1077 deletions(-)
diff --git a/weed/server/common.go b/weed/server/common.go
index d88abfdc8..bc6008864 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -1,26 +1,29 @@
package weed_server
import (
- "bytes"
"encoding/json"
"errors"
"fmt"
+ "io"
+ "mime/multipart"
"net/http"
"path/filepath"
"strconv"
"strings"
"time"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
- _ "github.com/chrislusf/seaweedfs/weed/statik"
"github.com/gorilla/mux"
statik "github.com/rakyll/statik/fs"
+
+ _ "github.com/chrislusf/seaweedfs/weed/statik"
)
var serverStats *stats.ServerStats
@@ -43,14 +46,26 @@ func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj inter
if err != nil {
return
}
+
+ if httpStatus >= 400 {
+ glog.V(0).Infof("response method:%s URL:%s with httpStatus:%d and JSON:%s",
+ r.Method, r.URL.String(), httpStatus, string(bytes))
+ }
+
callback := r.FormValue("callback")
if callback == "" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(httpStatus)
+ if httpStatus == http.StatusNotModified {
+ return
+ }
_, err = w.Write(bytes)
} else {
w.Header().Set("Content-Type", "application/javascript")
w.WriteHeader(httpStatus)
+ if httpStatus == http.StatusNotModified {
+ return
+ }
if _, err = w.Write([]uint8(callback)); err != nil {
return
}
@@ -69,7 +84,8 @@ func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj inter
// wrapper for writeJson - just logs errors
func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) {
if err := writeJson(w, r, httpStatus, obj); err != nil {
- glog.V(0).Infof("error writing JSON %+v status %d: %v", obj, httpStatus, err)
+ glog.V(0).Infof("error writing JSON status %d: %v", httpStatus, err)
+ glog.V(1).Infof("JSON content: %+v", obj)
}
}
func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) {
@@ -82,8 +98,7 @@ func debug(params ...interface{}) {
glog.V(4).Infoln(params...)
}
-func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) {
- jwt := security.GetJwt(r)
+func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string, grpcDialOption grpc.DialOption) {
m := make(map[string]interface{})
if r.Method != "POST" {
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
@@ -91,13 +106,13 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("parsing upload file...")
- fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := storage.ParseUpload(r)
+ pu, pe := needle.ParseUpload(r, 256*1024*1024)
if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe)
return
}
- debug("assigning file id for", fname)
+ debug("assigning file id for", pu.FileName)
r.ParseForm()
count := uint64(1)
if r.FormValue("count") != "" {
@@ -109,32 +124,33 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
ar := &operation.VolumeAssignRequest{
Count: count,
+ DataCenter: r.FormValue("dataCenter"),
Replication: r.FormValue("replication"),
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
}
- assignResult, ae := operation.Assign(masterUrl, ar)
+ assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar)
if ae != nil {
writeJsonError(w, r, http.StatusInternalServerError, ae)
return
}
url := "http://" + assignResult.Url + "/" + assignResult.Fid
- if lastModified != 0 {
- url = url + "?ts=" + strconv.FormatUint(lastModified, 10)
+ if pu.ModifiedTime != 0 {
+ url = url + "?ts=" + strconv.FormatUint(pu.ModifiedTime, 10)
}
debug("upload file to store", url)
- uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, jwt)
+ uploadResult, err := operation.UploadData(url, pu.FileName, false, pu.Data, pu.IsGzipped, pu.MimeType, pu.PairMap, assignResult.Auth)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
- m["fileName"] = fname
+ m["fileName"] = pu.FileName
m["fid"] = assignResult.Fid
m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid
- m["size"] = originalDataSize
+ m["size"] = pu.OriginalDataSize
m["eTag"] = uploadResult.ETag
writeJsonQuiet(w, r, http.StatusCreated, m)
return
@@ -175,19 +191,19 @@ func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly b
func statsHealthHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["Version"] = util.VERSION
+ m["Version"] = util.Version()
writeJsonQuiet(w, r, http.StatusOK, m)
}
func statsCounterHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["Version"] = util.VERSION
+ m["Version"] = util.Version()
m["Counters"] = serverStats
writeJsonQuiet(w, r, http.StatusOK, m)
}
func statsMemoryHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["Version"] = util.VERSION
+ m["Version"] = util.Version()
m["Memory"] = stats.MemStat()
writeJsonQuiet(w, r, http.StatusOK, m)
}
@@ -201,3 +217,107 @@ func handleStaticResources2(r *mux.Router) {
r.Handle("/favicon.ico", http.FileServer(statikFS))
r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(statikFS)))
}
+
+func adjustHeadersAfterHEAD(w http.ResponseWriter, r *http.Request, filename string) {
+ if filename != "" {
+ contentDisposition := "inline"
+ if r.FormValue("dl") != "" {
+ if dl, _ := strconv.ParseBool(r.FormValue("dl")); dl {
+ contentDisposition = "attachment"
+ }
+ }
+ w.Header().Set("Content-Disposition", contentDisposition+`; filename="`+fileNameEscaper.Replace(filename)+`"`)
+ }
+}
+
+func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) {
+ rangeReq := r.Header.Get("Range")
+
+ if rangeReq == "" {
+ w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
+ if err := writeFn(w, 0, totalSize); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ return
+ }
+
+ //the rest is dealing with partial content request
+ //mostly copy from src/pkg/net/http/fs.go
+ ranges, err := parseRange(rangeReq, totalSize)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
+ return
+ }
+ if sumRangesSize(ranges) > totalSize {
+ // The total number of bytes in all the ranges
+ // is larger than the size of the file by
+ // itself, so this is probably an attack, or a
+ // dumb client. Ignore the range request.
+ return
+ }
+ if len(ranges) == 0 {
+ return
+ }
+ if len(ranges) == 1 {
+ // RFC 2616, Section 14.16:
+ // "When an HTTP message includes the content of a single
+ // range (for example, a response to a request for a
+ // single range, or to a request for a set of ranges
+ // that overlap without any holes), this content is
+ // transmitted with a Content-Range header, and a
+ // Content-Length header showing the number of bytes
+ // actually transferred.
+ // ...
+ // A response to a request for a single range MUST NOT
+ // be sent using the multipart/byteranges media type."
+ ra := ranges[0]
+ w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
+ w.Header().Set("Content-Range", ra.contentRange(totalSize))
+ w.WriteHeader(http.StatusPartialContent)
+
+ err = writeFn(w, ra.start, ra.length)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ return
+ }
+
+ // process multiple ranges
+ for _, ra := range ranges {
+ if ra.start > totalSize {
+ http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
+ return
+ }
+ }
+ sendSize := rangesMIMESize(ranges, mimeType, totalSize)
+ pr, pw := io.Pipe()
+ mw := multipart.NewWriter(pw)
+ w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
+ sendContent := pr
+ defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
+ go func() {
+ for _, ra := range ranges {
+ part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
+ if e != nil {
+ pw.CloseWithError(e)
+ return
+ }
+ if e = writeFn(part, ra.start, ra.length); e != nil {
+ pw.CloseWithError(e)
+ return
+ }
+ }
+ mw.Close()
+ pw.Close()
+ }()
+ if w.Header().Get("Content-Encoding") == "" {
+ w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
+ }
+ w.WriteHeader(http.StatusPartialContent)
+ if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
+ http.Error(w, "Internal Error", http.StatusInternalServerError)
+ return
+ }
+}
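
Note: the new processRangeRequest helper in common.go factors Range and multipart/byteranges handling out of the read paths; a caller only supplies the total size, mime type, and a writeFn callback that streams an arbitrary byte window. A minimal sketch of that callback shape, staying inside the weed_server package; the handler name and in-memory payload are illustrative, not part of this change:

    package weed_server

    import (
        "io"
        "net/http"
    )

    // serveBytesWithRanges is a hypothetical handler showing how a caller can
    // delegate all Range header handling to processRangeRequest by providing a
    // writeFn that copies the requested [offset, offset+size) window.
    func serveBytesWithRanges(w http.ResponseWriter, r *http.Request, data []byte, mimeType string) {
        totalSize := int64(len(data))
        processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
            _, err := writer.Write(data[offset : offset+size])
            return err
        })
    }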
diff --git a/weed/server/common_test.go b/weed/server/common_test.go
new file mode 100644
index 000000000..2e6c70bfe
--- /dev/null
+++ b/weed/server/common_test.go
@@ -0,0 +1,31 @@
+package weed_server
+
+import (
+ "strings"
+ "testing"
+)
+
+func TestParseURL(t *testing.T) {
+ if vid, fid, _, _, _ := parseURLPath("/1,06dfa8a684"); true {
+ if vid != "1" {
+ t.Errorf("fail to parse vid: %s", vid)
+ }
+ if fid != "06dfa8a684" {
+ t.Errorf("fail to parse fid: %s", fid)
+ }
+ }
+ if vid, fid, _, _, _ := parseURLPath("/1,06dfa8a684_1"); true {
+ if vid != "1" {
+ t.Errorf("fail to parse vid: %s", vid)
+ }
+ if fid != "06dfa8a684_1" {
+ t.Errorf("fail to parse fid: %s", fid)
+ }
+ if sepIndex := strings.LastIndex(fid, "_"); sepIndex > 0 {
+ fid = fid[:sepIndex]
+ }
+ if fid != "06dfa8a684" {
+ t.Errorf("fail to parse fid: %s", fid)
+ }
+ }
+}
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 06589e3c6..17e32731c 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -19,9 +19,15 @@ import (
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
- entry, err := fs.filer.FindEntry(filer2.FullPath(filepath.Join(req.Directory, req.Name)))
+ glog.V(4).Infof("LookupDirectoryEntry %s", filepath.Join(req.Directory, req.Name))
+
+ entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
+ if err == filer_pb.ErrNotFound {
+ return &filer_pb.LookupDirectoryEntryResponse{}, err
+ }
if err != nil {
- return nil, fmt.Errorf("%s not found under %s: %v", req.Name, req.Directory, err)
+ glog.V(3).Infof("LookupDirectoryEntry %s: %+v, ", filepath.Join(req.Directory, req.Name), err)
+ return nil, err
}
return &filer_pb.LookupDirectoryEntryResponse{
@@ -30,27 +36,35 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
IsDirectory: entry.IsDirectory(),
Attributes: filer2.EntryAttributeToPb(entry),
Chunks: entry.Chunks,
+ Extended: entry.Extended,
},
}, nil
}
-func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntriesRequest) (*filer_pb.ListEntriesResponse, error) {
+func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error {
+
+ glog.V(4).Infof("ListEntries %v", req)
limit := int(req.Limit)
if limit == 0 {
limit = fs.option.DirListingLimit
}
- resp := &filer_pb.ListEntriesResponse{}
+ paginationLimit := filer2.PaginationSize
+ if limit < paginationLimit {
+ paginationLimit = limit
+ }
+
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
for limit > 0 {
- entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(req.Directory), lastFileName, includeLastFile, 1024)
+ entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
+
if err != nil {
- return nil, err
+ return err
}
if len(entries) == 0 {
- return resp, nil
+ return nil
}
includeLastFile = false
@@ -65,22 +79,31 @@ func (fs *FilerServer) ListEntries(ctx context.Context, req *filer_pb.ListEntrie
}
}
- resp.Entries = append(resp.Entries, &filer_pb.Entry{
- Name: entry.Name(),
- IsDirectory: entry.IsDirectory(),
- Chunks: entry.Chunks,
- Attributes: filer2.EntryAttributeToPb(entry),
- })
+ if err := stream.Send(&filer_pb.ListEntriesResponse{
+ Entry: &filer_pb.Entry{
+ Name: entry.Name(),
+ IsDirectory: entry.IsDirectory(),
+ Chunks: entry.Chunks,
+ Attributes: filer2.EntryAttributeToPb(entry),
+ Extended: entry.Extended,
+ },
+ }); err != nil {
+ return err
+ }
+
limit--
+ if limit == 0 {
+ return nil
+ }
}
- if len(resp.Entries) < 1024 {
+ if len(entries) < paginationLimit {
break
}
}
- return resp, nil
+ return nil
}
func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVolumeRequest) (*filer_pb.LookupVolumeResponse, error) {
@@ -96,7 +119,11 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
return nil, err
}
var locs []*filer_pb.Location
- for _, loc := range fs.filer.MasterClient.GetLocations(uint32(vid)) {
+ locations, found := fs.filer.MasterClient.GetLocations(uint32(vid))
+ if !found {
+ continue
+ }
+ for _, loc := range locations {
locs = append(locs, &filer_pb.Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
@@ -112,49 +139,60 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
- fullpath := filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name))
- chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
+ glog.V(4).Infof("CreateEntry %v", req)
- fs.filer.DeleteChunks(garbages)
+ resp = &filer_pb.CreateEntryResponse{}
+
+ chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
if req.Entry.Attributes == nil {
- return nil, fmt.Errorf("can not create entry with empty attributes")
+ glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name))
+ resp.Error = fmt.Sprintf("can not create entry with empty attributes")
+ return
}
- err = fs.filer.CreateEntry(&filer2.Entry{
- FullPath: fullpath,
+ createErr := fs.filer.CreateEntry(ctx, &filer2.Entry{
+ FullPath: util.JoinPath(req.Directory, req.Entry.Name),
Attr: filer2.PbToEntryAttribute(req.Entry.Attributes),
Chunks: chunks,
- })
+ }, req.OExcl, req.IsFromOtherCluster)
- if err == nil {
+ if createErr == nil {
+ fs.filer.DeleteChunks(garbages)
+ } else {
+ glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
+ resp.Error = createErr.Error()
}
- return &filer_pb.CreateEntryResponse{}, err
+ return
}
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
- fullpath := filepath.Join(req.Directory, req.Entry.Name)
- entry, err := fs.filer.FindEntry(filer2.FullPath(fullpath))
+ glog.V(4).Infof("UpdateEntry %v", req)
+
+ fullpath := util.Join(req.Directory, req.Entry.Name)
+ entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
if err != nil {
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
}
// remove old chunks if not included in the new ones
- unusedChunks := filer2.FindUnusedFileChunks(entry.Chunks, req.Entry.Chunks)
+ unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks)
chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
newEntry := &filer2.Entry{
- FullPath: filer2.FullPath(filepath.Join(req.Directory, req.Entry.Name)),
+ FullPath: util.JoinPath(req.Directory, req.Entry.Name),
Attr: entry.Attr,
+ Extended: req.Entry.Extended,
Chunks: chunks,
}
- glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v",
+ glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v",
fullpath, entry.Attr, len(entry.Chunks), entry.Chunks,
- req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks)
+ req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks,
+ entry.Extended, req.Entry.Extended)
if req.Entry.Attributes != nil {
if req.Entry.Attributes.Mtime != 0 {
@@ -175,19 +213,62 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, err
}
- if err = fs.filer.UpdateEntry(entry, newEntry); err == nil {
+ if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
fs.filer.DeleteChunks(unusedChunks)
fs.filer.DeleteChunks(garbages)
+ } else {
+ glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
}
- fs.filer.NotifyUpdateEvent(entry, newEntry, true)
+ fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster)
return &filer_pb.UpdateEntryResponse{}, err
}
+func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) {
+
+ glog.V(4).Infof("AppendToEntry %v", req)
+
+ fullpath := util.NewFullPath(req.Directory, req.EntryName)
+ var offset int64 = 0
+ entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
+ if err == filer_pb.ErrNotFound {
+ entry = &filer2.Entry{
+ FullPath: fullpath,
+ Attr: filer2.Attr{
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Mode: os.FileMode(0644),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }
+ } else {
+ offset = int64(filer2.TotalSize(entry.Chunks))
+ }
+
+ for _, chunk := range req.Chunks {
+ chunk.Offset = offset
+ offset += int64(chunk.Size)
+ }
+
+ entry.Chunks = append(entry.Chunks, req.Chunks...)
+
+ err = fs.filer.CreateEntry(context.Background(), entry, false, false)
+
+ return &filer_pb.AppendToEntryResponse{}, err
+}
+
func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
- err = fs.filer.DeleteEntryMetaAndData(filer2.FullPath(filepath.Join(req.Directory, req.Name)), req.IsRecursive, req.IsDeleteData)
- return &filer_pb.DeleteEntryResponse{}, err
+
+ glog.V(4).Infof("DeleteEntry %v", req)
+
+ err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster)
+ resp = &filer_pb.DeleteEntryResponse{}
+ if err != nil {
+ resp.Error = err.Error()
+ }
+ return resp, nil
}
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
@@ -196,6 +277,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
if req.TtlSec > 0 {
ttlStr = strconv.Itoa(int(req.TtlSec))
}
+ collection, replication, _ := fs.detectCollection(req.ParentPath, req.Collection, req.Replication)
var altRequest *operation.VolumeAssignRequest
@@ -206,54 +288,73 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
assignRequest := &operation.VolumeAssignRequest{
Count: uint64(req.Count),
- Replication: req.Replication,
- Collection: req.Collection,
+ Replication: replication,
+ Collection: collection,
Ttl: ttlStr,
DataCenter: dataCenter,
}
if dataCenter != "" {
altRequest = &operation.VolumeAssignRequest{
Count: uint64(req.Count),
- Replication: req.Replication,
- Collection: req.Collection,
+ Replication: replication,
+ Collection: collection,
Ttl: ttlStr,
DataCenter: "",
}
}
- assignResult, err := operation.Assign(fs.filer.GetMaster(), assignRequest, altRequest)
+ assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
- return nil, fmt.Errorf("assign volume: %v", err)
+ glog.V(3).Infof("AssignVolume: %v", err)
+ return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
}
if assignResult.Error != "" {
- return nil, fmt.Errorf("assign volume result: %v", assignResult.Error)
+ glog.V(3).Infof("AssignVolume error: %v", assignResult.Error)
+ return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume result: %v", assignResult.Error)}, nil
}
return &filer_pb.AssignVolumeResponse{
- FileId: assignResult.Fid,
- Count: int32(assignResult.Count),
- Url: assignResult.Url,
- PublicUrl: assignResult.PublicUrl,
- }, err
+ FileId: assignResult.Fid,
+ Count: int32(assignResult.Count),
+ Url: assignResult.Url,
+ PublicUrl: assignResult.PublicUrl,
+ Auth: string(assignResult.Auth),
+ Collection: collection,
+ Replication: replication,
+ }, nil
}
func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) {
- for _, master := range fs.option.Masters {
- _, err = util.Get(fmt.Sprintf("http://%s/col/delete?collection=%s", master, req.Collection))
- }
+ glog.V(4).Infof("DeleteCollection %v", req)
+
+ err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
+ Name: req.GetCollection(),
+ })
+ return err
+ })
return &filer_pb.DeleteCollectionResponse{}, err
}
func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
- input := &master_pb.StatisticsRequest{
- Replication: req.Replication,
- Collection: req.Collection,
- Ttl: req.Ttl,
- }
+ var output *master_pb.StatisticsResponse
+
+ err = fs.filer.MasterClient.WithClient(func(masterClient master_pb.SeaweedClient) error {
+ grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{
+ Replication: req.Replication,
+ Collection: req.Collection,
+ Ttl: req.Ttl,
+ })
+ if grpcErr != nil {
+ return grpcErr
+ }
+
+ output = grpcResponse
+ return nil
+ })
- output, err := operation.Statistics(fs.filer.GetMaster(), input)
if err != nil {
return nil, err
}
@@ -264,3 +365,91 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
FileCount: output.FileCount,
}, nil
}
+
+func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
+
+ t := &filer_pb.GetFilerConfigurationResponse{
+ Masters: fs.option.Masters,
+ Collection: fs.option.Collection,
+ Replication: fs.option.DefaultReplication,
+ MaxMb: uint32(fs.option.MaxMB),
+ DirBuckets: fs.filer.DirBucketsPath,
+ Cipher: fs.filer.Cipher,
+ }
+
+ glog.V(4).Infof("GetFilerConfiguration: %v", t)
+
+ return t, nil
+}
+
+func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error {
+
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+
+ clientName := fmt.Sprintf("%s:%d", req.Name, req.GrpcPort)
+ m := make(map[string]bool)
+ for _, tp := range req.Resources {
+ m[tp] = true
+ }
+ fs.brokersLock.Lock()
+ fs.brokers[clientName] = m
+ glog.V(0).Infof("+ broker %v", clientName)
+ fs.brokersLock.Unlock()
+
+ defer func() {
+ fs.brokersLock.Lock()
+ delete(fs.brokers, clientName)
+ glog.V(0).Infof("- broker %v: %v", clientName, err)
+ fs.brokersLock.Unlock()
+ }()
+
+ for {
+ if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil {
+ glog.V(0).Infof("send broker %v: %+v", clientName, err)
+ return err
+ }
+ // println("replied")
+
+ if _, err := stream.Recv(); err != nil {
+ glog.V(0).Infof("recv broker %v: %v", clientName, err)
+ return err
+ }
+ // println("received")
+ }
+
+}
+
+func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) {
+
+ resp = &filer_pb.LocateBrokerResponse{}
+
+ fs.brokersLock.Lock()
+ defer fs.brokersLock.Unlock()
+
+ var localBrokers []*filer_pb.LocateBrokerResponse_Resource
+
+ for b, m := range fs.brokers {
+ if _, found := m[req.Resource]; found {
+ resp.Found = true
+ resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{
+ {
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ },
+ }
+ return
+ }
+ localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ })
+ }
+
+ resp.Resources = localBrokers
+
+ return resp, nil
+
+}
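
Note: several filer RPCs in this revision (CreateEntry, DeleteEntry, AssignVolume) now report application-level failures through an Error field on the response instead of a gRPC error, so clients need to check both. A hedged client-side sketch of that pattern using DeleteEntry, whose request fields appear above; the dial address and insecure credentials are assumptions for illustration:

    package main

    import (
        "context"
        "log"

        "google.golang.org/grpc"

        "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    )

    func main() {
        // Assumed filer gRPC address; adjust for the actual deployment.
        conn, err := grpc.Dial("localhost:18888", grpc.WithInsecure())
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        client := filer_pb.NewSeaweedFilerClient(conn)

        resp, err := client.DeleteEntry(context.Background(), &filer_pb.DeleteEntryRequest{
            Directory:    "/some/dir",
            Name:         "file.txt",
            IsDeleteData: true,
        })
        if err != nil {
            log.Fatalf("transport/gRPC failure: %v", err) // network or server-level error
        }
        if resp.Error != "" {
            log.Fatalf("filer rejected the delete: %s", resp.Error) // application-level error carried in the response
        }
    }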
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
new file mode 100644
index 000000000..9642fec24
--- /dev/null
+++ b/weed/server/filer_grpc_server_rename.go
@@ -0,0 +1,141 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "path/filepath"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.AtomicRenameEntryRequest) (*filer_pb.AtomicRenameEntryResponse, error) {
+
+ glog.V(1).Infof("AtomicRenameEntry %v", req)
+
+ ctx, err := fs.filer.BeginTransaction(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory))
+
+ oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName))
+ if err != nil {
+ fs.filer.RollbackTransaction(ctx)
+ return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
+ }
+
+ var events MoveEvents
+ moveErr := fs.moveEntry(ctx, oldParent, oldEntry, util.FullPath(filepath.ToSlash(req.NewDirectory)), req.NewName, &events)
+ if moveErr != nil {
+ fs.filer.RollbackTransaction(ctx)
+ return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
+ } else {
+ if commitError := fs.filer.CommitTransaction(ctx); commitError != nil {
+ fs.filer.RollbackTransaction(ctx)
+ return nil, fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, commitError)
+ }
+ }
+
+ return &filer_pb.AtomicRenameEntryResponse{}, nil
+}
+
+func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+
+ if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events, func() error {
+ if entry.IsDirectory() {
+ if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, events); err != nil {
+ return err
+ }
+ }
+ return nil
+ }); err != nil {
+ return fmt.Errorf("fail to move %s => %s: %v", oldParent.Child(entry.Name()), newParent.Child(newName), err)
+ }
+
+ return nil
+}
+
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+
+ currentDirPath := oldParent.Child(entry.Name())
+ newDirPath := newParent.Child(newName)
+
+ glog.V(1).Infof("moving folder %s => %s", currentDirPath, newDirPath)
+
+ lastFileName := ""
+ includeLastFile := false
+ for {
+
+ entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024)
+ if err != nil {
+ return err
+ }
+
+ // println("found", len(entries), "entries under", currentDirPath)
+
+ for _, item := range entries {
+ lastFileName = item.Name()
+ // println("processing", lastFileName)
+ err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), events)
+ if err != nil {
+ return err
+ }
+ }
+ if len(entries) < 1024 {
+ break
+ }
+ }
+ return nil
+}
+
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents,
+ moveFolderSubEntries func() error) error {
+
+ oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
+
+ glog.V(1).Infof("moving entry %s => %s", oldPath, newPath)
+
+ if oldPath == newPath {
+ glog.V(1).Infof("skip moving entry %s => %s", oldPath, newPath)
+ return nil
+ }
+
+ // add to new directory
+ newEntry := &filer2.Entry{
+ FullPath: newPath,
+ Attr: entry.Attr,
+ Chunks: entry.Chunks,
+ }
+ createErr := fs.filer.CreateEntry(ctx, newEntry, false, false)
+ if createErr != nil {
+ return createErr
+ }
+
+ events.newEntries = append(events.newEntries, newEntry)
+
+ if moveFolderSubEntries != nil {
+ if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
+ return moveChildrenErr
+ }
+ }
+
+ // delete old entry
+ deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false)
+ if deleteErr != nil {
+ return deleteErr
+ }
+
+ events.oldEntries = append(events.oldEntries, entry)
+
+ return nil
+
+}
+
+type MoveEvents struct {
+ oldEntries []*filer2.Entry
+ newEntries []*filer2.Entry
+}
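
Note: AtomicRenameEntry wraps the create-new/recurse/delete-old move in a single filer-store transaction and rolls back if any sub-entry move fails. A sketch of invoking it from a gRPC client; the paths are placeholders and the dial options are assumptions:

    package main

    import (
        "context"
        "log"

        "google.golang.org/grpc"

        "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    )

    func main() {
        conn, err := grpc.Dial("localhost:18888", grpc.WithInsecure()) // assumed filer gRPC endpoint
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        client := filer_pb.NewSeaweedFilerClient(conn)

        // Atomically move /old/dir/a.txt to /new/dir/b.txt; the server commits or rolls back as a unit.
        _, err = client.AtomicRenameEntry(context.Background(), &filer_pb.AtomicRenameEntryRequest{
            OldDirectory: "/old/dir",
            OldName:      "a.txt",
            NewDirectory: "/new/dir",
            NewName:      "b.txt",
        })
        if err != nil {
            log.Fatalf("rename failed: %v", err)
        }
    }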
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
new file mode 100644
index 000000000..8ef75cf02
--- /dev/null
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -0,0 +1,136 @@
+package weed_server
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
+
+ peerAddress := findClientAddress(stream.Context(), 0)
+
+ clientName := fs.addClient(req.ClientName, peerAddress)
+
+ defer fs.deleteClient(clientName)
+
+ lastReadTime := time.Unix(0, req.SinceNs)
+ glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+
+ eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName)
+
+ eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
+
+ processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+
+ err = fs.metaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.metaAggregator.ListenersLock.Lock()
+ fs.metaAggregator.ListenersCond.Wait()
+ fs.metaAggregator.ListenersLock.Unlock()
+ return true
+ }, eachLogEntryFn)
+
+ return err
+
+}
+
+func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeLocalMetadataServer) error {
+
+ peerAddress := findClientAddress(stream.Context(), 0)
+
+ clientName := fs.addClient(req.ClientName, peerAddress)
+
+ defer fs.deleteClient(clientName)
+
+ lastReadTime := time.Unix(0, req.SinceNs)
+ glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+
+ eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName)
+
+ eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
+
+ err := fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.listenersLock.Lock()
+ fs.listenersCond.Wait()
+ fs.listenersLock.Unlock()
+ return true
+ }, eachLogEntryFn)
+
+ return err
+
+}
+
+func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) func(logEntry *filer_pb.LogEntry) error {
+ return func(logEntry *filer_pb.LogEntry) error {
+ event := &filer_pb.SubscribeMetadataResponse{}
+ if err := proto.Unmarshal(logEntry.Data, event); err != nil {
+ glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ }
+
+ if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil {
+ return err
+ }
+
+ return nil
+ }
+}
+
+func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+ return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+
+ // get complete path to the file or directory
+ var entryName string
+ if eventNotification.OldEntry != nil {
+ entryName = eventNotification.OldEntry.Name
+ } else if eventNotification.NewEntry != nil {
+ entryName = eventNotification.NewEntry.Name
+ }
+
+ fullpath := util.Join(dirPath, entryName)
+
+ // skip on filer internal meta logs
+ if strings.HasPrefix(fullpath, filer2.SystemLogDir) {
+ return nil
+ }
+
+ if !strings.HasPrefix(fullpath, req.PathPrefix) {
+ return nil
+ }
+
+ message := &filer_pb.SubscribeMetadataResponse{
+ Directory: dirPath,
+ EventNotification: eventNotification,
+ TsNs: tsNs,
+ }
+ if err := stream.Send(message); err != nil {
+ glog.V(0).Infof("=> client %v: %+v", clientName, err)
+ return err
+ }
+ return nil
+ }
+}
+
+func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string) {
+ clientName = clientType + "@" + clientAddress
+ glog.V(0).Infof("+ listener %v", clientName)
+ return
+}
+
+func (fs *FilerServer) deleteClient(clientName string) {
+ glog.V(0).Infof("- listener %v", clientName)
+}
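
Note: SubscribeMetadata streams filer metadata events: it first replays persisted logs since req.SinceNs, then follows the in-memory log buffer, waiting on the aggregator's condition variable for new entries. A hedged consumer sketch; the address and client name are placeholders:

    package main

    import (
        "context"
        "io"
        "log"
        "time"

        "google.golang.org/grpc"

        "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
    )

    func main() {
        conn, err := grpc.Dial("localhost:18888", grpc.WithInsecure()) // assumed filer gRPC endpoint
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        client := filer_pb.NewSeaweedFilerClient(conn)

        stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
            ClientName: "example-subscriber",
            PathPrefix: "/buckets/",
            SinceNs:    time.Now().Add(-time.Hour).UnixNano(), // replay the last hour, then tail
        })
        if err != nil {
            log.Fatal(err)
        }

        for {
            resp, err := stream.Recv()
            if err == io.EOF {
                return
            }
            if err != nil {
                log.Fatalf("subscribe stream broken: %v", err)
            }
            log.Printf("dir=%s ts=%d event=%+v", resp.Directory, resp.TsNs, resp.EventNotification)
        }
    }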
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 9d70e4dac..c6ab6ef0f 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -1,112 +1,185 @@
package weed_server
import (
+ "context"
+ "fmt"
"net/http"
"os"
+ "strings"
+ "sync"
+ "time"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/filer2"
_ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/memdb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb"
_ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
_ "github.com/chrislusf/seaweedfs/weed/notification/log"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
)
type FilerOption struct {
Masters []string
Collection string
DefaultReplication string
- RedirectOnRead bool
DisableDirListing bool
MaxMB int
- SecretKey string
DirListingLimit int
DataCenter string
DefaultLevelDbDir string
+ DisableHttp bool
+ Host string
+ Port uint32
+ recursiveDelete bool
+ Cipher bool
+ Filers []string
}
type FilerServer struct {
- option *FilerOption
- secret security.Secret
- filer *filer2.Filer
+ option *FilerOption
+ secret security.SigningKey
+ filer *filer2.Filer
+ metaAggregator *filer2.MetaAggregator
+ grpcDialOption grpc.DialOption
+
+ // notifying clients
+ listenersLock sync.Mutex
+ listenersCond *sync.Cond
+
+ brokers map[string]map[string]bool
+ brokersLock sync.Mutex
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
fs = &FilerServer{
- option: option,
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
+ brokers: make(map[string]map[string]bool),
}
+ fs.listenersCond = sync.NewCond(&fs.listenersLock)
if len(option.Masters) == 0 {
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters)
+ fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
+ fs.listenersCond.Broadcast()
+ })
+ fs.filer.Cipher = option.Cipher
+
+ maybeStartMetrics(fs, option)
go fs.filer.KeepConnectedToMaster()
- v := viper.GetViper()
- if !LoadConfiguration("filer", false) {
- v.Set("leveldb.enabled", true)
- v.Set("leveldb.dir", option.DefaultLevelDbDir)
+ v := util.GetViper()
+ if !util.LoadConfiguration("filer", false) {
+ v.Set("leveldb2.enabled", true)
+ v.Set("leveldb2.dir", option.DefaultLevelDbDir)
_, err := os.Stat(option.DefaultLevelDbDir)
if os.IsNotExist(err) {
os.MkdirAll(option.DefaultLevelDbDir, 0755)
}
+ glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
}
- LoadConfiguration("notification", false)
+ util.LoadConfiguration("notification", false)
+ fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
+ v.SetDefault("filer.options.buckets_folder", "/buckets")
+ fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
+ fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
fs.filer.LoadConfiguration(v)
- notification.LoadConfiguration(v.Sub("notification"))
+ notification.LoadConfiguration(v, "notification.")
handleStaticResources(defaultMux)
- defaultMux.HandleFunc("/", fs.filerHandler)
+ if !option.DisableHttp {
+ defaultMux.HandleFunc("/", fs.filerHandler)
+ }
if defaultMux != readonlyMux {
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
+ // set peers
+ if strings.HasPrefix(fs.filer.GetStore().GetName(), "leveldb") && len(option.Filers) > 0 {
+ glog.Fatalf("filers using separate leveldb stores should not configure %d peers %+v", len(option.Filers), option.Filers)
+ }
+ if len(option.Filers) == 0 {
+ option.Filers = append(option.Filers, fmt.Sprintf("%s:%d", option.Host, option.Port))
+ }
+ fs.metaAggregator = filer2.NewMetaAggregator(option.Filers, fs.grpcDialOption)
+ fs.metaAggregator.StartLoopSubscribe(time.Now().UnixNano())
+
+ fs.filer.LoadBuckets()
+
+ grace.OnInterrupt(func() {
+ fs.filer.Shutdown()
+ })
+
return fs, nil
}
-func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
- return security.GenJwt(fs.secret, fileId)
-}
+func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
-func LoadConfiguration(configFileName string, required bool) (loaded bool) {
-
- // find a filer store
- viper.SetConfigName(configFileName) // name of config file (without extension)
- viper.AddConfigPath(".") // optionally look for config in the working directory
- viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
- viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
-
- glog.V(0).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
-
- if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
- glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
- if required {
- glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
- "\n\nPlease follow this example and add a filer.toml file to "+
- "current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/:\n"+
- " https://github.com/chrislusf/seaweedfs/blob/master/weed/%s.toml\n"+
- "\nOr use this command to generate the default toml file\n"+
- " weed scaffold -config=%s -output=.\n\n\n",
- configFileName, configFileName, configFileName)
- } else {
- return false
+ for _, master := range option.Masters {
+ _, err := pb.ParseFilerGrpcAddress(master)
+ if err != nil {
+ glog.Fatalf("invalid master address %s: %v", master, err)
}
}
- return true
+ isConnected := false
+ var metricsAddress string
+ var metricsIntervalSec int
+ var readErr error
+ for !isConnected {
+ for _, master := range option.Masters {
+ metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
+ if readErr == nil {
+ isConnected = true
+ } else {
+ time.Sleep(7 * time.Second)
+ }
+ }
+ }
+ if metricsAddress == "" && metricsIntervalSec <= 0 {
+ return
+ }
+ go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
+ func() (addr string, intervalSeconds int) {
+ return metricsAddress, metricsIntervalSec
+ })
+}
+func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
+ err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
+ }
+ metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ return nil
+ })
+ return
}
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index d76d7df8c..b6bfc3b04 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -2,28 +2,47 @@ package weed_server
import (
"net/http"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/stats"
)
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
+ start := time.Now()
switch r.Method {
case "GET":
+ stats.FilerRequestCounter.WithLabelValues("get").Inc()
fs.GetOrHeadHandler(w, r, true)
+ stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds())
case "HEAD":
+ stats.FilerRequestCounter.WithLabelValues("head").Inc()
fs.GetOrHeadHandler(w, r, false)
+ stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
case "DELETE":
+ stats.FilerRequestCounter.WithLabelValues("delete").Inc()
fs.DeleteHandler(w, r)
+ stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
case "PUT":
+ stats.FilerRequestCounter.WithLabelValues("put").Inc()
fs.PostHandler(w, r)
+ stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
case "POST":
+ stats.FilerRequestCounter.WithLabelValues("post").Inc()
fs.PostHandler(w, r)
+ stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
}
}
func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) {
+ start := time.Now()
switch r.Method {
case "GET":
+ stats.FilerRequestCounter.WithLabelValues("get").Inc()
fs.GetOrHeadHandler(w, r, true)
+ stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds())
case "HEAD":
+ stats.FilerRequestCounter.WithLabelValues("head").Inc()
fs.GetOrHeadHandler(w, r, false)
+ stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
}
}
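
Note: every handler branch now records both a request counter and a latency histogram keyed by method name. If more handlers adopt the same pattern, the boilerplate could be folded into a small wrapper; the sketch below reuses the same stats collectors referenced above, but the helper itself is not part of this change:

    package weed_server

    import (
        "net/http"
        "time"

        "github.com/chrislusf/seaweedfs/weed/stats"
    )

    // instrumented is a hypothetical wrapper that applies the counter+histogram
    // pattern shown above to any handler, labeled by a short method name.
    func instrumented(label string, h func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) {
        return func(w http.ResponseWriter, r *http.Request) {
            start := time.Now()
            stats.FilerRequestCounter.WithLabelValues(label).Inc()
            h(w, r)
            stats.FilerRequestHistogram.WithLabelValues(label).Observe(time.Since(start).Seconds())
        }
    }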
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 226de640c..76c924df1 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -1,34 +1,47 @@
package weed_server
import (
+ "bytes"
+ "context"
"io"
"mime"
- "mime/multipart"
"net/http"
- "net/url"
- "path"
+ "path/filepath"
"strconv"
"strings"
+ "time"
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/images"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
+
path := r.URL.Path
- if strings.HasSuffix(path, "/") && len(path) > 1 {
+ isForDirectory := strings.HasSuffix(path, "/")
+ if isForDirectory && len(path) > 1 {
path = path[:len(path)-1]
}
- entry, err := fs.filer.FindEntry(filer2.FullPath(path))
+ entry, err := fs.filer.FindEntry(context.Background(), util.FullPath(path))
if err != nil {
if path == "/" {
fs.listDirectoryHandler(w, r)
return
}
- glog.V(1).Infof("Not found %s: %v", path, err)
- w.WriteHeader(http.StatusNotFound)
+ if err == filer_pb.ErrNotFound {
+ glog.V(1).Infof("Not found %s: %v", path, err)
+ stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc()
+ w.WriteHeader(http.StatusNotFound)
+ } else {
+ glog.V(0).Infof("Internal %s: %v", path, err)
+ stats.FilerRequestCounter.WithLabelValues("read.internalerror").Inc()
+ w.WriteHeader(http.StatusInternalServerError)
+ }
return
}
@@ -41,212 +54,81 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return
}
- if len(entry.Chunks) == 0 {
- glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
- w.WriteHeader(http.StatusNoContent)
- return
- }
-
- w.Header().Set("Accept-Ranges", "bytes")
- if r.Method == "HEAD" {
- w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
- w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat))
- return
- }
-
- if len(entry.Chunks) == 1 {
- fs.handleSingleChunk(w, r, entry)
- return
- }
-
- fs.handleMultipleChunks(w, r, entry)
-
-}
-
-func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
-
- fileId := entry.Chunks[0].FileId
-
- urlString, err := fs.filer.MasterClient.LookupFileId(fileId)
- if err != nil {
- glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
+ if isForDirectory {
w.WriteHeader(http.StatusNotFound)
return
}
- if fs.option.RedirectOnRead {
- http.Redirect(w, r, urlString, http.StatusFound)
- return
- }
-
- u, _ := url.Parse(urlString)
- q := u.Query()
- for key, values := range r.URL.Query() {
- for _, value := range values {
- q.Add(key, value)
- }
- }
- u.RawQuery = q.Encode()
- request := &http.Request{
- Method: r.Method,
- URL: u,
- Proto: r.Proto,
- ProtoMajor: r.ProtoMajor,
- ProtoMinor: r.ProtoMinor,
- Header: r.Header,
- Body: r.Body,
- Host: r.Host,
- ContentLength: r.ContentLength,
- }
- glog.V(3).Infoln("retrieving from", u)
- resp, do_err := util.Do(request)
- if do_err != nil {
- glog.V(0).Infoln("failing to connect to volume server", do_err.Error())
- writeJsonError(w, r, http.StatusInternalServerError, do_err)
+ if len(entry.Chunks) == 0 {
+ glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
+ stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc()
+ w.WriteHeader(http.StatusNoContent)
return
}
- defer resp.Body.Close()
- for k, v := range resp.Header {
- w.Header()[k] = v
- }
- w.WriteHeader(resp.StatusCode)
- io.Copy(w, resp.Body)
-}
-func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
+ w.Header().Set("Accept-Ranges", "bytes")
+ w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat))
- mimeType := entry.Mime
+ // mime type
+ mimeType := entry.Attr.Mime
if mimeType == "" {
- if ext := path.Ext(entry.Name()); ext != "" {
+ if ext := filepath.Ext(entry.Name()); ext != "" {
mimeType = mime.TypeByExtension(ext)
}
}
if mimeType != "" {
w.Header().Set("Content-Type", mimeType)
}
- setEtag(w, filer2.ETag(entry.Chunks))
- totalSize := int64(filer2.TotalSize(entry.Chunks))
-
- rangeReq := r.Header.Get("Range")
-
- if rangeReq == "" {
- w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
- if err := fs.writeContent(w, entry, 0, int(totalSize)); err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
+ // if modified since
+ if !entry.Attr.Mtime.IsZero() {
+ w.Header().Set("Last-Modified", entry.Attr.Mtime.UTC().Format(http.TimeFormat))
+ if r.Header.Get("If-Modified-Since") != "" {
+ if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil {
+ if t.After(entry.Attr.Mtime) {
+ w.WriteHeader(http.StatusNotModified)
+ return
+ }
+ }
}
- return
}
- //the rest is dealing with partial content request
- //mostly copy from src/pkg/net/http/fs.go
- ranges, err := parseRange(rangeReq, totalSize)
- if err != nil {
- http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
- return
- }
- if sumRangesSize(ranges) > totalSize {
- // The total number of bytes in all the ranges
- // is larger than the size of the file by
- // itself, so this is probably an attack, or a
- // dumb client. Ignore the range request.
- return
- }
- if len(ranges) == 0 {
- return
- }
- if len(ranges) == 1 {
- // RFC 2616, Section 14.16:
- // "When an HTTP message includes the content of a single
- // range (for example, a response to a request for a
- // single range, or to a request for a set of ranges
- // that overlap without any holes), this content is
- // transmitted with a Content-Range header, and a
- // Content-Length header showing the number of bytes
- // actually transferred.
- // ...
- // A response to a request for a single range MUST NOT
- // be sent using the multipart/byteranges media type."
- ra := ranges[0]
- w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
- w.Header().Set("Content-Range", ra.contentRange(totalSize))
- w.WriteHeader(http.StatusPartialContent)
-
- err = fs.writeContent(w, entry, ra.start, int(ra.length))
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
+ // set etag
+ etag := filer2.ETagEntry(entry)
+ if inm := r.Header.Get("If-None-Match"); inm == "\""+etag+"\"" {
+ w.WriteHeader(http.StatusNotModified)
return
}
+ setEtag(w, etag)
- // process multiple ranges
- for _, ra := range ranges {
- if ra.start > totalSize {
- http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
- return
- }
- }
- sendSize := rangesMIMESize(ranges, mimeType, totalSize)
- pr, pw := io.Pipe()
- mw := multipart.NewWriter(pw)
- w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
- sendContent := pr
- defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
- go func() {
- for _, ra := range ranges {
- part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
- if e != nil {
- pw.CloseWithError(e)
- return
- }
- if e = fs.writeContent(part, entry, ra.start, int(ra.length)); e != nil {
- pw.CloseWithError(e)
- return
- }
- }
- mw.Close()
- pw.Close()
- }()
- if w.Header().Get("Content-Encoding") == "" {
- w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
- }
- w.WriteHeader(http.StatusPartialContent)
- if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
- http.Error(w, "Internal Error", http.StatusInternalServerError)
+ if r.Method == "HEAD" {
+ w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
return
}
-}
-
-func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int64, size int) error {
+ filename := entry.Name()
+ adjustHeadersAfterHEAD(w, r, filename)
- chunkViews := filer2.ViewFromChunks(entry.Chunks, offset, size)
-
- fileId2Url := make(map[string]string)
-
- for _, chunkView := range chunkViews {
-
- urlString, err := fs.filer.MasterClient.LookupFileId(chunkView.FileId)
- if err != nil {
- glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
- return err
- }
- fileId2Url[chunkView.FileId] = urlString
- }
+ totalSize := int64(filer2.TotalSize(entry.Chunks))
- for _, chunkView := range chunkViews {
- urlString := fileId2Url[chunkView.FileId]
- _, err := util.ReadUrlAsStream(urlString, chunkView.Offset, int(chunkView.Size), func(data []byte) {
- w.Write(data)
- })
- if err != nil {
- glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err)
- return err
+ if rangeReq := r.Header.Get("Range"); rangeReq == "" {
+ ext := filepath.Ext(filename)
+ width, height, mode, shouldResize := shouldResizeImages(ext, r)
+ if shouldResize {
+ data, err := filer2.ReadAll(fs.filer.MasterClient, entry.Chunks)
+ if err != nil {
+ glog.Errorf("failed to read %s: %v", path, err)
+ w.WriteHeader(http.StatusNotModified)
+ return
+ }
+ rs, _, _ := images.Resized(ext, bytes.NewReader(data), width, height, mode)
+ io.Copy(w, rs)
+ return
}
}
- return nil
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ return filer2.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
+ })
}
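
Note: the rewritten GetOrHeadHandler adds conditional-request support: it returns 304 when If-Modified-Since is newer than the entry mtime or when If-None-Match matches the entry ETag, and it hands Range requests to processRangeRequest backed by filer2.StreamContent. A small client sketch of the ETag round trip; the URL is illustrative:

    package main

    import (
        "log"
        "net/http"
    )

    func main() {
        const fileURL = "http://localhost:8888/path/to/file.txt" // assumed filer HTTP endpoint

        // First request: remember the ETag the filer returns.
        resp, err := http.Get(fileURL)
        if err != nil {
            log.Fatal(err)
        }
        resp.Body.Close()
        etag := resp.Header.Get("ETag")

        // Second request: present the ETag; an unchanged entry should yield 304 Not Modified.
        req, _ := http.NewRequest("GET", fileURL, nil)
        req.Header.Set("If-None-Match", etag)
        resp2, err := http.DefaultClient.Do(req)
        if err != nil {
            log.Fatal(err)
        }
        resp2.Body.Close()
        log.Printf("second request status: %s", resp2.Status) // expect 304 when nothing changed
    }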
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index bcf7f0eb5..ae28fc1db 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -1,13 +1,15 @@
package weed_server
import (
+ "context"
"net/http"
"strconv"
"strings"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
// listDirectoryHandler lists directories and folers under a directory
@@ -15,6 +17,9 @@ import (
// sub directories are listed on the first page, when "lastFileName"
// is empty.
func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Request) {
+
+ stats.FilerRequestCounter.WithLabelValues("list").Inc()
+
path := r.URL.Path
if strings.HasSuffix(path, "/") && len(path) > 1 {
path = path[:len(path)-1]
@@ -27,7 +32,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
lastFileName := r.FormValue("lastFileName")
- entries, err := fs.filer.ListDirectoryEntries(filer2.FullPath(path), lastFileName, false, limit)
+ entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit)
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
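
Note: listDirectoryHandler pages through a directory using the lastFileName and limit form values, with sub-directories only on the first page. A hedged sketch of paging the listing over HTTP; the endpoint, the limit parameter name, and the Accept-header behavior (JSON instead of the HTML UI) are assumptions about the filer HTTP API, not shown in this diff:

    package main

    import (
        "io/ioutil"
        "log"
        "net/http"
        "net/url"
    )

    func main() {
        // Assumed filer HTTP endpoint; listing parameters mirror the handler above.
        base := "http://localhost:8888/some/dir/"

        params := url.Values{}
        params.Set("limit", "100")
        params.Set("lastFileName", "") // set to the last name of the previous page to continue

        req, _ := http.NewRequest("GET", base+"?"+params.Encode(), nil)
        req.Header.Set("Accept", "application/json") // request a JSON listing instead of the HTML UI (assumed behavior)

        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            log.Fatal(err)
        }
        defer resp.Body.Close()

        body, _ := ioutil.ReadAll(resp.Body)
        log.Printf("listing page: %s", body)
    }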
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 32f481e74..a642c502a 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -1,11 +1,18 @@
package weed_server
import (
+ "context"
+ "crypto/md5"
"encoding/json"
"errors"
+ "fmt"
+ "io"
"io/ioutil"
+ "mime"
"net/http"
"net/url"
+ "os"
+ filenamePath "path"
"strconv"
"strings"
"time"
@@ -14,8 +21,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
- "os"
)
var (
@@ -25,18 +34,23 @@ var (
type FilerPostResult struct {
Name string `json:"name,omitempty"`
- Size uint32 `json:"size,omitempty"`
+ Size int64 `json:"size,omitempty"`
Error string `json:"error,omitempty"`
Fid string `json:"fid,omitempty"`
Url string `json:"url,omitempty"`
}
-func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) {
+func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+
+ stats.FilerRequestCounter.WithLabelValues("assign").Inc()
+ start := time.Now()
+ defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }()
+
ar := &operation.VolumeAssignRequest{
Count: 1,
Replication: replication,
Collection: collection,
- Ttl: r.URL.Query().Get("ttl"),
+ Ttl: ttlString,
DataCenter: dataCenter,
}
var altRequest *operation.VolumeAssignRequest
@@ -45,12 +59,12 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
Count: 1,
Replication: replication,
Collection: collection,
- Ttl: r.URL.Query().Get("ttl"),
+ Ttl: ttlString,
DataCenter: "",
}
}
- assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar, altRequest)
+ assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
writeJsonError(w, r, http.StatusInternalServerError, ae)
@@ -59,166 +73,293 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
}
fileId = assignResult.Fid
urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
+ if fsync {
+ urlLocation += "?fsync=true"
+ }
+ auth = assignResult.Auth
return
}
func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
+ ctx := context.Background()
+
query := r.URL.Query()
- replication := query.Get("replication")
- if replication == "" {
- replication = fs.option.DefaultReplication
- }
- collection := query.Get("collection")
- if collection == "" {
- collection = fs.option.Collection
- }
+ collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication"))
dataCenter := query.Get("dataCenter")
if dataCenter == "" {
dataCenter = fs.option.DataCenter
}
+ ttlString := r.URL.Query().Get("ttl")
+
+ // read ttl in seconds
+ ttl, err := needle.ReadTTL(ttlString)
+ ttlSeconds := int32(0)
+ if err == nil {
+ ttlSeconds = int32(ttl.Minutes()) * 60
+ }
+
+ if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked {
+ return
+ }
+
+ if fs.option.Cipher {
+ reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync)
+ if err != nil {
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ } else if reply != nil {
+ writeJsonQuiet(w, r, http.StatusCreated, reply)
+ }
- if autoChunked := fs.autoChunk(w, r, replication, collection, dataCenter); autoChunked {
return
}
- fileId, urlLocation, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
+ fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
if err != nil || fileId == "" || urlLocation == "" {
glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
+ writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter))
return
}
glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
u, _ := url.Parse(urlLocation)
-
- // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off
- // because they need to provide FIDs instead of file paths...
- cm, _ := strconv.ParseBool(query.Get("cm"))
- if cm {
- q := u.Query()
- q.Set("cm", "true")
- u.RawQuery = q.Encode()
+ ret, md5value, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
+ if err != nil {
+ return
}
- glog.V(4).Infoln("post to", u)
- // send request to volume server
- request := &http.Request{
- Method: r.Method,
- URL: u,
- Proto: r.Proto,
- ProtoMajor: r.ProtoMajor,
- ProtoMinor: r.ProtoMinor,
- Header: r.Header,
- Body: r.Body,
- Host: r.Host,
- ContentLength: r.ContentLength,
- }
- resp, do_err := util.Do(request)
- if do_err != nil {
- glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, do_err, r.Method)
- writeJsonError(w, r, http.StatusInternalServerError, do_err)
+ if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, md5value, fileId, ttlSeconds); err != nil {
return
}
- defer resp.Body.Close()
- etag := resp.Header.Get("ETag")
- resp_body, ra_err := ioutil.ReadAll(resp.Body)
- if ra_err != nil {
- glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
- writeJsonError(w, r, http.StatusInternalServerError, ra_err)
- return
+
+ // send back post result
+ reply := FilerPostResult{
+ Name: ret.Name,
+ Size: int64(ret.Size),
+ Error: ret.Error,
+ Fid: fileId,
+ Url: urlLocation,
}
- glog.V(4).Infoln("post result", string(resp_body))
- var ret operation.UploadResult
- unmarshal_err := json.Unmarshal(resp_body, &ret)
- if unmarshal_err != nil {
- glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
- writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err)
- return
+ setEtag(w, ret.ETag)
+ writeJsonQuiet(w, r, http.StatusCreated, reply)
+}
+
+// update metadata in filer store
+func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string,
+ collection string, ret *operation.UploadResult, md5value []byte, fileId string, ttlSeconds int32) (err error) {
+
+ stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds())
+ }()
+
+ modeStr := r.URL.Query().Get("mode")
+ if modeStr == "" {
+ modeStr = "0660"
}
- if ret.Error != "" {
- glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
- writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error))
- return
+ mode, err := strconv.ParseUint(modeStr, 8, 32)
+ if err != nil {
+ glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+ mode = 0660
}
- // find correct final path
path := r.URL.Path
if strings.HasSuffix(path, "/") {
if ret.Name != "" {
path += ret.Name
- } else {
- fs.filer.DeleteFileByFileId(fileId)
- glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
- writeJsonError(w, r, http.StatusInternalServerError,
- errors.New("Can not to write to folder "+path+" without a file name"))
- return
}
}
-
- // update metadata in filer store
- existingEntry, err := fs.filer.FindEntry(filer2.FullPath(path))
+ existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
crTime := time.Now()
if err == nil && existingEntry != nil {
- // glog.V(4).Infof("existing %s => %+v", path, existingEntry)
- if existingEntry.IsDirectory() {
- path += "/" + ret.Name
- } else {
- crTime = existingEntry.Crtime
- }
+ crTime = existingEntry.Crtime
}
entry := &filer2.Entry{
- FullPath: filer2.FullPath(path),
+ FullPath: util.FullPath(path),
Attr: filer2.Attr{
Mtime: time.Now(),
Crtime: crTime,
- Mode: 0660,
+ Mode: os.FileMode(mode),
Uid: OS_UID,
Gid: OS_GID,
Replication: replication,
Collection: collection,
- TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)),
+ TtlSec: ttlSeconds,
+ Mime: ret.Mime,
+ Md5: md5value,
},
Chunks: []*filer_pb.FileChunk{{
FileId: fileId,
Size: uint64(ret.Size),
Mtime: time.Now().UnixNano(),
- ETag: etag,
+ ETag: ret.ETag,
}},
}
+ if entry.Attr.Mime == "" {
+ if ext := filenamePath.Ext(path); ext != "" {
+ entry.Attr.Mime = mime.TypeByExtension(ext)
+ }
+ }
// glog.V(4).Infof("saving %s => %+v", path, entry)
- if db_err := fs.filer.CreateEntry(entry); db_err != nil {
- fs.filer.DeleteFileByFileId(fileId)
- glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
- writeJsonError(w, r, http.StatusInternalServerError, db_err)
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil {
+ fs.filer.DeleteChunks(entry.Chunks)
+ glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
+ writeJsonError(w, r, http.StatusInternalServerError, dbErr)
+ err = dbErr
return
}
- // send back post result
- reply := FilerPostResult{
- Name: ret.Name,
- Size: ret.Size,
- Error: ret.Error,
- Fid: fileId,
- Url: urlLocation,
+ return nil
+}
+
+// send request to volume server
+func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, md5value []byte, err error) {
+
+ stats.FilerRequestCounter.WithLabelValues("postUpload").Inc()
+ start := time.Now()
+ defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }()
+
+ ret = &operation.UploadResult{}
+
+ md5Hash := md5.New()
+ body := r.Body
+ if r.Method == "PUT" {
+ // only PUT or large chunked files have Md5 in attributes
+ body = ioutil.NopCloser(io.TeeReader(r.Body, md5Hash))
}
- setEtag(w, etag)
- writeJsonQuiet(w, r, http.StatusCreated, reply)
+
+ request := &http.Request{
+ Method: r.Method,
+ URL: u,
+ Proto: r.Proto,
+ ProtoMajor: r.ProtoMajor,
+ ProtoMinor: r.ProtoMinor,
+ Header: r.Header,
+ Body: body,
+ Host: r.Host,
+ ContentLength: r.ContentLength,
+ }
+
+ if auth != "" {
+ request.Header.Set("Authorization", "BEARER "+string(auth))
+ }
+ resp, doErr := util.Do(request)
+ if doErr != nil {
+ glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method)
+ writeJsonError(w, r, http.StatusInternalServerError, doErr)
+ err = doErr
+ return
+ }
+ defer func() {
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+ }()
+
+ respBody, raErr := ioutil.ReadAll(resp.Body)
+ if raErr != nil {
+ glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error())
+ writeJsonError(w, r, http.StatusInternalServerError, raErr)
+ err = raErr
+ return
+ }
+
+ glog.V(4).Infoln("post result", string(respBody))
+ unmarshalErr := json.Unmarshal(respBody, &ret)
+ if unmarshalErr != nil {
+ glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody))
+ writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr)
+ err = unmarshalErr
+ return
+ }
+ if ret.Error != "" {
+ err = errors.New(ret.Error)
+ glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+ // find correct final path
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ if ret.Name != "" {
+ path += ret.Name
+ } else {
+ err = fmt.Errorf("can not to write to folder %s without a file name", path)
+ fs.filer.DeleteFileByFileId(fileId)
+ glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+ }
+ // use filer calculated md5 ETag, instead of the volume server crc ETag
+ if r.Method == "PUT" {
+ md5value = md5Hash.Sum(nil)
+ }
+ ret.ETag = getEtag(resp)
+ return
}
// curl -X DELETE http://localhost:8888/path/to
// curl -X DELETE http://localhost:8888/path/to?recursive=true
+// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
+// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
isRecursive := r.FormValue("recursive") == "true"
+ if !isRecursive && fs.option.recursiveDelete {
+ if r.FormValue("recursive") != "false" {
+ isRecursive = true
+ }
+ }
+ ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
+ skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
- err := fs.filer.DeleteEntryMetaAndData(filer2.FullPath(r.URL.Path), isRecursive, true)
+ objectPath := r.URL.Path
+ if len(r.URL.Path) > 1 && strings.HasSuffix(objectPath, "/") {
+ objectPath = objectPath[0 : len(objectPath)-1]
+ }
+
+ err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false)
if err != nil {
- glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error())
- writeJsonError(w, r, http.StatusInternalServerError, err)
+ glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
+ httpStatus := http.StatusInternalServerError
+ if err == filer_pb.ErrNotFound {
+ httpStatus = http.StatusNotFound
+ }
+ writeJsonError(w, r, httpStatus, err)
return
}
w.WriteHeader(http.StatusNoContent)
}
+
+func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) {
+ // default
+ collection = fs.option.Collection
+ replication = fs.option.DefaultReplication
+
+ // get default collection settings
+ if qCollection != "" {
+ collection = qCollection
+ }
+ if qReplication != "" {
+ replication = qReplication
+ }
+
+ // required by buckets folder
+ if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
+ bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:]
+ t := strings.Index(bucketAndObjectKey, "/")
+ if t < 0 {
+ collection = bucketAndObjectKey
+ }
+ if t > 0 {
+ collection = bucketAndObjectKey[:t]
+ }
+ replication, fsync = fs.filer.ReadBucketOption(collection)
+ }
+
+ return
+}
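
To make the bucket rule concrete, a hedged example of detectCollection, assuming fs.filer.DirBucketsPath is "/buckets" (the real value comes from the filer configuration):

    // /buckets/photos/2020/a.jpg -> collection "photos", replication and fsync read from the bucket
    // /buckets/photos            -> collection "photos"
    // /other/path/a.jpg          -> fs.option.Collection and fs.option.DefaultReplication
    collection, replication, fsync := fs.detectCollection("/buckets/photos/2020/a.jpg", "", "")
    glog.V(0).Infof("collection=%s replication=%s fsync=%v", collection, replication, fsync)
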
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 4b1745aaa..29546542c 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -1,7 +1,8 @@
package weed_server
import (
- "bytes"
+ "context"
+ "crypto/md5"
"io"
"io/ioutil"
"net/http"
@@ -14,10 +15,13 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string) bool {
+func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
+ replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool {
if r.Method != "POST" {
glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
return false
@@ -53,7 +57,7 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica
return false
}
- reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection, dataCenter)
+ reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil {
@@ -62,7 +66,14 @@ func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replica
return true
}
-func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) {
+func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
+ contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) {
+
+ stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
+ }()
multipartReader, multipartReaderErr := r.MultipartReader()
if multipartReaderErr != nil {
@@ -78,68 +89,46 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
if fileName != "" {
fileName = path.Base(fileName)
}
+ contentType := part1.Header.Get("Content-Type")
var fileChunks []*filer_pb.FileChunk
- totalBytesRead := int64(0)
- tmpBufferSize := int32(1024 * 1024)
- tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize))
- chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow
- chunkBufOffset := int32(0)
+ md5Hash := md5.New()
+ var partReader = ioutil.NopCloser(io.TeeReader(part1, md5Hash))
+
chunkOffset := int64(0)
- writtenChunks := 0
- filerResult = &FilerPostResult{
- Name: fileName,
- }
+ for chunkOffset < contentLength {
+ limitedReader := io.LimitReader(partReader, int64(chunkSize))
- for totalBytesRead < contentLength {
- tmpBuffer.Reset()
- bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize))
- readFully := readErr != nil && readErr == io.EOF
- tmpBuf := tmpBuffer.Bytes()
- bytesToCopy := tmpBuf[0:int(bytesRead)]
-
- copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy)
- chunkBufOffset = chunkBufOffset + int32(bytesRead)
-
- if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) {
- writtenChunks = writtenChunks + 1
- fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
- if assignErr != nil {
- return nil, assignErr
- }
-
- // upload the chunk to the volume server
- chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(len(fileChunks)+1), 10)
- uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId)
- if uploadErr != nil {
- return nil, uploadErr
- }
-
- // Save to chunk manifest structure
- fileChunks = append(fileChunks,
- &filer_pb.FileChunk{
- FileId: fileId,
- Offset: chunkOffset,
- Size: uint64(chunkBufOffset),
- Mtime: time.Now().UnixNano(),
- },
- )
-
- // reset variables for the next chunk
- chunkBufOffset = 0
- chunkOffset = totalBytesRead + int64(bytesRead)
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
+ if assignErr != nil {
+ return nil, assignErr
}
- totalBytesRead = totalBytesRead + int64(bytesRead)
+ // upload the chunk to the volume server
+ uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
+ if uploadErr != nil {
+ return nil, uploadErr
+ }
- if bytesRead == 0 || readFully {
+ // if last chunk exhausted the reader exactly at the border
+ if uploadResult.Size == 0 {
break
}
- if readErr != nil {
- return nil, readErr
+ // Save to chunk manifest structure
+ fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
+
+ glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength)
+
+ // reset variables for the next chunk
+ chunkOffset = chunkOffset + int64(uploadResult.Size)
+
+ // if last chunk was not at full chunk size, but already exhausted the reader
+ if int64(uploadResult.Size) < int64(chunkSize) {
+ break
}
}
@@ -152,7 +141,7 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
glog.V(4).Infoln("saving", path)
entry := &filer2.Entry{
- FullPath: filer2.FullPath(path),
+ FullPath: util.FullPath(path),
Attr: filer2.Attr{
Mtime: time.Now(),
Crtime: time.Now(),
@@ -161,30 +150,37 @@ func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, conte
Gid: OS_GID,
Replication: replication,
Collection: collection,
- TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)),
+ TtlSec: ttlSec,
+ Mime: contentType,
+ Md5: md5Hash.Sum(nil),
},
Chunks: fileChunks,
}
- if db_err := fs.filer.CreateEntry(entry); db_err != nil {
- replyerr = db_err
- filerResult.Error = db_err.Error()
- glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
+
+ filerResult = &FilerPostResult{
+ Name: fileName,
+ Size: chunkOffset,
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil {
+ fs.filer.DeleteChunks(entry.Chunks)
+ replyerr = dbErr
+ filerResult.Error = dbErr.Error()
+ glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
return
}
return
}
-func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) {
- err = nil
+func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error) {
- ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf))
- uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, fs.jwt(fileId))
- if uploadResult != nil {
- glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size)
- }
- if uploadError != nil {
- err = uploadError
- }
- return
+ stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds())
+ }()
+
+ uploadResult, err, _ := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
+ return uploadResult, err
}
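
The streaming pattern above, summarized as a hedged sketch: one MD5 is accumulated over the whole body through a TeeReader while io.LimitReader slices it into chunks, so no chunk is buffered in full. uploadChunk is an assumed stand-in for fs.doUpload that returns the number of bytes written:

    md5Hash := md5.New()
    partReader := io.TeeReader(part1, md5Hash)
    var chunkOffset int64
    for chunkOffset < contentLength {
        limited := io.LimitReader(partReader, int64(chunkSize))
        written, err := uploadChunk(limited) // assumed helper: (int64, error)
        if err != nil {
            return err
        }
        if written == 0 {
            break // body ended exactly on a chunk boundary
        }
        chunkOffset += written
        if written < int64(chunkSize) {
            break // short chunk: reader exhausted
        }
    }
    // md5Hash.Sum(nil) now covers every byte that was uploaded
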
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
new file mode 100644
index 000000000..17f35838d
--- /dev/null
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -0,0 +1,90 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// handling single chunk POST or PUT upload
+func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request,
+ replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) {
+
+ fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
+
+ if err != nil || fileId == "" || urlLocation == "" {
+ return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
+ }
+
+ glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
+
+ // Note: data is stored as encrypt(gzip(data)): gzip first, then encrypt
+
+ sizeLimit := int64(fs.option.MaxMB) * 1024 * 1024
+
+ pu, err := needle.ParseUpload(r, sizeLimit)
+ uncompressedData := pu.Data
+ if pu.IsGzipped {
+ uncompressedData = pu.UncompressedData
+ }
+ if pu.MimeType == "" {
+ pu.MimeType = http.DetectContentType(uncompressedData)
+ // println("detect2 mimetype to", pu.MimeType)
+ }
+
+ uploadResult, uploadError := operation.UploadData(urlLocation, pu.FileName, true, uncompressedData, false, pu.MimeType, pu.PairMap, auth)
+ if uploadError != nil {
+ return nil, fmt.Errorf("upload to volume server: %v", uploadError)
+ }
+
+ // Save to chunk manifest structure
+ fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0)}
+
+ // fmt.Printf("uploaded: %+v\n", uploadResult)
+
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ if pu.FileName != "" {
+ path += pu.FileName
+ }
+ }
+
+ entry := &filer2.Entry{
+ FullPath: util.FullPath(path),
+ Attr: filer2.Attr{
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: 0660,
+ Uid: OS_UID,
+ Gid: OS_GID,
+ Replication: replication,
+ Collection: collection,
+ TtlSec: ttlSeconds,
+ Mime: pu.MimeType,
+ },
+ Chunks: fileChunks,
+ }
+
+ filerResult = &FilerPostResult{
+ Name: pu.FileName,
+ Size: int64(pu.OriginalDataSize),
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil {
+ fs.filer.DeleteChunks(entry.Chunks)
+ err = dbErr
+ filerResult.Error = dbErr.Error()
+ return
+ }
+
+ return
+}
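
A hedged recap of the single-chunk cipher path: the encrypted upload becomes exactly one chunk at offset 0, while the reply reports the original, pre-cipher size; variable names follow the function above and the surrounding wiring is omitted:

    uploadResult, uploadErr := operation.UploadData(urlLocation, pu.FileName, true, uncompressedData, false, pu.MimeType, pu.PairMap, auth)
    if uploadErr != nil {
        return nil, fmt.Errorf("upload to volume server: %v", uploadErr)
    }
    fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0)}
    // the entry stores fileChunks, while FilerPostResult.Size reports int64(pu.OriginalDataSize)
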
diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go
index d056a4b25..f21cce7d1 100644
--- a/weed/server/filer_ui/breadcrumb.go
+++ b/weed/server/filer_ui/breadcrumb.go
@@ -1,8 +1,9 @@
package master_ui
import (
- "path/filepath"
"strings"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type Breadcrumb struct {
@@ -14,10 +15,14 @@ func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) {
parts := strings.Split(fullpath, "/")
for i := 0; i < len(parts); i++ {
- crumbs = append(crumbs, Breadcrumb{
- Name: parts[i] + "/",
- Link: "/" + filepath.Join(parts[0:i+1]...),
- })
+ crumb := Breadcrumb{
+ Name: parts[i] + " /",
+ Link: "/" + util.Join(parts[0:i+1]...),
+ }
+ if !strings.HasSuffix(crumb.Link, "/") {
+ crumb.Link += "/"
+ }
+ crumbs = append(crumbs, crumb)
}
return
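
A hedged example of the breadcrumbs the updated code produces (assuming util.Join joins path segments with "/"):

    for _, c := range ToBreadcrumb("/buckets/photos") {
        fmt.Printf("%-12q %q\n", c.Name, c.Link)
    }
    // " /"          "/"
    // "buckets /"   "/buckets/"
    // "photos /"    "/buckets/photos/"
    // every Link now ends with "/" so the UI always navigates to a directory listing
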
diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go
index e31685ea0..e532b27e2 100644
--- a/weed/server/filer_ui/templates.go
+++ b/weed/server/filer_ui/templates.go
@@ -50,7 +50,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<div class="row">
<div>
{{ range $entry := .Breadcrumbs }}
- <a href={{ $entry.Link }} >
+ <a href="{{ $entry.Link }}" >
{{ $entry.Name }}
</a>
{{ end }}
@@ -78,20 +78,19 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
</a>
{{end}}
</td>
- <td align="right">
+ <td align="right" nowrap>
{{if $entry.IsDirectory}}
{{else}}
- {{ $entry.Mime }}
+ {{ $entry.Mime }}&nbsp;
{{end}}
</td>
- <td align="right">
+ <td align="right" nowrap>
{{if $entry.IsDirectory}}
{{else}}
- {{ $entry.Size | humanizeBytes }}
- &nbsp;&nbsp;&nbsp;
+ {{ $entry.Size | humanizeBytes }}&nbsp;
{{end}}
</td>
- <td>
+ <td nowrap>
{{ $entry.Timestamp.Format "2006-01-02 15:04" }}
</td>
</tr>
@@ -162,7 +161,7 @@ function uploadFile(file, i) {
var url = window.location.href
var xhr = new XMLHttpRequest()
var formData = new FormData()
- xhr.open('POST', url, true)
+ xhr.open('POST', url, false)
formData.append('file', file)
xhr.send(formData)
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 93dce59d8..1ee214deb 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -1,16 +1,20 @@
package weed_server
import (
+ "context"
"fmt"
"net"
"strings"
"time"
"github.com/chrislusf/raft"
+ "google.golang.org/grpc/peer"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
- "google.golang.org/grpc/peer"
)
func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
@@ -20,8 +24,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
defer func() {
if dn != nil {
- glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
+ // if the volume server disconnects and reconnects quickly
+ // the unregister and register can race with each other
t.UnRegisterDataNode(dn)
+ glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
message := &master_pb.VolumeLocation{
Url: dn.Url(),
@@ -30,6 +36,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
for _, v := range dn.GetVolumes() {
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
}
+ for _, s := range dn.GetEcShards() {
+ message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId))
+ }
if len(message.DeletedVids) > 0 {
ms.clientChansLock.RLock()
@@ -45,57 +54,109 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
for {
heartbeat, err := stream.Recv()
if err != nil {
+ if dn != nil {
+ glog.Warningf("SendHeartbeat.Recv server %s:%d : %v", dn.Ip, dn.Port, err)
+ } else {
+ glog.Warningf("SendHeartbeat.Recv: %v", err)
+ }
return err
}
+ t.Sequence.SetMax(heartbeat.MaxFileKey)
+
if dn == nil {
- t.Sequence.SetMax(heartbeat.MaxFileKey)
- if heartbeat.Ip == "" {
- if pr, ok := peer.FromContext(stream.Context()); ok {
- if pr.Addr != net.Addr(nil) {
- heartbeat.Ip = pr.Addr.String()[0:strings.LastIndex(pr.Addr.String(), ":")]
- glog.V(0).Infof("remote IP address is detected as %v", heartbeat.Ip)
- }
- }
- }
dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn = rack.GetOrCreateDataNode(heartbeat.Ip,
int(heartbeat.Port), heartbeat.PublicUrl,
- int(heartbeat.MaxVolumeCount))
+ int64(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
- VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
- SecretKey: string(ms.guard.SecretKey),
+ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
+ MetricsAddress: ms.option.MetricsAddress,
+ MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ StorageBackends: backend.ToPbStorageBackends(),
}); err != nil {
+ glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err
}
}
+ if heartbeat.MaxVolumeCount != 0 && dn.GetMaxVolumeCount() != int64(heartbeat.MaxVolumeCount) {
+ delta := int64(heartbeat.MaxVolumeCount) - dn.GetMaxVolumeCount()
+ dn.UpAdjustMaxVolumeCountDelta(delta)
+ }
+
+ glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
message := &master_pb.VolumeLocation{
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
}
- if len(heartbeat.NewVids) > 0 || len(heartbeat.DeletedVids) > 0 {
+ if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
// process delta volume ids if exists for fast volume id updates
- message.NewVids = append(message.NewVids, heartbeat.NewVids...)
- message.DeletedVids = append(message.DeletedVids, heartbeat.DeletedVids...)
- } else {
+ for _, volInfo := range heartbeat.NewVolumes {
+ message.NewVids = append(message.NewVids, volInfo.Id)
+ }
+ for _, volInfo := range heartbeat.DeletedVolumes {
+ message.DeletedVids = append(message.DeletedVids, volInfo.Id)
+ }
+ // update master internal volume layouts
+ t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
+ }
+
+ if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
// process heartbeat.Volumes
newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
for _, v := range newVolumes {
+ glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
message.NewVids = append(message.NewVids, uint32(v.Id))
}
for _, v := range deletedVolumes {
+ glog.V(0).Infof("master see deleted volume %d from %s", uint32(v.Id), dn.Url())
message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
}
}
+ if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
+
+ // update master internal volume layouts
+ t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
+
+ for _, s := range heartbeat.NewEcShards {
+ message.NewVids = append(message.NewVids, s.Id)
+ }
+ for _, s := range heartbeat.DeletedEcShards {
+ if dn.HasVolumesById(needle.VolumeId(s.Id)) {
+ continue
+ }
+ message.DeletedVids = append(message.DeletedVids, s.Id)
+ }
+
+ }
+
+ if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
+ glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
+ newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
+
+ // broadcast the ec vid changes to master clients
+ for _, s := range newShards {
+ message.NewVids = append(message.NewVids, uint32(s.VolumeId))
+ }
+ for _, s := range deletedShards {
+ if dn.HasVolumesById(s.VolumeId) {
+ continue
+ }
+ message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId))
+ }
+
+ }
+
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
ms.clientChansLock.RLock()
- for _, ch := range ms.clientChans {
+ for host, ch := range ms.clientChans {
+ glog.V(0).Infof("master send to %s: %s", host, message.String())
ch <- message
}
ms.clientChansLock.RUnlock()
@@ -103,12 +164,15 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
// tell the volume servers about the leader
newLeader, err := t.Leader()
- if err == nil {
- if err := stream.Send(&master_pb.HeartbeatResponse{
- Leader: newLeader,
- }); err != nil {
- return err
- }
+ if err != nil {
+ glog.Warningf("SendHeartbeat find leader: %v", err)
+ return err
+ }
+ if err := stream.Send(&master_pb.HeartbeatResponse{
+ Leader: newLeader,
+ }); err != nil {
+ glog.Warningf("SendHeartbeat.Send response to to %s:%d %v", dn.Ip, dn.Port, err)
+ return err
}
}
}
@@ -123,38 +187,16 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
}
if !ms.Topo.IsLeader() {
- return raft.NotLeaderError
- }
-
- // remember client address
- ctx := stream.Context()
- // fmt.Printf("FromContext %+v\n", ctx)
- pr, ok := peer.FromContext(ctx)
- if !ok {
- glog.Error("failed to get peer from ctx")
- return fmt.Errorf("failed to get peer from ctx")
- }
- if pr.Addr == net.Addr(nil) {
- glog.Error("failed to get peer address")
- return fmt.Errorf("failed to get peer address")
+ return ms.informNewLeader(stream)
}
- clientName := req.Name + pr.Addr.String()
- glog.V(0).Infof("+ client %v", clientName)
+ peerAddress := findClientAddress(stream.Context(), req.GrpcPort)
- messageChan := make(chan *master_pb.VolumeLocation)
stopChan := make(chan bool)
- ms.clientChansLock.Lock()
- ms.clientChans[clientName] = messageChan
- ms.clientChansLock.Unlock()
+ clientName, messageChan := ms.addClient(req.Name, peerAddress)
- defer func() {
- glog.V(0).Infof("- client %v", clientName)
- ms.clientChansLock.Lock()
- delete(ms.clientChans, clientName)
- ms.clientChansLock.Unlock()
- }()
+ defer ms.deleteClient(clientName)
for _, message := range ms.Topo.ToVolumeLocations() {
if err := stream.Send(message); err != nil {
@@ -183,12 +225,79 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
}
case <-ticker.C:
if !ms.Topo.IsLeader() {
- return raft.NotLeaderError
+ return ms.informNewLeader(stream)
}
case <-stopChan:
return nil
}
}
+}
+
+func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error {
+ leader, err := ms.Topo.Leader()
+ if err != nil {
+ glog.Errorf("topo leader: %v", err)
+ return raft.NotLeaderError
+ }
+ if err := stream.Send(&master_pb.VolumeLocation{
+ Leader: leader,
+ }); err != nil {
+ return err
+ }
return nil
}
+
+func (ms *MasterServer) addClient(clientType string, clientAddress string) (clientName string, messageChan chan *master_pb.VolumeLocation) {
+ clientName = clientType + "@" + clientAddress
+ glog.V(0).Infof("+ client %v", clientName)
+
+ messageChan = make(chan *master_pb.VolumeLocation)
+
+ ms.clientChansLock.Lock()
+ ms.clientChans[clientName] = messageChan
+ ms.clientChansLock.Unlock()
+ return
+}
+
+func (ms *MasterServer) deleteClient(clientName string) {
+ glog.V(0).Infof("- client %v", clientName)
+ ms.clientChansLock.Lock()
+ delete(ms.clientChans, clientName)
+ ms.clientChansLock.Unlock()
+}
+
+func findClientAddress(ctx context.Context, grpcPort uint32) string {
+ // fmt.Printf("FromContext %+v\n", ctx)
+ pr, ok := peer.FromContext(ctx)
+ if !ok {
+ glog.Error("failed to get peer from ctx")
+ return ""
+ }
+ if pr.Addr == net.Addr(nil) {
+ glog.Error("failed to get peer address")
+ return ""
+ }
+ if grpcPort == 0 {
+ return pr.Addr.String()
+ }
+ if tcpAddr, ok := pr.Addr.(*net.TCPAddr); ok {
+ externalIP := tcpAddr.IP
+ return fmt.Sprintf("%s:%d", externalIP, grpcPort)
+ }
+ return pr.Addr.String()
+
+}
+
+func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.ListMasterClientsRequest) (*master_pb.ListMasterClientsResponse, error) {
+ resp := &master_pb.ListMasterClientsResponse{}
+ ms.clientChansLock.RLock()
+ defer ms.clientChansLock.RUnlock()
+
+ for k := range ms.clientChans {
+ if strings.HasPrefix(k, req.ClientType+"@") {
+ resp.GrpcAddresses = append(resp.GrpcAddresses, k[len(req.ClientType)+1:])
+ }
+ }
+ return resp, nil
+}
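
A hedged sketch of the client lifecycle KeepConnected now delegates to these helpers: register a named channel, forward its messages to the stream, and always unregister on exit (the real loop also watches a ticker and a stop channel):

    peerAddress := findClientAddress(stream.Context(), req.GrpcPort)
    clientName, messageChan := ms.addClient(req.Name, peerAddress)
    defer ms.deleteClient(clientName)
    for message := range messageChan {
        if err := stream.Send(message); err != nil {
            return err
        }
    }
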
diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go
new file mode 100644
index 000000000..7e7dcb36b
--- /dev/null
+++ b/weed/server/master_grpc_server_admin.go
@@ -0,0 +1,138 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+)
+
+/*
+How the exclusive lock works
+-----------
+
+Shell
+------
+When the shell locks:
+ * lease an admin token (lockTime, token)
+ * start a goroutine to renew the admin token periodically
+
+When the shell unlocks:
+ * stop the renewal goroutine
+ * sends a release lock request
+
+Master
+------
+Master maintains:
+ * randomNumber
+ * lastLockTime
+When master receives the lease/renew request from shell
+ If lastLockTime still fresh {
+ if is a renew and token is valid {
+ // for renew
+ generate the randomNumber => token
+ return
+ }
+ refuse
+ return
+ } else {
+ // for fresh lease request
+ generate the randomNumber => token
+ return
+ }
+
+When master receives the release lock request from shell
+ set the lastLockTime to zero
+
+
+The volume server does not need to verify.
+This makes the lock/unlock optional, similar to what golang code usually does.
+
+*/
+
+const (
+ LockDuration = 10 * time.Second
+)
+
+type AdminLock struct {
+ accessSecret int64
+ accessLockTime time.Time
+}
+
+type AdminLocks struct {
+ locks map[string]*AdminLock
+ sync.RWMutex
+}
+
+func NewAdminLocks() *AdminLocks {
+ return &AdminLocks{
+ locks: make(map[string]*AdminLock),
+ }
+}
+
+func (locks *AdminLocks) isLocked(lockName string) bool {
+ locks.RLock()
+ defer locks.RUnlock()
+ adminLock, found := locks.locks[lockName]
+ if !found {
+ return false
+ }
+ return adminLock.accessLockTime.Add(LockDuration).After(time.Now())
+}
+
+func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64) bool {
+ locks.RLock()
+ defer locks.RUnlock()
+ adminLock, found := locks.locks[lockName]
+ if !found {
+ return false
+ }
+ return adminLock.accessLockTime.Equal(ts) && adminLock.accessSecret == token
+}
+
+func (locks *AdminLocks) generateToken(lockName string) (ts time.Time, token int64) {
+ locks.Lock()
+ defer locks.Unlock()
+ lock := &AdminLock{
+ accessSecret: rand.Int63(),
+ accessLockTime: time.Now(),
+ }
+ locks.locks[lockName] = lock
+ return lock.accessLockTime, lock.accessSecret
+}
+
+func (locks *AdminLocks) deleteLock(lockName string) {
+ locks.Lock()
+ defer locks.Unlock()
+ delete(locks.locks, lockName)
+}
+
+func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.LeaseAdminTokenRequest) (*master_pb.LeaseAdminTokenResponse, error) {
+ resp := &master_pb.LeaseAdminTokenResponse{}
+
+ if ms.adminLocks.isLocked(req.LockName) {
+ if req.PreviousToken != 0 && ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
+ // for renew
+ ts, token := ms.adminLocks.generateToken(req.LockName)
+ resp.Token, resp.LockTsNs = token, ts.UnixNano()
+ return resp, nil
+ }
+ // refuse since still locked
+ return resp, fmt.Errorf("already locked")
+ }
+ // for fresh lease request
+ ts, token := ms.adminLocks.generateToken(req.LockName)
+ resp.Token, resp.LockTsNs = token, ts.UnixNano()
+ return resp, nil
+}
+
+func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.ReleaseAdminTokenRequest) (*master_pb.ReleaseAdminTokenResponse, error) {
+ resp := &master_pb.ReleaseAdminTokenResponse{}
+ if ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
+ ms.adminLocks.deleteLock(req.LockName)
+ }
+ return resp, nil
+}
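
From the shell side, the lease/renew cycle described in the comment block might look like this hedged sketch; client stands in for the generated master_pb gRPC client and the lock name is illustrative:

    resp, err := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{LockName: "admin"})
    if err != nil {
        return err
    }
    token, lockTs := resp.Token, resp.LockTsNs
    ticker := time.NewTicker(3 * time.Second) // renew well inside the 10s LockDuration
    defer ticker.Stop()
    for range ticker.C {
        renewed, renewErr := client.LeaseAdminToken(ctx, &master_pb.LeaseAdminTokenRequest{
            LockName:         "admin",
            PreviousToken:    token,
            PreviousLockTime: lockTs,
        })
        if renewErr != nil {
            break // lease refused or lost
        }
        token, lockTs = renewed.Token, renewed.LockTsNs
    }
    // on unlock (or after losing the lease), release the token
    client.ReleaseAdminToken(ctx, &master_pb.ReleaseAdminTokenRequest{
        LockName: "admin", PreviousToken: token, PreviousLockTime: lockTs,
    })
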
diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go
new file mode 100644
index 000000000..b92d6bcbe
--- /dev/null
+++ b/weed/server/master_grpc_server_collection.go
@@ -0,0 +1,95 @@
+package weed_server
+
+import (
+ "context"
+
+ "github.com/chrislusf/raft"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+)
+
+func (ms *MasterServer) CollectionList(ctx context.Context, req *master_pb.CollectionListRequest) (*master_pb.CollectionListResponse, error) {
+
+ if !ms.Topo.IsLeader() {
+ return nil, raft.NotLeaderError
+ }
+
+ resp := &master_pb.CollectionListResponse{}
+ collections := ms.Topo.ListCollections(req.IncludeNormalVolumes, req.IncludeEcVolumes)
+ for _, c := range collections {
+ resp.Collections = append(resp.Collections, &master_pb.Collection{
+ Name: c,
+ })
+ }
+
+ return resp, nil
+}
+
+func (ms *MasterServer) CollectionDelete(ctx context.Context, req *master_pb.CollectionDeleteRequest) (*master_pb.CollectionDeleteResponse, error) {
+
+ if !ms.Topo.IsLeader() {
+ return nil, raft.NotLeaderError
+ }
+
+ resp := &master_pb.CollectionDeleteResponse{}
+
+ err := ms.doDeleteNormalCollection(req.Name)
+
+ if err != nil {
+ return nil, err
+ }
+
+ err = ms.doDeleteEcCollection(req.Name)
+
+ if err != nil {
+ return nil, err
+ }
+
+ return resp, nil
+}
+
+func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error {
+
+ collection, ok := ms.Topo.FindCollection(collectionName)
+ if !ok {
+ return nil
+ }
+
+ for _, server := range collection.ListVolumeServers() {
+ err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
+ Collection: collectionName,
+ })
+ return deleteErr
+ })
+ if err != nil {
+ return err
+ }
+ }
+ ms.Topo.DeleteCollection(collectionName)
+
+ return nil
+}
+
+func (ms *MasterServer) doDeleteEcCollection(collectionName string) error {
+
+ listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName)
+
+ for _, server := range listOfEcServers {
+ err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
+ Collection: collectionName,
+ })
+ return deleteErr
+ })
+ if err != nil {
+ return err
+ }
+ }
+
+ ms.Topo.DeleteEcCollection(collectionName)
+
+ return nil
+}
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index ae0819d2d..282c75679 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -5,8 +5,11 @@ import (
"fmt"
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -48,25 +51,26 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
if req.Replication == "" {
- req.Replication = ms.defaultReplicaPlacement
+ req.Replication = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
- ttl, err := storage.ReadTTL(req.Ttl)
+ ttl, err := needle.ReadTTL(req.Ttl)
if err != nil {
return nil, err
}
option := &topology.VolumeGrowOption{
- Collection: req.Collection,
- ReplicaPlacement: replicaPlacement,
- Ttl: ttl,
- Prealloacte: ms.preallocate,
- DataCenter: req.DataCenter,
- Rack: req.Rack,
- DataNode: req.DataNode,
+ Collection: req.Collection,
+ ReplicaPlacement: replicaPlacement,
+ Ttl: ttl,
+ Prealloacte: ms.preallocateSize,
+ DataCenter: req.DataCenter,
+ Rack: req.Rack,
+ DataNode: req.DataNode,
+ MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
}
if !ms.Topo.HasWritableVolume(option) {
@@ -75,7 +79,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, int(req.WritableVolumeCount)); err != nil {
ms.vgLock.Unlock()
return nil, fmt.Errorf("Cannot grow volume group! %v", err)
}
@@ -92,6 +96,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
Count: count,
+ Auth: string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)),
}, nil
}
@@ -102,13 +107,13 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
}
if req.Replication == "" {
- req.Replication = ms.defaultReplicaPlacement
+ req.Replication = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
- ttl, err := storage.ReadTTL(req.Ttl)
+ ttl, err := needle.ReadTTL(req.Ttl)
if err != nil {
return nil, err
}
@@ -116,11 +121,70 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl)
stats := volumeLayout.Stats()
+ totalSize := ms.Topo.GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
+
resp := &master_pb.StatisticsResponse{
- TotalSize: stats.TotalSize,
+ TotalSize: uint64(totalSize),
UsedSize: stats.UsedSize,
FileCount: stats.FileCount,
}
return resp, nil
}
+
+func (ms *MasterServer) VolumeList(ctx context.Context, req *master_pb.VolumeListRequest) (*master_pb.VolumeListResponse, error) {
+
+ if !ms.Topo.IsLeader() {
+ return nil, raft.NotLeaderError
+ }
+
+ resp := &master_pb.VolumeListResponse{
+ TopologyInfo: ms.Topo.ToTopologyInfo(),
+ VolumeSizeLimitMb: uint64(ms.option.VolumeSizeLimitMB),
+ }
+
+ return resp, nil
+}
+
+func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.LookupEcVolumeRequest) (*master_pb.LookupEcVolumeResponse, error) {
+
+ if !ms.Topo.IsLeader() {
+ return nil, raft.NotLeaderError
+ }
+
+ resp := &master_pb.LookupEcVolumeResponse{}
+
+ ecLocations, found := ms.Topo.LookupEcShards(needle.VolumeId(req.VolumeId))
+
+ if !found {
+ return resp, fmt.Errorf("ec volume %d not found", req.VolumeId)
+ }
+
+ resp.VolumeId = req.VolumeId
+
+ for shardId, shardLocations := range ecLocations.Locations {
+ var locations []*master_pb.Location
+ for _, dn := range shardLocations {
+ locations = append(locations, &master_pb.Location{
+ Url: string(dn.Id()),
+ PublicUrl: dn.PublicUrl,
+ })
+ }
+ resp.ShardIdLocations = append(resp.ShardIdLocations, &master_pb.LookupEcVolumeResponse_EcShardIdLocation{
+ ShardId: uint32(shardId),
+ Locations: locations,
+ })
+ }
+
+ return resp, nil
+}
+
+func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) {
+
+ resp := &master_pb.GetMasterConfigurationResponse{
+ MetricsAddress: ms.option.MetricsAddress,
+ MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ }
+
+ return resp, nil
+}
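
A hedged example of calling the updated Assign RPC; client is a placeholder for the master gRPC client and the field values are illustrative:

    resp, err := client.Assign(ctx, &master_pb.AssignRequest{
        Count:               1,
        Collection:          "photos",
        Replication:         "001",
        WritableVolumeCount: 2, // grow by two writable volumes if none is available
    })
    if err != nil {
        return err
    }
    // resp.Fid and resp.Url locate the upload target;
    // resp.Auth carries the signed JWT now required for the write
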
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index f22925e56..9a490bb1f 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -5,27 +5,52 @@ import (
"net/http"
"net/http/httputil"
"net/url"
+ "os"
+ "regexp"
+ "strconv"
+ "strings"
"sync"
+ "time"
"github.com/chrislusf/raft"
+ "github.com/gorilla/mux"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/sequence"
+ "github.com/chrislusf/seaweedfs/weed/shell"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/gorilla/mux"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
)
+const (
+ SequencerType = "master.sequencer.type"
+ SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls"
+)
+
+type MasterOption struct {
+ Host string
+ Port int
+ MetaFolder string
+ VolumeSizeLimitMB uint
+ VolumePreallocate bool
+ // PulseSeconds int
+ DefaultReplicaPlacement string
+ GarbageThreshold float64
+ WhiteList []string
+ DisableHttp bool
+ MetricsAddress string
+ MetricsIntervalSec int
+}
+
type MasterServer struct {
- port int
- metaFolder string
- volumeSizeLimitMB uint
- preallocate int64
- pulseSeconds int
- defaultReplicaPlacement string
- garbageThreshold float64
- guard *security.Guard
+ option *MasterOption
+ guard *security.Guard
+
+ preallocateSize int64
Topo *topology.Topology
vg *topology.VolumeGrowth
@@ -36,56 +61,77 @@ type MasterServer struct {
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.VolumeLocation
+
+ grpcDialOption grpc.DialOption
+
+ MasterClient *wdclient.MasterClient
+
+ adminLocks *AdminLocks
}
-func NewMasterServer(r *mux.Router, port int, metaFolder string,
- volumeSizeLimitMB uint,
- preallocate bool,
- pulseSeconds int,
- defaultReplicaPlacement string,
- garbageThreshold float64,
- whiteList []string,
- secureKey string,
-) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
+
+ v := util.GetViper()
+ signingKey := v.GetString("jwt.signing.key")
+ v.SetDefault("jwt.signing.expires_after_seconds", 10)
+ expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
+
+ readSigningKey := v.GetString("jwt.signing.read.key")
+ v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
+ readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
+
+ v.SetDefault("master.replication.treat_replication_as_minimums", false)
+ replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums")
var preallocateSize int64
- if preallocate {
- preallocateSize = int64(volumeSizeLimitMB) * (1 << 20)
+ if option.VolumePreallocate {
+ preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
+
+ grpcDialOption := security.LoadClientTLS(v, "grpc.master")
ms := &MasterServer{
- port: port,
- volumeSizeLimitMB: volumeSizeLimitMB,
- preallocate: preallocateSize,
- pulseSeconds: pulseSeconds,
- defaultReplicaPlacement: defaultReplicaPlacement,
- garbageThreshold: garbageThreshold,
- clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ option: option,
+ preallocateSize: preallocateSize,
+ clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ grpcDialOption: grpcDialOption,
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, peers),
+ adminLocks: NewAdminLocks(),
}
ms.bounedLeaderChan = make(chan int, 16)
- seq := sequence.NewMemorySequencer()
- ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds)
+
+ seq := ms.createSequencer(option)
+ if nil == seq {
+ glog.Fatalf("create sequencer failed.")
+ }
+ ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, 5, replicationAsMin)
ms.vg = topology.NewDefaultVolumeGrowth()
- glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
-
- ms.guard = security.NewGuard(whiteList, secureKey)
-
- handleStaticResources2(r)
- r.HandleFunc("/", ms.uiStatusHandler)
- r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
- r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
- r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
- r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
- r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
- r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
- r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
- r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
- r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
- r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
- r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
- r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
- r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
-
- ms.Topo.StartRefreshWritableVolumes(garbageThreshold, ms.preallocate)
+ glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
+
+ ms.guard = security.NewGuard(ms.option.WhiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
+
+ if !ms.option.DisableHttp {
+ handleStaticResources2(r)
+ r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
+ r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
+ r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
+ r.HandleFunc("/dir/lookup", ms.guard.WhiteList(ms.dirLookupHandler))
+ r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
+ r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
+ r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
+ r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
+ r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
+ r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
+ /*
+ r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
+ r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
+ r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
+ */
+ r.HandleFunc("/{fileId}", ms.redirectHandler)
+ }
+
+ ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOption, ms.option.GarbageThreshold, ms.preallocateSize)
+
+ ms.startAdminScripts()
return ms
}
@@ -98,6 +144,9 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
}
})
+ ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) {
+ glog.V(0).Infof("state change: %+v", e)
+ })
if ms.Topo.IsLeader() {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
} else {
@@ -107,7 +156,7 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
}
}
-func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
+func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
f(w, r)
@@ -133,8 +182,107 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r)
} else {
- //drop it to the floor
- //writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
+ // drop it to the floor
+ // writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
+ }
+ }
+}
+
+func (ms *MasterServer) startAdminScripts() {
+ var err error
+
+ v := util.GetViper()
+ adminScripts := v.GetString("master.maintenance.scripts")
+ glog.V(0).Infof("adminScripts:\n%v", adminScripts)
+ if adminScripts == "" {
+ return
+ }
+
+ v.SetDefault("master.maintenance.sleep_minutes", 17)
+ sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
+
+ v.SetDefault("master.filer.default", "localhost:8888")
+ filerHostPort := v.GetString("master.filer.default")
+
+ scriptLines := strings.Split(adminScripts, "\n")
+ if !strings.Contains(adminScripts, "lock") {
+ scriptLines = append(append([]string{}, "lock"), scriptLines...)
+ scriptLines = append(scriptLines, "unlock")
+ }
+
+ masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
+
+ var shellOptions shell.ShellOptions
+ shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
+ shellOptions.Masters = &masterAddress
+
+ shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(filerHostPort)
+ shellOptions.Directory = "/"
+ if err != nil {
+ glog.V(0).Infof("failed to parse master.filer.default = %s : %v\n", filerHostPort, err)
+ return
+ }
+
+ commandEnv := shell.NewCommandEnv(shellOptions)
+
+ reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
+
+ go commandEnv.MasterClient.KeepConnectedToMaster()
+
+ go func() {
+ commandEnv.MasterClient.WaitUntilConnected()
+
+ c := time.Tick(time.Duration(sleepMinutes) * time.Minute)
+ for range c {
+ if ms.Topo.IsLeader() {
+ for _, line := range scriptLines {
+ for _, c := range strings.Split(line, ";") {
+ processEachCmd(reg, c, commandEnv)
+ }
+ }
+ }
+ }
+ }()
+}
+
+func processEachCmd(reg *regexp.Regexp, line string, commandEnv *shell.CommandEnv) {
+ cmds := reg.FindAllString(line, -1)
+ if len(cmds) == 0 {
+ return
+ }
+ args := make([]string, len(cmds[1:]))
+ for i := range args {
+ args[i] = strings.Trim(string(cmds[1+i]), "\"'")
+ }
+ cmd := strings.ToLower(cmds[0])
+
+ for _, c := range shell.Commands {
+ if c.Name() == cmd {
+ glog.V(0).Infof("executing: %s %v", cmd, args)
+ if err := c.Do(args, commandEnv, os.Stdout); err != nil {
+ glog.V(0).Infof("error: %v", err)
+ }
+ }
+ }
+}
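For reference, here is a small standalone sketch (not part of this change) showing how the regexp above tokenizes one maintenance-script line and strips surrounding quotes; the command name and flags are only illustrative.

package main

import (
	"fmt"
	"regexp"
	"strings"
)

func main() {
	reg := regexp.MustCompile(`'.*?'|".*?"|\S+`)
	// hypothetical script line; a quoted argument keeps its embedded space
	line := `volume.fix.replication -collection "my col" -apply`
	cmds := reg.FindAllString(line, -1)
	cmd := strings.ToLower(cmds[0])
	var args []string
	for _, c := range cmds[1:] {
		args = append(args, strings.Trim(c, `"'`))
	}
	fmt.Println(cmd)  // volume.fix.replication
	fmt.Println(args) // [-collection my col -apply]
}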
+
+func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
+ var seq sequence.Sequencer
+ v := util.GetViper()
+ seqType := strings.ToLower(v.GetString(SequencerType))
+ glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
+ switch strings.ToLower(seqType) {
+ case "etcd":
+ var err error
+ urls := v.GetString(SequencerEtcdUrls)
+ glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls)
+ seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder)
+ if err != nil {
+ glog.Error(err)
+ seq = nil
}
+ default:
+ seq = sequence.NewMemorySequencer()
}
+ return seq
}
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index a797dddfc..ebcb7efd2 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -7,8 +7,9 @@ import (
"strings"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]operation.LookupResult) {
@@ -21,43 +22,77 @@ func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volume
if _, ok := volumeLocations[vid]; ok {
continue
}
- volumeId, err := storage.NewVolumeId(vid)
- if err == nil {
- machines := ms.Topo.Lookup(collection, volumeId)
- if machines != nil {
- var ret []operation.Location
- for _, dn := range machines {
- ret = append(ret, operation.Location{Url: dn.Url(), PublicUrl: dn.PublicUrl})
- }
- volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Locations: ret}
- } else {
- volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: fmt.Sprintf("volumeId %s not found.", vid)}
- }
- } else {
- volumeLocations[vid] = operation.LookupResult{VolumeId: vid, Error: fmt.Sprintf("Unknown volumeId format: %s", vid)}
- }
+ volumeLocations[vid] = ms.findVolumeLocation(collection, vid)
}
return
}
-// Takes one volumeId only, can not do batch lookup
+// If "fileId" is provided, this returns the fileId location and a JWT to update or delete the file.
+// If "volumeId" is provided, this only returns the volumeId location
func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) {
vid := r.FormValue("volumeId")
- commaSep := strings.Index(vid, ",")
- if commaSep > 0 {
- vid = vid[0:commaSep]
+ if vid != "" {
+ // backward compatible
+ commaSep := strings.Index(vid, ",")
+ if commaSep > 0 {
+ vid = vid[0:commaSep]
+ }
}
- vids := []string{vid}
- collection := r.FormValue("collection") //optional, but can be faster if too many collections
- volumeLocations := ms.lookupVolumeId(vids, collection)
- location := volumeLocations[vid]
+ fileId := r.FormValue("fileId")
+ if fileId != "" {
+ commaSep := strings.Index(fileId, ",")
+ if commaSep > 0 {
+ vid = fileId[0:commaSep]
+ }
+ }
+ collection := r.FormValue("collection") // optional, but can be faster if too many collections
+ location := ms.findVolumeLocation(collection, vid)
httpStatus := http.StatusOK
- if location.Error != "" {
+ if location.Error != "" || location.Locations == nil {
httpStatus = http.StatusNotFound
+ } else {
+ forRead := r.FormValue("read")
+ isRead := forRead == "yes"
+ ms.maybeAddJwtAuthorization(w, fileId, !isRead)
}
writeJsonQuiet(w, r, httpStatus, location)
}
+// findVolumeLocation finds the volume location from the master topology if this node is the leader,
+// or from the master client if it is not.
+func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.LookupResult {
+ var locations []operation.Location
+ var err error
+ if ms.Topo.IsLeader() {
+ volumeId, newVolumeIdErr := needle.NewVolumeId(vid)
+ if newVolumeIdErr != nil {
+ err = fmt.Errorf("Unknown volume id %s", vid)
+ } else {
+ machines := ms.Topo.Lookup(collection, volumeId)
+ for _, loc := range machines {
+ locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
+ }
+ }
+ } else {
+ machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
+ for _, loc := range machines {
+ locations = append(locations, operation.Location{Url: loc.Url, PublicUrl: loc.PublicUrl})
+ }
+ err = getVidLocationsErr
+ }
+ if len(locations) == 0 && err == nil {
+ err = fmt.Errorf("volume id %s not found", vid)
+ }
+ ret := operation.LookupResult{
+ VolumeId: vid,
+ Locations: locations,
+ }
+ if err != nil {
+ ret.Error = err.Error()
+ }
+ return ret
+}
+
func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) {
stats.AssignRequest()
requestedCount, e := strconv.ParseUint(r.FormValue("count"), 10, 64)
@@ -65,6 +100,11 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
requestedCount = 1
}
+ writableVolumeCount, e := strconv.Atoi(r.FormValue("writableVolumeCount"))
+ if e != nil {
+ writableVolumeCount = 0
+ }
+
option, err := ms.getVolumeGrowOption(r)
if err != nil {
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
@@ -79,7 +119,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
ms.vgLock.Lock()
defer ms.vgLock.Unlock()
if !ms.Topo.HasWritableVolume(option) {
- if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.grpcDialOption, ms.Topo, writableVolumeCount); err != nil {
writeJsonError(w, r, http.StatusInternalServerError,
fmt.Errorf("Cannot grow volume group! %v", err))
return
@@ -88,8 +128,23 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
}
fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option)
if err == nil {
+ ms.maybeAddJwtAuthorization(w, fid, true)
writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count})
} else {
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
}
}
+
+func (ms *MasterServer) maybeAddJwtAuthorization(w http.ResponseWriter, fileId string, isWrite bool) {
+ var encodedJwt security.EncodedJwt
+ if isWrite {
+ encodedJwt = security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fileId)
+ } else {
+ encodedJwt = security.GenJwt(ms.guard.ReadSigningKey, ms.guard.ReadExpiresAfterSec, fileId)
+ }
+ if encodedJwt == "" {
+ return
+ }
+
+ w.Header().Set("Authorization", "BEARER "+string(encodedJwt))
+}
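As a usage illustration (not part of this change), a client could exercise the extended lookup like this; the JSON field names follow operation.LookupResult as used above, while the master address and the fileId are assumed values.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// minimal sketch: look up a fileId and ask for a read JWT (read=yes)
	resp, err := http.Get("http://localhost:9333/dir/lookup?fileId=3,01637037d6&read=yes")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// the master attaches the JWT as "BEARER <token>"; empty when no signing key is configured
	jwt := resp.Header.Get("Authorization")

	var result struct {
		VolumeId  string `json:"volumeId"`
		Locations []struct {
			Url       string `json:"url"`
			PublicUrl string `json:"publicUrl"`
		} `json:"locations"`
		Error string `json:"error"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		log.Fatal(err)
	}
	fmt.Println(jwt, result.VolumeId, result.Locations, result.Error)
}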
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 3a2662908..7595c0171 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -2,33 +2,31 @@ package weed_server
import (
"context"
- "errors"
"fmt"
"math/rand"
"net/http"
"strconv"
- "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
)
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
- collection, ok := ms.Topo.FindCollection(r.FormValue("collection"))
+ collectionName := r.FormValue("collection")
+ collection, ok := ms.Topo.FindCollection(collectionName)
if !ok {
- writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection")))
+ writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", collectionName))
return
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.Url(), func(client volume_server_pb.VolumeServerClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
- defer cancel()
-
- _, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{
+ err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collection.Name,
})
return deleteErr
@@ -38,29 +36,33 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
}
- ms.Topo.DeleteCollection(r.FormValue("collection"))
+ ms.Topo.DeleteCollection(collectionName)
+
+ w.WriteHeader(http.StatusNoContent)
+ return
}
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["Version"] = util.VERSION
+ m["Version"] = util.Version()
m["Topology"] = ms.Topo.ToMap()
writeJsonQuiet(w, r, http.StatusOK, m)
}
func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
gcString := r.FormValue("garbageThreshold")
- gcThreshold := ms.garbageThreshold
+ gcThreshold := ms.option.GarbageThreshold
if gcString != "" {
var err error
gcThreshold, err = strconv.ParseFloat(gcString, 32)
if err != nil {
glog.V(0).Infof("garbageThreshold %s is not a valid float number: %v", gcString, err)
+ writeJsonError(w, r, http.StatusNotAcceptable, fmt.Errorf("garbageThreshold %s is not a valid float number", gcString))
return
}
}
- glog.Infoln("garbageThreshold =", gcThreshold)
- ms.Topo.Vacuum(gcThreshold, ms.preallocate)
+ // glog.Infoln("garbageThreshold =", gcThreshold)
+ ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.preallocateSize)
ms.dirStatusHandler(w, r)
}
@@ -71,17 +73,17 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
writeJsonError(w, r, http.StatusNotAcceptable, err)
return
}
- if err == nil {
- if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
- if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() {
- err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount()))
- } else {
- count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo)
- }
+
+ if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
+ if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) {
+ err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount())
} else {
- err = errors.New("parameter count is not found")
+ count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
}
+ } else {
+ err = fmt.Errorf("can not parse parameter count %s", r.FormValue("count"))
}
+
if err != nil {
writeJsonError(w, r, http.StatusNotAcceptable, err)
} else {
@@ -91,30 +93,26 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["Version"] = util.VERSION
+ m["Version"] = util.Version()
m["Volumes"] = ms.Topo.ToVolumeMap()
writeJsonQuiet(w, r, http.StatusOK, m)
}
func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) {
vid, _, _, _, _ := parseURLPath(r.URL.Path)
- volumeId, err := storage.NewVolumeId(vid)
- if err != nil {
- debug("parsing error:", err, r.URL.Path)
- return
- }
collection := r.FormValue("collection")
- machines := ms.Topo.Lookup(collection, volumeId)
- if machines != nil && len(machines) > 0 {
+ location := ms.findVolumeLocation(collection, vid)
+ if location.Error == "" {
+ loc := location.Locations[rand.Intn(len(location.Locations))]
var url string
if r.URL.RawQuery != "" {
- url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
+ url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
} else {
- url = util.NormalizeUrl(machines[rand.Intn(len(machines))].PublicUrl) + r.URL.Path
+ url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path
}
http.Redirect(w, r, url, http.StatusMovedPermanently)
} else {
- writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %d or collection %s not found", volumeId, collection))
+ writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error))
}
}
@@ -122,17 +120,17 @@ func (ms *MasterServer) selfUrl(r *http.Request) string {
if r.Host != "" {
return r.Host
}
- return "localhost:" + strconv.Itoa(ms.port)
+ return "localhost:" + strconv.Itoa(ms.option.Port)
}
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
- submitForClientHandler(w, r, ms.selfUrl(r))
+ submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOption)
} else {
masterUrl, err := ms.Topo.Leader()
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
- submitForClientHandler(w, r, masterUrl)
+ submitForClientHandler(w, r, masterUrl, ms.grpcDialOption)
}
}
}
@@ -145,17 +143,22 @@ func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) boo
func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
replicationString := r.FormValue("replication")
if replicationString == "" {
- replicationString = ms.defaultReplicaPlacement
+ replicationString = ms.option.DefaultReplicaPlacement
+ }
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(replicationString)
+ if err != nil {
+ return nil, err
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
+ ttl, err := needle.ReadTTL(r.FormValue("ttl"))
if err != nil {
return nil, err
}
- ttl, err := storage.ReadTTL(r.FormValue("ttl"))
+ memoryMapMaxSizeMb, err := memory_map.ReadMemoryMapMaxSizeMb(r.FormValue("memoryMapMaxSizeMb"))
if err != nil {
return nil, err
}
- preallocate := ms.preallocate
+
+ preallocate := ms.preallocateSize
if r.FormValue("preallocate") != "" {
preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64)
if err != nil {
@@ -163,13 +166,14 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
}
}
volumeGrowOption := &topology.VolumeGrowOption{
- Collection: r.FormValue("collection"),
- ReplicaPlacement: replicaPlacement,
- Ttl: ttl,
- Prealloacte: preallocate,
- DataCenter: r.FormValue("dataCenter"),
- Rack: r.FormValue("rack"),
- DataNode: r.FormValue("dataNode"),
+ Collection: r.FormValue("collection"),
+ ReplicaPlacement: replicaPlacement,
+ Ttl: ttl,
+ Prealloacte: preallocate,
+ DataCenter: r.FormValue("dataCenter"),
+ Rack: r.FormValue("rack"),
+ DataNode: r.FormValue("dataNode"),
+ MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
}
return volumeGrowOption, nil
}
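For completeness, a hedged sketch of an assign request that sets the new writableVolumeCount and memoryMapMaxSizeMb parameters alongside the values parsed by getVolumeGrowOption; all parameter values and the master address are illustrative.

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"net/url"
)

func main() {
	// every key below is read by dirAssignHandler or getVolumeGrowOption
	params := url.Values{}
	params.Set("count", "1")
	params.Set("writableVolumeCount", "2")
	params.Set("collection", "pictures")
	params.Set("replication", "001")
	params.Set("ttl", "3d")
	params.Set("dataCenter", "dc1")
	params.Set("preallocate", "0")
	params.Set("memoryMapMaxSizeMb", "0")

	resp, err := http.Get("http://localhost:9333/dir/assign?" + params.Encode())
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// the JSON body carries fid, url, publicUrl and count (or an error), and the
	// Authorization header carries a write JWT when JWT signing is enabled
	body, _ := ioutil.ReadAll(resp.Body)
	fmt.Println(resp.Header.Get("Authorization"))
	fmt.Println(string(body))
}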
diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go
index f241df87f..9cd58158b 100644
--- a/weed/server/master_server_handlers_ui.go
+++ b/weed/server/master_server_handlers_ui.go
@@ -2,6 +2,7 @@ package weed_server
import (
"net/http"
+ "time"
"github.com/chrislusf/raft"
ui "github.com/chrislusf/seaweedfs/weed/server/master_ui"
@@ -11,7 +12,7 @@ import (
func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) {
infos := make(map[string]interface{})
- infos["Version"] = util.VERSION
+ infos["Up Time"] = time.Now().Sub(startTime).String()
args := struct {
Version string
Topology interface{}
@@ -19,7 +20,7 @@ func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
Stats map[string]interface{}
Counters *stats.ServerStats
}{
- util.VERSION,
+ util.Version(),
ms.Topo.ToMap(),
ms.Topo.RaftServer,
infos,
diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go
index f32e8e61b..7189064d0 100644
--- a/weed/server/master_ui/templates.go
+++ b/weed/server/master_ui/templates.go
@@ -41,7 +41,7 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<td class="col-sm-2 field-label"><label>Other Masters:</label></td>
<td class="col-sm-10"><ul class="list-unstyled">
{{ range $k, $p := .Peers }}
- <li><a href="{{ $p.ConnectionString }}">{{ $p.Name }}</a></li>
+ <li><a href="http://{{ $p.Name }}/ui/index.html">{{ $p.Name }}</a></li>
{{ end }}
</ul></td>
</tr>
@@ -76,6 +76,8 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<th>Rack</th>
<th>RemoteAddr</th>
<th>#Volumes</th>
+ <th>Volume Ids</th>
+ <th>#ErasureCodingShards</th>
<th>Max</th>
</tr>
</thead>
@@ -88,6 +90,8 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<td>{{ $rack.Id }}</td>
<td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a></td>
<td>{{ $dn.Volumes }}</td>
+ <td>{{ $dn.VolumeIds}}</td>
+ <td>{{ $dn.EcShards }}</td>
<td>{{ $dn.Max }}</td>
</tr>
{{ end }}
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 2cc8252b8..958680d2b 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -3,36 +3,37 @@ package weed_server
import (
"encoding/json"
"io/ioutil"
- "math/rand"
"os"
"path"
"reflect"
"sort"
- "strings"
"time"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
- "github.com/gorilla/mux"
)
type RaftServer struct {
peers []string // initial peers to join with
raftServer raft.Server
dataDir string
- httpAddr string
- router *mux.Router
+ serverAddr string
topo *topology.Topology
+ *raft.GrpcServer
}
-func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
+func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
s := &RaftServer{
- peers: peers,
- httpAddr: httpAddr,
- dataDir: dataDir,
- router: r,
- topo: topo,
+ peers: peers,
+ serverAddr: serverAddr,
+ dataDir: dataDir,
+ topo: topo,
}
if glog.V(4) {
@@ -42,41 +43,40 @@ func NewRaftServer(r *mux.Router, peers []string, httpAddr string, dataDir strin
raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
var err error
- transporter := raft.NewHTTPTransporter("/cluster", time.Second)
- transporter.Transport.MaxIdleConnsPerHost = 1024
- glog.V(0).Infof("Starting RaftServer with %v", httpAddr)
+ transporter := raft.NewGrpcTransporter(grpcDialOption)
+ glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
+ // always clear previous metadata
+ os.RemoveAll(path.Join(s.dataDir, "conf"))
+ os.RemoveAll(path.Join(s.dataDir, "log"))
+ os.RemoveAll(path.Join(s.dataDir, "snapshot"))
// Clear old cluster configurations if peers are changed
- if oldPeers, changed := isPeersChanged(s.dataDir, httpAddr, s.peers); changed {
+ if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
- os.RemoveAll(path.Join(s.dataDir, "conf"))
- os.RemoveAll(path.Join(s.dataDir, "log"))
- os.RemoveAll(path.Join(s.dataDir, "snapshot"))
}
- s.raftServer, err = raft.NewServer(s.httpAddr, s.dataDir, transporter, nil, topo, "")
+ s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "")
if err != nil {
glog.V(0).Infoln(err)
return nil
}
- transporter.Install(s.raftServer, s)
s.raftServer.SetHeartbeatInterval(500 * time.Millisecond)
s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond)
s.raftServer.Start()
- s.router.HandleFunc("/cluster/status", s.statusHandler).Methods("GET")
-
for _, peer := range s.peers {
- s.raftServer.AddPeer(peer, "http://"+peer)
+ s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer))
}
- time.Sleep(time.Duration(1000+rand.Int31n(3000)) * time.Millisecond)
- if s.raftServer.IsLogEmpty() {
+
+ s.GrpcServer = raft.NewGrpcServer(s.raftServer)
+
+ if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) {
// Initialize the server by joining itself.
glog.V(0).Infoln("Initializing new cluster")
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
- ConnectionString: "http://" + s.httpAddr,
+ ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
})
if err != nil {
@@ -94,7 +94,7 @@ func (s *RaftServer) Peers() (members []string) {
peers := s.raftServer.Peers()
for _, p := range peers {
- members = append(members, strings.TrimPrefix(p.ConnectionString, "http://"))
+ members = append(members, p.Name)
}
return
@@ -113,7 +113,7 @@ func isPeersChanged(dir string, self string, peers []string) (oldPeers []string,
}
for _, p := range conf.Peers {
- oldPeers = append(oldPeers, strings.TrimPrefix(p.ConnectionString, "http://"))
+ oldPeers = append(oldPeers, p.Name)
}
oldPeers = append(oldPeers, self)
@@ -127,3 +127,11 @@ func isPeersChanged(dir string, self string, peers []string) (oldPeers []string,
return oldPeers, !reflect.DeepEqual(peers, oldPeers)
}
+
+func isTheFirstOne(self string, peers []string) bool {
+ sort.Strings(peers)
+ if len(peers) <= 0 {
+ return true
+ }
+ return self == peers[0]
+}
diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go
index 627fe354e..fd38cb977 100644
--- a/weed/server/raft_server_handlers.go
+++ b/weed/server/raft_server_handlers.go
@@ -1,16 +1,17 @@
package weed_server
import (
- "github.com/chrislusf/seaweedfs/weed/operation"
"net/http"
)
-func (s *RaftServer) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
- s.router.HandleFunc(pattern, handler)
+type ClusterStatusResult struct {
+ IsLeader bool `json:"IsLeader,omitempty"`
+ Leader string `json:"Leader,omitempty"`
+ Peers []string `json:"Peers,omitempty"`
}
-func (s *RaftServer) statusHandler(w http.ResponseWriter, r *http.Request) {
- ret := operation.ClusterStatusResult{
+func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
+ ret := ClusterStatusResult{
IsLeader: s.topo.IsLeader(),
Peers: s.Peers(),
}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index 429ca9b68..27b21ac09 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -2,10 +2,14 @@ package weed_server
import (
"context"
+ "fmt"
+ "path/filepath"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
@@ -24,17 +28,18 @@ func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server
}
-func (vs *VolumeServer) AssignVolume(ctx context.Context, req *volume_server_pb.AssignVolumeRequest) (*volume_server_pb.AssignVolumeResponse, error) {
+func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_pb.AllocateVolumeRequest) (*volume_server_pb.AllocateVolumeResponse, error) {
- resp := &volume_server_pb.AssignVolumeResponse{}
+ resp := &volume_server_pb.AllocateVolumeResponse{}
err := vs.store.AddVolume(
- storage.VolumeId(req.VolumdId),
+ needle.VolumeId(req.VolumeId),
req.Collection,
vs.needleMapKind,
req.Replication,
req.Ttl,
req.Preallocate,
+ req.MemoryMapMaxSizeMb,
)
if err != nil {
@@ -51,7 +56,7 @@ func (vs *VolumeServer) VolumeMount(ctx context.Context, req *volume_server_pb.V
resp := &volume_server_pb.VolumeMountResponse{}
- err := vs.store.MountVolume(storage.VolumeId(req.VolumdId))
+ err := vs.store.MountVolume(needle.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("volume mount %v: %v", req, err)
@@ -67,7 +72,7 @@ func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb
resp := &volume_server_pb.VolumeUnmountResponse{}
- err := vs.store.UnmountVolume(storage.VolumeId(req.VolumdId))
+ err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("volume unmount %v: %v", req, err)
@@ -83,7 +88,7 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.
resp := &volume_server_pb.VolumeDeleteResponse{}
- err := vs.store.DeleteVolume(storage.VolumeId(req.VolumdId))
+ err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
if err != nil {
glog.Errorf("volume delete %v: %v", req, err)
@@ -94,3 +99,70 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.
return resp, err
}
+
+func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_pb.VolumeConfigureRequest) (*volume_server_pb.VolumeConfigureResponse, error) {
+
+ resp := &volume_server_pb.VolumeConfigureResponse{}
+
+ // check replication format
+ if _, err := super_block.NewReplicaPlacementFromString(req.Replication); err != nil {
+ resp.Error = fmt.Sprintf("volume configure replication %v: %v", req, err)
+ return resp, nil
+ }
+
+ // unmount
+ if err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)); err != nil {
+ glog.Errorf("volume configure unmount %v: %v", req, err)
+ resp.Error = fmt.Sprintf("volume configure unmount %v: %v", req, err)
+ return resp, nil
+ }
+
+ // modify the volume info file
+ if err := vs.store.ConfigureVolume(needle.VolumeId(req.VolumeId), req.Replication); err != nil {
+ glog.Errorf("volume configure %v: %v", req, err)
+ resp.Error = fmt.Sprintf("volume configure %v: %v", req, err)
+ return resp, nil
+ }
+
+ // mount
+ if err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)); err != nil {
+ glog.Errorf("volume configure mount %v: %v", req, err)
+ resp.Error = fmt.Sprintf("volume configure mount %v: %v", req, err)
+ return resp, nil
+ }
+
+ return resp, nil
+
+}
+
+func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
+
+ resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
+
+ err := vs.store.MarkVolumeReadonly(needle.VolumeId(req.VolumeId))
+
+ if err != nil {
+ glog.Errorf("volume mark readonly %v: %v", req, err)
+ } else {
+ glog.V(2).Infof("volume mark readonly %v", req)
+ }
+
+ return resp, err
+
+}
+
+func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
+
+ resp := &volume_server_pb.VolumeServerStatusResponse{}
+
+ for _, loc := range vs.store.Locations {
+ if dir, e := filepath.Abs(loc.Directory); e == nil {
+ resp.DiskStatuses = append(resp.DiskStatuses, stats.NewDiskStatus(dir))
+ }
+ }
+
+ resp.MemoryStatus = stats.MemStat()
+
+ return resp, nil
+
+}
diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go
index 3554d97ae..501964191 100644
--- a/weed/server/volume_grpc_batch_delete.go
+++ b/weed/server/volume_grpc_batch_delete.go
@@ -7,7 +7,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
)
func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) {
@@ -26,18 +27,36 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
continue
}
- n := new(storage.Needle)
- volumeId, _ := storage.NewVolumeId(vid)
- n.ParsePath(id_cookie)
-
- cookie := n.Cookie
- if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
- resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
- FileId: fid,
- Status: http.StatusNotFound,
- Error: err.Error(),
- })
- continue
+ n := new(needle.Needle)
+ volumeId, _ := needle.NewVolumeId(vid)
+ if req.SkipCookieCheck {
+ n.Id, err = types.ParseNeedleId(id_cookie)
+ if err != nil {
+ resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+ FileId: fid,
+ Status: http.StatusBadRequest,
+ Error: err.Error()})
+ continue
+ }
+ } else {
+ n.ParsePath(id_cookie)
+ cookie := n.Cookie
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
+ resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+ FileId: fid,
+ Status: http.StatusNotFound,
+ Error: err.Error(),
+ })
+ continue
+ }
+ if n.Cookie != cookie {
+ resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+ FileId: fid,
+ Status: http.StatusBadRequest,
+ Error: "File Random Cookie does not match.",
+ })
+ break
+ }
}
if n.IsChunkedManifest() {
@@ -49,16 +68,8 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
continue
}
- if n.Cookie != cookie {
- resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
- FileId: fid,
- Status: http.StatusBadRequest,
- Error: "File Random Cookie does not match.",
- })
- break
- }
n.LastModified = now
- if size, err := vs.store.Delete(volumeId, n); err != nil {
+ if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusInternalServerError,
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index bd3ffd7b3..7cb836344 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,13 +2,21 @@ package weed_server
import (
"fmt"
+ "net"
"time"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+
+ "golang.org/x/net/context"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
- "golang.org/x/net/context"
)
func (vs *VolumeServer) GetMaster() string {
@@ -16,34 +24,42 @@ func (vs *VolumeServer) GetMaster() string {
}
func (vs *VolumeServer) heartbeat() {
- glog.V(0).Infof("Volume server start with masters: %v", vs.MasterNodes)
+ glog.V(0).Infof("Volume server start with seed master nodes: %v", vs.SeedMasterNodes)
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.volume")
+
var err error
var newLeader string
for {
- for _, master := range vs.MasterNodes {
+ for _, master := range vs.SeedMasterNodes {
if newLeader != "" {
+ // the new leader may actually be the same master
+ // need to wait a bit before adding itself
+ time.Sleep(3 * time.Second)
master = newLeader
}
- masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master, 0)
+ masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(master)
if parseErr != nil {
- glog.V(0).Infof("failed to parse master grpc %v", masterGrpcAddress)
+ glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr)
continue
}
- newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, time.Duration(vs.pulseSeconds)*time.Second)
+ vs.store.MasterAddress = master
+ newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
if err != nil {
glog.V(0).Infof("heartbeat error: %v", err)
time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
+ newLeader = ""
+ vs.store.MasterAddress = ""
}
}
}
}
-func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepInterval time.Duration) (newLeader string, err error) {
+func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
- grpcConection, err := util.GrpcDial(masterGrpcAddress)
+ grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
}
@@ -58,9 +74,6 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepI
glog.V(0).Infof("Heartbeat to: %v", masterNode)
vs.currentMaster = masterNode
- vs.store.Client = stream
- defer func() { vs.store.Client = nil }()
-
doneChan := make(chan error, 1)
go func() {
@@ -70,18 +83,27 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepI
doneChan <- err
return
}
- if in.GetVolumeSizeLimit() != 0 {
- vs.store.VolumeSizeLimit = in.GetVolumeSizeLimit()
- }
- if in.GetSecretKey() != "" {
- vs.guard.SecretKey = security.Secret(in.GetSecretKey())
+ if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
+ vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
+ if vs.store.MaybeAdjustVolumeMax() {
+ if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ }
+ }
}
- if in.GetLeader() != "" && masterNode != in.GetLeader() {
+ if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) {
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
newLeader = in.GetLeader()
doneChan <- nil
return
}
+ if in.GetMetricsAddress() != "" && vs.MetricsAddress != in.GetMetricsAddress() {
+ vs.MetricsAddress = in.GetMetricsAddress()
+ vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds())
+ }
+ if len(in.StorageBackends) > 0 {
+ backend.LoadFromPbStorageBackends(in.StorageBackends)
+ }
}
}()
@@ -90,33 +112,89 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepI
return "", err
}
- tickChan := time.Tick(sleepInterval)
+ if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ return "", err
+ }
+
+ volumeTickChan := time.Tick(sleepInterval)
+ ecShardTickChan := time.Tick(17 * sleepInterval)
for {
select {
- case vid := <-vs.store.NewVolumeIdChan:
+ case volumeMessage := <-vs.store.NewVolumesChan:
deltaBeat := &master_pb.Heartbeat{
- NewVids: []uint32{uint32(vid)},
+ NewVolumes: []*master_pb.VolumeShortInformationMessage{
+ &volumeMessage,
+ },
}
+ glog.V(1).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
return "", err
}
- case vid := <-vs.store.DeletedVolumeIdChan:
+ case ecShardMessage := <-vs.store.NewEcShardsChan:
deltaBeat := &master_pb.Heartbeat{
- DeletedVids: []uint32{uint32(vid)},
+ NewEcShards: []*master_pb.VolumeEcShardInformationMessage{
+ &ecShardMessage,
+ },
}
+ glog.V(1).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
+ erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
return "", err
}
- case <-tickChan:
+ case volumeMessage := <-vs.store.DeletedVolumesChan:
+ deltaBeat := &master_pb.Heartbeat{
+ DeletedVolumes: []*master_pb.VolumeShortInformationMessage{
+ &volumeMessage,
+ },
+ }
+ glog.V(1).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
+ if err = stream.Send(deltaBeat); err != nil {
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ return "", err
+ }
+ case ecShardMessage := <-vs.store.DeletedEcShardsChan:
+ deltaBeat := &master_pb.Heartbeat{
+ DeletedEcShards: []*master_pb.VolumeEcShardInformationMessage{
+ &ecShardMessage,
+ },
+ }
+ glog.V(1).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
+ erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
+ if err = stream.Send(deltaBeat); err != nil {
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ return "", err
+ }
+ case <-volumeTickChan:
+ glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
return "", err
}
+ case <-ecShardTickChan:
+ glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
+ if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ return "", err
+ }
case err = <-doneChan:
return
}
}
}
+
+func isSameIP(ip string, host string) bool {
+ ips, err := net.LookupIP(host)
+ if err != nil {
+ return false
+ }
+ for _, t := range ips {
+ if ip == t.String() {
+ return true
+ }
+ }
+ return false
+}
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
new file mode 100644
index 000000000..5c7d5572c
--- /dev/null
+++ b/weed/server/volume_grpc_copy.go
@@ -0,0 +1,286 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "math"
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const BufferSizeLimit = 1024 * 1024 * 2
+
+// VolumeCopy copies the .idx, .dat and .vif files, and mounts the volume
+func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v != nil {
+
+ glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId)
+
+ err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
+ if err != nil {
+ return nil, fmt.Errorf("failed to mount existing volume %d: %v", req.VolumeId, err)
+ }
+
+ err = vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
+ if err != nil {
+ return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
+ }
+
+ glog.V(0).Infof("deleted exisitng volume %d before copying.", req.VolumeId)
+ }
+
+ location := vs.store.FindFreeLocation()
+ if location == nil {
+ return nil, fmt.Errorf("no space left")
+ }
+
+ // the master will not start compaction for read-only volumes, so it is safe to just copy files directly
+ // copy .dat and .idx files
+ // read .idx .dat file size and timestamp
+ // send .idx file
+ // send .dat file
+ // confirm size and timestamp
+ var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
+ var volumeFileName, idxFileName, datFileName string
+ err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ var err error
+ volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(),
+ &volume_server_pb.ReadVolumeFileStatusRequest{
+ VolumeId: req.VolumeId,
+ })
+ if nil != err {
+ return fmt.Errorf("read volume file status failed, %v", err)
+ }
+
+ volumeFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
+
+ // println("source:", volFileInfoResp.String())
+ // copy .idx file
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
+ return err
+ }
+
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
+ return err
+ }
+
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".vif", false, true); err != nil {
+ return err
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+ if volumeFileName == "" {
+ return nil, fmt.Errorf("not found volume %d file", req.VolumeId)
+ }
+
+ idxFileName = volumeFileName + ".idx"
+ datFileName = volumeFileName + ".dat"
+
+ defer func() {
+ if err != nil && volumeFileName != "" {
+ os.Remove(idxFileName)
+ os.Remove(datFileName)
+ os.Remove(volumeFileName + ".vif")
+ }
+ }()
+
+ if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
+ return nil, err
+ }
+
+ // mount the volume
+ err = vs.store.MountVolume(needle.VolumeId(req.VolumeId))
+ if err != nil {
+ return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
+ }
+
+ return &volume_server_pb.VolumeCopyResponse{
+ LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second),
+ }, err
+}
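For orientation, a hedged sketch (not part of this change) of how an admin-side caller might invoke the RPC above: ask a target volume server to pull a volume from a source server. The request fields are the ones the handler reads; addresses, the collection name and the dial option are assumptions.

package example

import (
	"context"
	"fmt"

	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)

// copyVolume asks target to pull volume vid's files from source and mount it.
func copyVolume(target, source string, vid uint32, grpcDialOption grpc.DialOption) error {
	return operation.WithVolumeServerClient(target, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
		resp, err := client.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
			VolumeId:       vid,
			Collection:     "pictures", // illustrative collection name
			SourceDataNode: source,
		})
		if err != nil {
			return err
		}
		// LastAppendAtNs can later seed an incremental catch-up copy of newer writes
		fmt.Println("copied up to append-at-ns", resp.LastAppendAtNs)
		return nil
	})
}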
+
+func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error {
+
+ copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
+ VolumeId: vid,
+ Ext: ext,
+ CompactionRevision: compactRevision,
+ StopOffset: stopOffset,
+ Collection: collection,
+ IsEcVolume: isEcVolume,
+ IgnoreSourceFileNotFound: ignoreSourceFileNotFound,
+ })
+ if err != nil {
+ return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
+ }
+
+ err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend)
+ if err != nil {
+ return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
+ }
+
+ return nil
+
+}
+
+/**
+only check the difference of the file sizes
+todo: maybe should check the received count and deleted count of the volume
+*/
+func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse, idxFileName, datFileName string) error {
+ stat, err := os.Stat(idxFileName)
+ if err != nil {
+ return fmt.Errorf("stat idx file %s failed, %v", idxFileName, err)
+ }
+ if originFileInf.IdxFileSize != uint64(stat.Size()) {
+ return fmt.Errorf("idx file %s size [%v] is not same as origin file size [%v]",
+ idxFileName, stat.Size(), originFileInf.IdxFileSize)
+ }
+
+ stat, err = os.Stat(datFileName)
+ if err != nil {
+ return fmt.Errorf("get dat file info failed, %v", err)
+ }
+ if originFileInf.DatFileSize != uint64(stat.Size()) {
+ return fmt.Errorf("the dat file size [%v] is not same as origin file size [%v]",
+ stat.Size(), originFileInf.DatFileSize)
+ }
+ return nil
+}
+
+func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error {
+ glog.V(4).Infof("writing to %s", fileName)
+ flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
+ if isAppend {
+ flags = os.O_WRONLY | os.O_CREATE
+ }
+ dst, err := os.OpenFile(fileName, flags, 0644)
+ if err != nil {
+ return fmt.Errorf("failed to open file %s: %v", fileName, err)
+ }
+ defer dst.Close()
+
+ for {
+ resp, receiveErr := client.Recv()
+ if receiveErr == io.EOF {
+ break
+ }
+ if receiveErr != nil {
+ return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
+ }
+ dst.Write(resp.FileContent)
+ wt.MaybeSlowdown(int64(len(resp.FileContent)))
+ }
+ return nil
+}
+
+func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
+ resp := &volume_server_pb.ReadVolumeFileStatusResponse{}
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ resp.VolumeId = req.VolumeId
+ datSize, idxSize, modTime := v.FileStat()
+ resp.DatFileSize = datSize
+ resp.IdxFileSize = idxSize
+ resp.DatFileTimestampSeconds = uint64(modTime.Unix())
+ resp.IdxFileTimestampSeconds = uint64(modTime.Unix())
+ resp.FileCount = v.FileCount()
+ resp.CompactionRevision = uint32(v.CompactionRevision)
+ resp.Collection = v.Collection
+ return resp, nil
+}
+
+// CopyFile lets a client pull a volume-related file from this source server.
+// if req.CompactionRevision != math.MaxUint32, it ensures the compaction revision is as expected
+// The copy stops at req.StopOffset, but you can set it to math.MaxUint64 in order to read all data.
+func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream volume_server_pb.VolumeServer_CopyFileServer) error {
+
+ var fileName string
+ if !req.IsEcVolume {
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 {
+ return fmt.Errorf("volume %d is compacted", req.VolumeId)
+ }
+ fileName = v.FileName() + req.Ext
+ } else {
+ baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
+ for _, location := range vs.store.Locations {
+ tName := util.Join(location.Directory, baseFileName)
+ if util.FileExists(tName) {
+ fileName = tName
+ }
+ }
+ if fileName == "" {
+ if req.IgnoreSourceFileNotFound {
+ return nil
+ }
+ return fmt.Errorf("CopyFile not found ec volume id %d", req.VolumeId)
+ }
+ }
+
+ bytesToRead := int64(req.StopOffset)
+
+ file, err := os.Open(fileName)
+ if err != nil {
+ if req.IgnoreSourceFileNotFound && err == os.ErrNotExist {
+ return nil
+ }
+ return err
+ }
+ defer file.Close()
+
+ buffer := make([]byte, BufferSizeLimit)
+
+ for bytesToRead > 0 {
+ bytesread, err := file.Read(buffer)
+
+ // println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
+
+ if err != nil {
+ if err != io.EOF {
+ return err
+ }
+ // println(fileName, "read", bytesread, "bytes, with target", bytesToRead, "err", err.Error())
+ break
+ }
+
+ if int64(bytesread) > bytesToRead {
+ bytesread = int(bytesToRead)
+ }
+ err = stream.Send(&volume_server_pb.CopyFileResponse{
+ FileContent: buffer[:bytesread],
+ })
+ if err != nil {
+ // println("sending", bytesread, "bytes err", err.Error())
+ return err
+ }
+
+ bytesToRead -= int64(bytesread)
+
+ }
+
+ return nil
+}
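Because the two sentinel values above are easy to get wrong, here is a hedged client-side sketch that streams an entire .dat file by disabling the compaction-revision check and the stop offset, much like doCopyFile and writeToFile above; the server address and destination path are assumptions.

package example

import (
	"context"
	"fmt"
	"io"
	"math"
	"os"

	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)

// pullWholeDatFile copies volume vid's .dat file from sourceServer into localPath.
func pullWholeDatFile(sourceServer string, vid uint32, localPath string, grpcDialOption grpc.DialOption) error {
	return operation.WithVolumeServerClient(sourceServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
		stream, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
			VolumeId:           vid,
			Ext:                ".dat",
			CompactionRevision: math.MaxUint32, // skip the compaction revision check
			StopOffset:         math.MaxUint64, // read all data until EOF
		})
		if err != nil {
			return err
		}
		dst, err := os.Create(localPath)
		if err != nil {
			return err
		}
		defer dst.Close()
		for {
			resp, recvErr := stream.Recv()
			if recvErr == io.EOF {
				return nil
			}
			if recvErr != nil {
				return fmt.Errorf("receiving %s: %v", localPath, recvErr)
			}
			if _, writeErr := dst.Write(resp.FileContent); writeErr != nil {
				return writeErr
			}
		}
	})
}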
diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go
new file mode 100644
index 000000000..6d6c3daa3
--- /dev/null
+++ b/weed/server/volume_grpc_copy_incremental.go
@@ -0,0 +1,66 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrementalCopyRequest, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ stopOffset, _, _ := v.FileStat()
+ foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(req.SinceNs)
+ if err != nil {
+ return fmt.Errorf("fail to locate by appendAtNs %d: %s", req.SinceNs, err)
+ }
+
+ if isLastOne {
+ return nil
+ }
+
+ startOffset := foundOffset.ToAcutalOffset()
+
+ buf := make([]byte, 1024*1024*2)
+ return sendFileContent(v.DataBackend, buf, startOffset, int64(stopOffset), stream)
+
+}
+
+func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ resp := v.GetVolumeSyncStatus()
+
+ return resp, nil
+
+}
+
+func sendFileContent(datBackend backend.BackendStorageFile, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
+ var blockSizeLimit = int64(len(buf))
+ for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit {
+ n, readErr := datBackend.ReadAt(buf, startOffset+i)
+ if readErr == nil || readErr == io.EOF {
+ resp := &volume_server_pb.VolumeIncrementalCopyResponse{}
+ resp.FileContent = buf[:int64(n)]
+ sendErr := stream.Send(resp)
+ if sendErr != nil {
+ return sendErr
+ }
+ } else {
+ return readErr
+ }
+ }
+ return nil
+}
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
new file mode 100644
index 000000000..79348c9d7
--- /dev/null
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -0,0 +1,391 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "math"
+ "os"
+ "path"
+ "path/filepath"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+/*
+
+Steps to apply erasure coding to .dat and .idx files
+0. ensure the volume is readonly
+1. client calls VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
+2. client asks the master for servers that can hold the ec files, at least 4 servers
+3. client calls VolumeEcShardsCopy on the target servers to copy the ec files from the source server
+4. target servers report the new ec files to the master
+5. master stores vid -> [14]*DataNode
+6. client checks the master; if all 14 shards are ready, delete the original .idx and .dat files
+
+*/
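To tie the steps above together, here is a hedged client-side orchestration sketch; it only uses RPCs and request fields that appear in this file, while step 2 (picking target servers), step 6 (cleanup) and real shard placement are elided, and the addresses and one-shard-per-target layout are assumptions.

package example

import (
	"context"

	"google.golang.org/grpc"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)

// encodeAndSpread runs step 1 on the source server, then step 3 plus a mount on each target.
func encodeAndSpread(source string, targets []string, vid uint32, collection string, grpcDialOption grpc.DialOption) error {
	// step 1: generate .ecx and .ec00 ~ .ec13 on the source server
	err := operation.WithVolumeServerClient(source, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
		_, genErr := client.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
			VolumeId:   vid,
			Collection: collection,
		})
		return genErr
	})
	if err != nil {
		return err
	}

	// step 3: copy shards to each target and mount them, so the targets report
	// the new ec shards to the master (steps 4 and 5)
	for i, target := range targets {
		shardIds := []uint32{uint32(i)} // illustrative placement: one shard per target
		err = operation.WithVolumeServerClient(target, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
			if _, copyErr := client.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
				VolumeId:       vid,
				Collection:     collection,
				ShardIds:       shardIds,
				CopyEcxFile:    true, // whether every target needs the index files depends on the deployment
				CopyEcjFile:    true,
				CopyVifFile:    true,
				SourceDataNode: source,
			}); copyErr != nil {
				return copyErr
			}
			_, mountErr := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
				VolumeId:   vid,
				Collection: collection,
				ShardIds:   shardIds,
			})
			return mountErr
		})
		if err != nil {
			return err
		}
	}
	return nil
}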
+
+// VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
+func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
+
+ glog.V(0).Infof("VolumeEcShardsGenerate: %v", req)
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("volume %d not found", req.VolumeId)
+ }
+ baseFileName := v.FileName()
+
+ if v.Collection != req.Collection {
+ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // write .ec00 ~ .ec13 files
+ if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ }
+
+ // write .ecx file
+ if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
+ return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
+ }
+
+ // write .vif file
+ if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
+ return nil, fmt.Errorf("SaveVolumeInfo %s: %v", baseFileName, err)
+ }
+
+ return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
+}
+
+// VolumeEcShardsRebuild generates any of the missing .ec00 ~ .ec13 files
+func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
+
+ glog.V(0).Infof("VolumeEcShardsRebuild: %v", req)
+
+ baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
+
+ var rebuiltShardIds []uint32
+
+ for _, location := range vs.store.Locations {
+ if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
+ // write .ec00 ~ .ec13 files
+ baseFileName = path.Join(location.Directory, baseFileName)
+ if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
+ return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
+ } else {
+ rebuiltShardIds = generatedShardIds
+ }
+
+ if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil {
+ return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err)
+ }
+
+ break
+ }
+ }
+
+ return &volume_server_pb.VolumeEcShardsRebuildResponse{
+ RebuiltShardIds: rebuiltShardIds,
+ }, nil
+}
+
+// VolumeEcShardsCopy copies the .ecx file and some ec data slices
+func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
+
+ glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
+
+ location := vs.store.FindFreeLocation()
+ if location == nil {
+ return nil, fmt.Errorf("no space left")
+ }
+
+ baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
+
+ err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+
+ // copy ec data slices
+ for _, shardId := range req.ShardIds {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
+ return err
+ }
+ }
+
+ if req.CopyEcxFile {
+
+ // copy ecx file
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil {
+ return err
+ }
+ return nil
+ }
+
+ if req.CopyEcjFile {
+ // copy ecj file
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil {
+ return err
+ }
+ }
+
+ if req.CopyVifFile {
+ // copy vif file
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil {
+ return err
+ }
+ }
+
+ return nil
+ })
+ if err != nil {
+ return nil, fmt.Errorf("VolumeEcShardsCopy volume %d: %v", req.VolumeId, err)
+ }
+
+ return &volume_server_pb.VolumeEcShardsCopyResponse{}, nil
+}
+
+// VolumeEcShardsDelete deletes the local .ecx file and some ec data slices if they are no longer needed.
+// The shards should not be mounted when calling this.
+func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
+
+ baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
+
+ glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
+
+ found := false
+ for _, location := range vs.store.Locations {
+ if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
+ found = true
+ baseFilename = path.Join(location.Directory, baseFilename)
+ for _, shardId := range req.ShardIds {
+ os.Remove(baseFilename + erasure_coding.ToExt(int(shardId)))
+ }
+ break
+ }
+ }
+
+ if !found {
+ return nil, nil
+ }
+
+ // check whether the .ecx and .ecj files should be deleted as well
+ hasEcxFile := false
+ hasIdxFile := false
+ existingShardCount := 0
+
+ bName := filepath.Base(baseFilename)
+ for _, location := range vs.store.Locations {
+ fileInfos, err := ioutil.ReadDir(location.Directory)
+ if err != nil {
+ continue
+ }
+ for _, fileInfo := range fileInfos {
+ if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
+ hasEcxFile = true
+ continue
+ }
+ if fileInfo.Name() == bName+".idx" {
+ hasIdxFile = true
+ continue
+ }
+ if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
+ existingShardCount++
+ }
+ }
+ }
+
+ if hasEcxFile && existingShardCount == 0 {
+ if err := os.Remove(baseFilename + ".ecx"); err != nil {
+ return nil, err
+ }
+ os.Remove(baseFilename + ".ecj")
+ }
+ if !hasIdxFile {
+ // .vif is used for ec volumes and normal volumes
+ os.Remove(baseFilename + ".vif")
+ }
+
+ return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
+}
+
+func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
+
+ glog.V(0).Infof("VolumeEcShardsMount: %v", req)
+
+ for _, shardId := range req.ShardIds {
+ err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
+
+ if err != nil {
+ glog.Errorf("ec shard mount %v: %v", req, err)
+ } else {
+ glog.V(2).Infof("ec shard mount %v", req)
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("mount %d.%d: %v", req.VolumeId, shardId, err)
+ }
+ }
+
+ return &volume_server_pb.VolumeEcShardsMountResponse{}, nil
+}
+
+func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
+
+ glog.V(0).Infof("VolumeEcShardsUnmount: %v", req)
+
+ for _, shardId := range req.ShardIds {
+ err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
+
+ if err != nil {
+ glog.Errorf("ec shard unmount %v: %v", req, err)
+ } else {
+ glog.V(2).Infof("ec shard unmount %v", req)
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("unmount %d.%d: %v", req.VolumeId, shardId, err)
+ }
+ }
+
+ return &volume_server_pb.VolumeEcShardsUnmountResponse{}, nil
+}
+
+func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {
+
+ ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
+ if !found {
+ return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
+ }
+ ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
+ if !found {
+ return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
+ }
+
+ if req.FileKey != 0 {
+ _, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
+ if size == types.TombstoneFileSize {
+ return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
+ IsDeleted: true,
+ })
+ }
+ }
+
+ bufSize := req.Size
+ if bufSize > BufferSizeLimit {
+ bufSize = BufferSizeLimit
+ }
+ buffer := make([]byte, bufSize)
+
+ startOffset, bytesToRead := req.Offset, req.Size
+
+ for bytesToRead > 0 {
+ // min of bytesToRead and bufSize
+ bufferSize := bufSize
+ if bufferSize > bytesToRead {
+ bufferSize = bytesToRead
+ }
+ bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)
+
+ // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
+ if bytesread > 0 {
+
+ if int64(bytesread) > bytesToRead {
+ bytesread = int(bytesToRead)
+ }
+ err = stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
+ Data: buffer[:bytesread],
+ })
+ if err != nil {
+ // println("sending", bytesread, "bytes err", err.Error())
+ return err
+ }
+
+ startOffset += int64(bytesread)
+ bytesToRead -= int64(bytesread)
+
+ }
+
+ if err != nil {
+ if err != io.EOF {
+ return err
+ }
+ return nil
+ }
+
+ }
+
+ return nil
+
+}
+
+func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
+
+ glog.V(0).Infof("VolumeEcBlobDelete: %v", req)
+
+ resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
+
+ for _, location := range vs.store.Locations {
+ if localEcVolume, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
+
+ _, size, _, err := localEcVolume.LocateEcShardNeedle(types.NeedleId(req.FileKey), needle.Version(req.Version))
+ if err != nil {
+ return nil, fmt.Errorf("locate in local ec volume: %v", err)
+ }
+ if size == types.TombstoneFileSize {
+ return resp, nil
+ }
+
+ err = localEcVolume.DeleteNeedleFromEcx(types.NeedleId(req.FileKey))
+ if err != nil {
+ return nil, err
+ }
+
+ break
+ }
+ }
+
+ return resp, nil
+}
+
+// VolumeEcShardsToVolume generates the .idx, .dat files from the .ecx, .ecj and .ec00 ~ .ec09 files
+func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
+
+ glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
+
+ v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
+ if !found {
+ return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
+ }
+ baseFileName := v.FileName()
+
+ if v.Collection != req.Collection {
+ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // calculate .dat file size
+ datFileSize, err := erasure_coding.FindDatFileSize(baseFileName)
+ if err != nil {
+ return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err)
+ }
+
+ // write .dat file from .ec00 ~ .ec09 files
+ if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ }
+
+ // write .idx file from .ecx and .ecj files
+ if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil {
+ return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err)
+ }
+
+ return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
+}
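
Note: the VolumeEcShards* RPCs above are normally driven from the master/shell side (ec.encode, ec.rebuild, ec.balance). As a rough illustration only, not part of this change, the sketch below shows how a caller might ask a single volume server to generate and then mount all fourteen shards. The helper name, server address and ids are placeholders; it assumes the same imports as the file above plus "context", and assumes the usual generated gRPC client method names.

func encodeAndMountLocally(volumeServer string, grpcDialOption grpc.DialOption, collection string, volumeId uint32) error {
	return operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
		// generate .ec00 ~ .ec13, .ecx and .vif from the local .dat/.idx files
		if _, err := client.VolumeEcShardsGenerate(context.Background(), &volume_server_pb.VolumeEcShardsGenerateRequest{
			VolumeId:   volumeId,
			Collection: collection,
		}); err != nil {
			return fmt.Errorf("generate ec shards for volume %d: %v", volumeId, err)
		}
		// mount every shard on the same server; a real balancer would spread them across servers
		var shardIds []uint32
		for i := uint32(0); i < 14; i++ {
			shardIds = append(shardIds, i)
		}
		_, err := client.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
			VolumeId:   volumeId,
			Collection: collection,
			ShardIds:   shardIds,
		})
		return err
	})
}
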
diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go
new file mode 100644
index 000000000..767e28e7b
--- /dev/null
+++ b/weed/server/volume_grpc_query.go
@@ -0,0 +1,69 @@
+package weed_server
+
+import (
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/query/json"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/tidwall/gjson"
+)
+
+func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_server_pb.VolumeServer_QueryServer) error {
+
+ for _, fid := range req.FromFileIds {
+
+ vid, id_cookie, err := operation.ParseFileId(fid)
+ if err != nil {
+ glog.V(0).Infof("volume query failed to parse fid %s: %v", fid, err)
+ return err
+ }
+
+ n := new(needle.Needle)
+ volumeId, _ := needle.NewVolumeId(vid)
+ n.ParsePath(id_cookie)
+
+ cookie := n.Cookie
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
+ glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
+ return err
+ }
+
+ if n.Cookie != cookie {
+ glog.V(0).Infof("volume query failed to read fid %s: cookie mismatch", fid)
+ return fmt.Errorf("read fid %s: cookie mismatch", fid)
+ }
+
+ if req.InputSerialization.CsvInput != nil {
+
+ }
+
+ if req.InputSerialization.JsonInput != nil {
+
+ stripe := &volume_server_pb.QueriedStripe{
+ Records: nil,
+ }
+
+ filter := json.Query{
+ Field: req.Filter.Field,
+ Op: req.Filter.Operand,
+ Value: req.Filter.Value,
+ }
+ gjson.ForEachLine(string(n.Data), func(line gjson.Result) bool {
+ passedFilter, values := json.QueryJson(line.Raw, req.Selections, filter)
+ if !passedFilter {
+ return true
+ }
+ stripe.Records = json.ToJson(stripe.Records, req.Selections, values)
+ return true
+ })
+ err = stream.Send(stripe)
+ if err != nil {
+ return err
+ }
+ }
+
+ }
+
+ return nil
+}
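
For reference, a tiny stand-alone sketch of the line-by-line JSON scan that Query performs above. It uses only gjson with made-up field names and stands in for the weed/query/json helpers; it is illustrative, not the handler's actual filtering code.

package main

import (
	"fmt"

	"github.com/tidwall/gjson"
)

func main() {
	// two newline-delimited JSON records, similar to what a needle's data may hold
	data := `{"name":"a","size":12}
{"name":"b","size":99}`

	var kept []string
	gjson.ForEachLine(data, func(line gjson.Result) bool {
		if line.Get("size").Int() > 50 { // hypothetical filter: size > 50
			kept = append(kept, line.Get("name").String())
		}
		return true // keep scanning the remaining lines
	})
	fmt.Println(kept) // [b]
}
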
diff --git a/weed/server/volume_grpc_sync.go b/weed/server/volume_grpc_sync.go
deleted file mode 100644
index 5f56ec17d..000000000
--- a/weed/server/volume_grpc_sync.go
+++ /dev/null
@@ -1,101 +0,0 @@
-package weed_server
-
-import (
- "context"
- "fmt"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
-)
-
-func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server_pb.VolumeSyncStatusRequest) (*volume_server_pb.VolumeSyncStatusResponse, error) {
-
- v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
- if v == nil {
- return nil, fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
- }
-
- resp := v.GetVolumeSyncStatus()
-
- glog.V(2).Infof("volume sync status %d", req.VolumdId)
-
- return resp, nil
-
-}
-
-func (vs *VolumeServer) VolumeSyncIndex(req *volume_server_pb.VolumeSyncIndexRequest, stream volume_server_pb.VolumeServer_VolumeSyncIndexServer) error {
-
- v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
- if v == nil {
- return fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
- }
-
- content, err := v.IndexFileContent()
-
- if err != nil {
- glog.Errorf("sync volume %d index: %v", req.VolumdId, err)
- } else {
- glog.V(2).Infof("sync volume %d index", req.VolumdId)
- }
-
- const blockSizeLimit = 1024 * 1024 * 2
- for i := 0; i < len(content); i += blockSizeLimit {
- blockSize := len(content) - i
- if blockSize > blockSizeLimit {
- blockSize = blockSizeLimit
- }
- resp := &volume_server_pb.VolumeSyncIndexResponse{}
- resp.IndexFileContent = content[i : i+blockSize]
- stream.Send(resp)
- }
-
- return nil
-
-}
-
-func (vs *VolumeServer) VolumeSyncData(req *volume_server_pb.VolumeSyncDataRequest, stream volume_server_pb.VolumeServer_VolumeSyncDataServer) error {
-
- v := vs.store.GetVolume(storage.VolumeId(req.VolumdId))
- if v == nil {
- return fmt.Errorf("Not Found Volume Id %d", req.VolumdId)
- }
-
- if uint32(v.SuperBlock.CompactRevision) != req.Revision {
- return fmt.Errorf("Requested Volume Revision is %d, but current revision is %d", req.Revision, v.SuperBlock.CompactRevision)
- }
-
- content, err := storage.ReadNeedleBlob(v.DataFile(), int64(req.Offset)*types.NeedlePaddingSize, req.Size, v.Version())
- if err != nil {
- return fmt.Errorf("read offset:%d size:%d", req.Offset, req.Size)
- }
-
- id, err := types.ParseNeedleId(req.NeedleId)
- if err != nil {
- return fmt.Errorf("parsing needle id %s: %v", req.NeedleId, err)
- }
- n := new(storage.Needle)
- n.ParseNeedleHeader(content)
- if id != n.Id {
- return fmt.Errorf("Expected file entry id %d, but found %d", id, n.Id)
- }
-
- if err != nil {
- glog.Errorf("sync volume %d data: %v", req.VolumdId, err)
- }
-
- const blockSizeLimit = 1024 * 1024 * 2
- for i := 0; i < len(content); i += blockSizeLimit {
- blockSize := len(content) - i
- if blockSize > blockSizeLimit {
- blockSize = blockSizeLimit
- }
- resp := &volume_server_pb.VolumeSyncDataResponse{}
- resp.FileContent = content[i : i+blockSize]
- stream.Send(resp)
- }
-
- return nil
-
-}
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
new file mode 100644
index 000000000..2dde5b69c
--- /dev/null
+++ b/weed/server/volume_grpc_tail.go
@@ -0,0 +1,136 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+)
+
+func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error {
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ defer glog.V(1).Infof("tailing volume %d finished", v.Id)
+
+ lastTimestampNs := req.SinceNs
+ drainingSeconds := req.IdleTimeoutSeconds
+
+ for {
+ lastProcessedTimestampNs, err := sendNeedlesSince(stream, v, lastTimestampNs)
+ if err != nil {
+ glog.Infof("sendNeedlesSince: %v", err)
+ return fmt.Errorf("streamFollow: %v", err)
+ }
+ time.Sleep(2 * time.Second)
+
+ if req.IdleTimeoutSeconds == 0 {
+ lastTimestampNs = lastProcessedTimestampNs
+ continue
+ }
+ if lastProcessedTimestampNs == lastTimestampNs {
+ drainingSeconds--
+ if drainingSeconds <= 0 {
+ return nil
+ }
+ glog.V(1).Infof("tailing volume %d drains requests with %d seconds remaining", v.Id, drainingSeconds)
+ } else {
+ lastTimestampNs = lastProcessedTimestampNs
+ drainingSeconds = req.IdleTimeoutSeconds
+ glog.V(1).Infof("tailing volume %d resets draining wait time to %d seconds", v.Id, drainingSeconds)
+ }
+
+ }
+
+}
+
+func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServer, v *storage.Volume, lastTimestampNs uint64) (lastProcessedTimestampNs uint64, err error) {
+
+ foundOffset, isLastOne, err := v.BinarySearchByAppendAtNs(lastTimestampNs)
+ if err != nil {
+ return 0, fmt.Errorf("fail to locate by appendAtNs %d: %s", lastTimestampNs, err)
+ }
+
+ // log.Printf("reading ts %d offset %d isLast %v", lastTimestampNs, foundOffset, isLastOne)
+
+ if isLastOne {
+ // heartbeat to the client to keep the connection healthy
+ sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{IsLastChunk: true})
+ return lastTimestampNs, sendErr
+ }
+
+ scanner := &VolumeFileScanner4Tailing{
+ stream: stream,
+ }
+
+ err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToAcutalOffset(), scanner)
+
+ return scanner.lastProcessedTimestampNs, err
+
+}
+
+func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_server_pb.VolumeTailReceiverRequest) (*volume_server_pb.VolumeTailReceiverResponse, error) {
+
+ resp := &volume_server_pb.VolumeTailReceiverResponse{}
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return resp, fmt.Errorf("receiver not found volume id %d", req.VolumeId)
+ }
+
+ defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
+
+ return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
+ _, err := vs.store.WriteVolumeNeedle(v.Id, n, false)
+ return err
+ })
+
+}
+
+// VolumeFileScanner4Tailing scans the volume data file and streams needles to the tailing client
+type VolumeFileScanner4Tailing struct {
+ stream volume_server_pb.VolumeServer_VolumeTailSenderServer
+ lastProcessedTimestampNs uint64
+}
+
+func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ return nil
+
+}
+func (scanner *VolumeFileScanner4Tailing) ReadNeedleBody() bool {
+ return true
+}
+
+func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
+ isLastChunk := false
+
+ // need to send body by chunks
+ for i := 0; i < len(needleBody); i += BufferSizeLimit {
+ stopOffset := i + BufferSizeLimit
+ if stopOffset >= len(needleBody) {
+ isLastChunk = true
+ stopOffset = len(needleBody)
+ }
+
+ sendErr := scanner.stream.Send(&volume_server_pb.VolumeTailSenderResponse{
+ NeedleHeader: needleHeader,
+ NeedleBody: needleBody[i:stopOffset],
+ IsLastChunk: isLastChunk,
+ })
+ if sendErr != nil {
+ return sendErr
+ }
+ }
+
+ scanner.lastProcessedTimestampNs = n.AppendAtNs
+ return nil
+}
diff --git a/weed/server/volume_grpc_tier_download.go b/weed/server/volume_grpc_tier_download.go
new file mode 100644
index 000000000..7b3982e40
--- /dev/null
+++ b/weed/server/volume_grpc_tier_download.go
@@ -0,0 +1,85 @@
+package weed_server
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+// VolumeTierMoveDatFromRemote copies the .dat file from a remote tier back to the local volume server
+func (vs *VolumeServer) VolumeTierMoveDatFromRemote(req *volume_server_pb.VolumeTierMoveDatFromRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatFromRemoteServer) error {
+
+ // find existing volume
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("volume %d not found", req.VolumeId)
+ }
+
+ // verify the collection
+ if v.Collection != req.Collection {
+ return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // locate the disk file
+ storageName, storageKey := v.RemoteStorageNameKey()
+ if storageName == "" || storageKey == "" {
+ return fmt.Errorf("volume %d is already on local disk", req.VolumeId)
+ }
+
+ // check whether the local .dat already exists
+ _, ok := v.DataBackend.(*backend.DiskFile)
+ if ok {
+ return fmt.Errorf("volume %d is already on local disk", req.VolumeId)
+ }
+
+ // check valid storage backend type
+ backendStorage, found := backend.BackendStorages[storageName]
+ if !found {
+ var keys []string
+ for key := range backend.BackendStorages {
+ keys = append(keys, key)
+ }
+ return fmt.Errorf("remote storage %s not found from suppported: %v", storageName, keys)
+ }
+
+ startTime := time.Now()
+ fn := func(progressed int64, percentage float32) error {
+ now := time.Now()
+ if now.Sub(startTime) < time.Second {
+ return nil
+ }
+ startTime = now
+ return stream.Send(&volume_server_pb.VolumeTierMoveDatFromRemoteResponse{
+ Processed: progressed,
+ ProcessedPercentage: percentage,
+ })
+ }
+ // copy the data file
+ _, err := backendStorage.DownloadFile(v.FileName()+".dat", storageKey, fn)
+ if err != nil {
+ return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName()+".dat", err)
+ }
+
+ if req.KeepRemoteDatFile {
+ return nil
+ }
+
+ // remove remote file
+ if err := backendStorage.DeleteFile(storageKey); err != nil {
+ return fmt.Errorf("volume %d fail to delete remote file %s: %v", v.Id, storageKey, err)
+ }
+
+ // forget remote file
+ v.GetVolumeInfo().Files = v.GetVolumeInfo().Files[1:]
+ if err := v.SaveVolumeInfo(); err != nil {
+ return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err)
+ }
+
+ v.DataBackend.Close()
+ v.DataBackend = nil
+
+ return nil
+}
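
Both tier-move handlers wrap the stream in a callback that reports progress at most once per second, so large .dat transfers do not flood the client. A small stand-alone sketch of that throttling idea, assuming only the time package; report stands in for stream.Send.

func throttledProgress(report func(processed int64, percentage float32) error) func(int64, float32) error {
	lastSent := time.Now()
	return func(processed int64, percentage float32) error {
		now := time.Now()
		if now.Sub(lastSent) < time.Second {
			return nil // too soon, skip this update
		}
		lastSent = now
		return report(processed, percentage)
	}
}
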
diff --git a/weed/server/volume_grpc_tier_upload.go b/weed/server/volume_grpc_tier_upload.go
new file mode 100644
index 000000000..c9694df59
--- /dev/null
+++ b/weed/server/volume_grpc_tier_upload.go
@@ -0,0 +1,100 @@
+package weed_server
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+// VolumeTierMoveDatToRemote copies the .dat file to a remote tier
+func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTierMoveDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatToRemoteServer) error {
+
+ // find existing volume
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return fmt.Errorf("volume %d not found", req.VolumeId)
+ }
+
+ // verify the collection
+ if v.Collection != req.Collection {
+ return fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // locate the disk file
+ diskFile, ok := v.DataBackend.(*backend.DiskFile)
+ if !ok {
+ return fmt.Errorf("volume %d is not on local disk", req.VolumeId)
+ }
+
+ // check valid storage backend type
+ backendStorage, found := backend.BackendStorages[req.DestinationBackendName]
+ if !found {
+ var keys []string
+ for key := range backend.BackendStorages {
+ keys = append(keys, key)
+ }
+ return fmt.Errorf("destination %s not found, suppported: %v", req.DestinationBackendName, keys)
+ }
+
+ // check whether the existing backend storage is the same as requested
+ // if same, skip
+ backendType, backendId := backend.BackendNameToTypeId(req.DestinationBackendName)
+ for _, remoteFile := range v.GetVolumeInfo().GetFiles() {
+ if remoteFile.BackendType == backendType && remoteFile.BackendId == backendId {
+ return fmt.Errorf("destination %s already exists", req.DestinationBackendName)
+ }
+ }
+
+ startTime := time.Now()
+ fn := func(progressed int64, percentage float32) error {
+ now := time.Now()
+ if now.Sub(startTime) < time.Second {
+ return nil
+ }
+ startTime = now
+ return stream.Send(&volume_server_pb.VolumeTierMoveDatToRemoteResponse{
+ Processed: progressed,
+ ProcessedPercentage: percentage,
+ })
+ }
+
+ // remember the file's original source
+ attributes := make(map[string]string)
+ attributes["volumeId"] = v.Id.String()
+ attributes["collection"] = v.Collection
+ attributes["ext"] = ".dat"
+ // copy the data file
+ key, size, err := backendStorage.CopyFile(diskFile.File, attributes, fn)
+ if err != nil {
+ return fmt.Errorf("backend %s copy file %s: %v", req.DestinationBackendName, diskFile.Name(), err)
+ }
+
+ // save the remote file to volume tier info
+ v.GetVolumeInfo().Files = append(v.GetVolumeInfo().GetFiles(), &volume_server_pb.RemoteFile{
+ BackendType: backendType,
+ BackendId: backendId,
+ Key: key,
+ Offset: 0,
+ FileSize: uint64(size),
+ ModifiedTime: uint64(time.Now().Unix()),
+ Extension: ".dat",
+ })
+
+ if err := v.SaveVolumeInfo(); err != nil {
+ return fmt.Errorf("volume %d fail to save remote file info: %v", v.Id, err)
+ }
+
+ if err := v.LoadRemoteFile(); err != nil {
+ return fmt.Errorf("volume %d fail to load remote file: %v", v.Id, err)
+ }
+
+ if !req.KeepLocalDatFile {
+ os.Remove(v.FileName() + ".dat")
+ }
+
+ return nil
+}
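
A hypothetical client-side counterpart to the handler above: ask a volume server to move a volume's .dat file to a configured remote backend and print progress as it streams back. The backend name "s3.default", the ids and the helper name are placeholders; the generated gRPC client and streaming Recv loop are assumed, with io and fmt imported.

func moveDatToRemote(client volume_server_pb.VolumeServerClient, volumeId uint32, collection string) error {
	stream, err := client.VolumeTierMoveDatToRemote(context.Background(), &volume_server_pb.VolumeTierMoveDatToRemoteRequest{
		VolumeId:               volumeId,
		Collection:             collection,
		DestinationBackendName: "s3.default",
		KeepLocalDatFile:       false,
	})
	if err != nil {
		return err
	}
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		fmt.Printf("moved %d bytes (%.1f%%)\n", resp.Processed, resp.ProcessedPercentage)
	}
}
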
diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go
index f0c87b582..b87de4b5b 100644
--- a/weed/server/volume_grpc_vacuum.go
+++ b/weed/server/volume_grpc_vacuum.go
@@ -5,19 +5,19 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_server_pb.VacuumVolumeCheckRequest) (*volume_server_pb.VacuumVolumeCheckResponse, error) {
resp := &volume_server_pb.VacuumVolumeCheckResponse{}
- garbageRatio, err := vs.store.CheckCompactVolume(storage.VolumeId(req.VolumdId))
+ garbageRatio, err := vs.store.CheckCompactVolume(needle.VolumeId(req.VolumeId))
resp.GarbageRatio = garbageRatio
if err != nil {
- glog.V(3).Infof("check volume %d: %v", req.VolumdId, err)
+ glog.V(3).Infof("check volume %d: %v", req.VolumeId, err)
}
return resp, err
@@ -28,12 +28,12 @@ func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_ser
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
- err := vs.store.CompactVolume(storage.VolumeId(req.VolumdId), req.Preallocate)
+ err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond)
if err != nil {
- glog.Errorf("compact volume %d: %v", req.VolumdId, err)
+ glog.Errorf("compact volume %d: %v", req.VolumeId, err)
} else {
- glog.V(1).Infof("compact volume %d", req.VolumdId)
+ glog.V(1).Infof("compact volume %d", req.VolumeId)
}
return resp, err
@@ -44,12 +44,17 @@ func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_serv
resp := &volume_server_pb.VacuumVolumeCommitResponse{}
- err := vs.store.CommitCompactVolume(storage.VolumeId(req.VolumdId))
+ err := vs.store.CommitCompactVolume(needle.VolumeId(req.VolumeId))
if err != nil {
- glog.Errorf("commit volume %d: %v", req.VolumdId, err)
+ glog.Errorf("commit volume %d: %v", req.VolumeId, err)
} else {
- glog.V(1).Infof("commit volume %d", req.VolumdId)
+ glog.V(1).Infof("commit volume %d", req.VolumeId)
+ }
+ if err == nil {
+ if vs.store.GetVolume(needle.VolumeId(req.VolumeId)).IsReadOnly() {
+ resp.IsReadOnly = true
+ }
}
return resp, err
@@ -60,12 +65,12 @@ func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_ser
resp := &volume_server_pb.VacuumVolumeCleanupResponse{}
- err := vs.store.CommitCleanupVolume(storage.VolumeId(req.VolumdId))
+ err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId))
if err != nil {
- glog.Errorf("cleanup volume %d: %v", req.VolumdId, err)
+ glog.Errorf("cleanup volume %d: %v", req.VolumeId, err)
} else {
- glog.V(1).Infof("cleanup volume %d", req.VolumdId)
+ glog.V(1).Infof("cleanup volume %d", req.VolumeId)
}
return resp, err
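
The four vacuum RPCs above are normally sequenced by the master. As a rough, hypothetical sketch of that sequence (not the master's actual code): check the garbage ratio, compact when it exceeds a threshold, then commit, or clean up the half-written files on failure. ctx, client, volumeId and threshold are placeholders; the generated gRPC client methods are assumed.

func maybeVacuum(ctx context.Context, client volume_server_pb.VolumeServerClient, volumeId uint32, threshold float64) error {
	check, err := client.VacuumVolumeCheck(ctx, &volume_server_pb.VacuumVolumeCheckRequest{VolumeId: volumeId})
	if err != nil || check.GarbageRatio < threshold {
		return err
	}
	if _, err = client.VacuumVolumeCompact(ctx, &volume_server_pb.VacuumVolumeCompactRequest{VolumeId: volumeId}); err != nil {
		// drop the partially written compacted files
		client.VacuumVolumeCleanup(ctx, &volume_server_pb.VacuumVolumeCleanupRequest{VolumeId: volumeId})
		return err
	}
	_, err = client.VacuumVolumeCommit(ctx, &volume_server_pb.VacuumVolumeCommitRequest{VolumeId: volumeId})
	return err
}
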
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 0914e81b0..b7ed81be0 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -1,55 +1,84 @@
package weed_server
import (
+ "fmt"
"net/http"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
)
type VolumeServer struct {
- MasterNodes []string
- currentMaster string
- pulseSeconds int
- dataCenter string
- rack string
- store *storage.Store
- guard *security.Guard
+ SeedMasterNodes []string
+ currentMaster string
+ pulseSeconds int
+ dataCenter string
+ rack string
+ store *storage.Store
+ guard *security.Guard
+ grpcDialOption grpc.DialOption
- needleMapKind storage.NeedleMapType
- FixJpgOrientation bool
- ReadRedirect bool
+ needleMapKind storage.NeedleMapType
+ ReadRedirect bool
+ compactionBytePerSecond int64
+ MetricsAddress string
+ MetricsIntervalSec int
+ fileSizeLimitBytes int64
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string,
- folders []string, maxCounts []int,
+ folders []string, maxCounts []int, minFreeSpacePercents []float32,
needleMapKind storage.NeedleMapType,
masterNodes []string, pulseSeconds int,
dataCenter string, rack string,
whiteList []string,
- fixJpgOrientation bool,
- readRedirect bool) *VolumeServer {
+ readRedirect bool,
+ compactionMBPerSecond int,
+ fileSizeLimitMB int,
+) *VolumeServer {
+
+ v := util.GetViper()
+ signingKey := v.GetString("jwt.signing.key")
+ v.SetDefault("jwt.signing.expires_after_seconds", 10)
+ expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
+ enableUiAccess := v.GetBool("access.ui")
+
+ readSigningKey := v.GetString("jwt.signing.read.key")
+ v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
+ readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
+
vs := &VolumeServer{
- pulseSeconds: pulseSeconds,
- dataCenter: dataCenter,
- rack: rack,
- needleMapKind: needleMapKind,
- FixJpgOrientation: fixJpgOrientation,
- ReadRedirect: readRedirect,
+ pulseSeconds: pulseSeconds,
+ dataCenter: dataCenter,
+ rack: rack,
+ needleMapKind: needleMapKind,
+ ReadRedirect: readRedirect,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
+ compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
+ fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
}
- vs.MasterNodes = masterNodes
- vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
-
- vs.guard = security.NewGuard(whiteList, "")
+ vs.SeedMasterNodes = masterNodes
+ vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind)
+ vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux)
- adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
- adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
- adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
- adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
- adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
+ if signingKey == "" || enableUiAccess {
+ // only expose the volume server details for safe environments
+ adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
+ adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
+ /*
+ adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
+ adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
+ adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
+ */
+ }
adminMux.HandleFunc("/", vs.privateStoreHandler)
if publicMux != adminMux {
// separated admin and public port
@@ -58,6 +87,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
}
go vs.heartbeat()
+ hostAddress := fmt.Sprintf("%s:%d", ip, port)
+ go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather,
+ func() (addr string, intervalSeconds int) {
+ return vs.MetricsAddress, vs.MetricsIntervalSec
+ })
return vs
}
@@ -67,7 +101,3 @@ func (vs *VolumeServer) Shutdown() {
vs.store.Close()
glog.V(0).Infoln("Shut down successfully!")
}
-
-func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt {
- return security.GenJwt(vs.guard.SecretKey, fileId)
-}
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 77b1274fd..14ad27d42 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -2,7 +2,10 @@ package weed_server
import (
"net/http"
+ "strings"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
)
@@ -45,3 +48,47 @@ func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Req
vs.GetOrHeadHandler(w, r)
}
}
+
+func (vs *VolumeServer) maybeCheckJwtAuthorization(r *http.Request, vid, fid string, isWrite bool) bool {
+
+ var signingKey security.SigningKey
+
+ if isWrite {
+ if len(vs.guard.SigningKey) == 0 {
+ return true
+ } else {
+ signingKey = vs.guard.SigningKey
+ }
+ } else {
+ if len(vs.guard.ReadSigningKey) == 0 {
+ return true
+ } else {
+ signingKey = vs.guard.ReadSigningKey
+ }
+ }
+
+ tokenStr := security.GetJwt(r)
+ if tokenStr == "" {
+ glog.V(1).Infof("missing jwt from %s", r.RemoteAddr)
+ return false
+ }
+
+ token, err := security.DecodeJwt(signingKey, tokenStr)
+ if err != nil {
+ glog.V(1).Infof("jwt verification error from %s: %v", r.RemoteAddr, err)
+ return false
+ }
+ if !token.Valid {
+ glog.V(1).Infof("jwt invalid from %s: %v", r.RemoteAddr, tokenStr)
+ return false
+ }
+
+ if sc, ok := token.Claims.(*security.SeaweedFileIdClaims); ok {
+ if sepIndex := strings.LastIndex(fid, "_"); sepIndex > 0 {
+ fid = fid[:sepIndex]
+ }
+ return sc.Fid == vid+","+fid
+ }
+ glog.V(1).Infof("unexpected jwt from %s: %v", r.RemoteAddr, tokenStr)
+ return false
+}
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 25b6582f7..34655d833 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -11,14 +11,21 @@ import (
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["Version"] = util.VERSION
- m["Volumes"] = vs.store.Status()
+ m["Version"] = util.Version()
+ var ds []*volume_server_pb.DiskStatus
+ for _, loc := range vs.store.Locations {
+ if dir, e := filepath.Abs(loc.Directory); e == nil {
+ ds = append(ds, stats.NewDiskStatus(dir))
+ }
+ }
+ m["DiskStatuses"] = ds
+ m["Volumes"] = vs.store.VolumeInfos()
writeJsonQuiet(w, r, http.StatusOK, m)
}
func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["Version"] = util.VERSION
+ m["Version"] = util.Version()
var ds []*volume_server_pb.DiskStatus
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 92c728141..7ef1170b3 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -2,45 +2,59 @@ package weed_server
import (
"bytes"
+ "encoding/json"
+ "errors"
+ "fmt"
"io"
"mime"
- "mime/multipart"
"net/http"
"net/url"
- "path"
+ "path/filepath"
"strconv"
"strings"
"time"
- "encoding/json"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
- n := new(storage.Needle)
+
+ stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
+ start := time.Now()
+ defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
+
+ n := new(needle.Needle)
vid, fid, filename, ext, _ := parseURLPath(r.URL.Path)
- volumeId, err := storage.NewVolumeId(vid)
+
+ if !vs.maybeCheckJwtAuthorization(r, vid, fid, false) {
+ writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
+ return
+ }
+
+ volumeId, err := needle.NewVolumeId(vid)
if err != nil {
- glog.V(2).Infoln("parsing error:", err, r.URL.Path)
+ glog.V(2).Infof("parsing vid %s: %v", r.URL.Path, err)
w.WriteHeader(http.StatusBadRequest)
return
}
err = n.ParsePath(fid)
if err != nil {
- glog.V(2).Infoln("parsing fid error:", err, r.URL.Path)
+ glog.V(2).Infof("parsing fid %s: %v", r.URL.Path, err)
w.WriteHeader(http.StatusBadRequest)
return
}
- glog.V(4).Infoln("volume", volumeId, "reading", n)
- if !vs.store.HasVolume(volumeId) {
+ // glog.V(4).Infoln("volume", volumeId, "reading", n)
+ hasVolume := vs.store.HasVolume(volumeId)
+ _, hasEcVolume := vs.store.FindEcVolume(volumeId)
+ if !hasVolume && !hasEcVolume {
if !vs.ReadRedirect {
glog.V(2).Infoln("volume is not local:", err, r.URL.Path)
w.WriteHeader(http.StatusNotFound)
@@ -50,7 +64,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 {
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
- u.Path = r.URL.Path
+ u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
arg := url.Values{}
if c := r.FormValue("collection"); c != "" {
arg.Set("collection", c)
@@ -65,10 +79,15 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
cookie := n.Cookie
- count, e := vs.store.ReadVolumeNeedle(volumeId, n)
- glog.V(4).Infoln("read bytes", count, "error", e)
- if e != nil || count < 0 {
- glog.V(0).Infof("read %s error: %v", r.URL.Path, e)
+ var count int
+ if hasVolume {
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n)
+ } else if hasEcVolume {
+ count, err = vs.store.ReadEcShardNeedle(volumeId, n)
+ }
+ // glog.V(4).Infoln("read bytes", count, "error", err)
+ if err != nil || count < 0 {
+ glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
w.WriteHeader(http.StatusNotFound)
return
}
@@ -92,11 +111,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotModified)
return
}
- if r.Header.Get("ETag-MD5") == "True" {
- setEtag(w, n.MD5())
- } else {
- setEtag(w, n.Etag())
- }
+ setEtag(w, n.Etag())
if n.HasPairs() {
pairMap := make(map[string]string)
@@ -109,14 +124,14 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- if vs.tryHandleChunkedFile(n, filename, w, r) {
+ if vs.tryHandleChunkedFile(n, filename, ext, w, r) {
return
}
if n.NameSize > 0 && filename == "" {
filename = string(n.Name)
if ext == "" {
- ext = path.Ext(filename)
+ ext = filepath.Ext(filename)
}
}
mtype := ""
@@ -127,13 +142,19 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- if ext != ".gz" {
- if n.IsGzipped() {
- if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
+ if ext != ".gz" && ext != ".zst" {
+ if n.IsCompressed() {
+ if _, _, _, shouldResize := shouldResizeImages(ext, r); shouldResize {
+ if n.Data, err = util.DecompressData(n.Data); err != nil {
+ glog.V(0).Infoln("ungzip error:", err, r.URL.Path)
+ }
+ } else if strings.Contains(r.Header.Get("Accept-Encoding"), "zstd") && util.IsZstdContent(n.Data) {
+ w.Header().Set("Content-Encoding", "zstd")
+ } else if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && util.IsGzippedContent(n.Data) {
w.Header().Set("Content-Encoding", "gzip")
} else {
- if n.Data, err = operation.UnGzipData(n.Data); err != nil {
- glog.V(0).Infoln("ungzip error:", err, r.URL.Path)
+ if n.Data, err = util.DecompressData(n.Data); err != nil {
+ glog.V(0).Infoln("uncompress error:", err, r.URL.Path)
}
}
}
@@ -146,12 +167,12 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
-func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) {
+func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, ext string, w http.ResponseWriter, r *http.Request) (processed bool) {
if !n.IsChunkedManifest() || r.URL.Query().Get("cm") == "false" {
return false
}
- chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped())
+ chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsCompressed())
if e != nil {
glog.V(0).Infof("load chunked manifest (%s) error: %v", r.URL.Path, e)
return false
@@ -160,7 +181,9 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string,
fileName = chunkManifest.Name
}
- ext := path.Ext(fileName)
+ if ext == "" {
+ ext = filepath.Ext(fileName)
+ }
mType := ""
if chunkManifest.Mime != "" {
@@ -172,10 +195,7 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string,
w.Header().Set("X-File-Store", "chunked")
- chunkedFileReader := &operation.ChunkedFileReader{
- Manifest: chunkManifest,
- Master: vs.GetMaster(),
- }
+ chunkedFileReader := operation.NewChunkedFileReader(chunkManifest.Chunks, vs.GetMaster())
defer chunkedFileReader.Close()
rs := conditionallyResizeImages(chunkedFileReader, ext, r)
@@ -188,132 +208,56 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string,
func conditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext string, r *http.Request) io.ReadSeeker {
rs := originalDataReaderSeeker
+
+ width, height, mode, shouldResize := shouldResizeImages(ext, r)
+ if shouldResize {
+ rs, _, _ = images.Resized(ext, originalDataReaderSeeker, width, height, mode)
+ }
+ return rs
+}
+
+func shouldResizeImages(ext string, r *http.Request) (width, height int, mode string, shouldResize bool) {
if len(ext) > 0 {
ext = strings.ToLower(ext)
}
if ext == ".png" || ext == ".jpg" || ext == ".jpeg" || ext == ".gif" {
- width, height := 0, 0
if r.FormValue("width") != "" {
width, _ = strconv.Atoi(r.FormValue("width"))
}
if r.FormValue("height") != "" {
height, _ = strconv.Atoi(r.FormValue("height"))
}
- rs, _, _ = images.Resized(ext, originalDataReaderSeeker, width, height, r.FormValue("mode"))
}
- return rs
+ mode = r.FormValue("mode")
+ shouldResize = width > 0 || height > 0
+ return
}
func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.ResponseWriter, r *http.Request) error {
totalSize, e := rs.Seek(0, 2)
if mimeType == "" {
- if ext := path.Ext(filename); ext != "" {
+ if ext := filepath.Ext(filename); ext != "" {
mimeType = mime.TypeByExtension(ext)
}
}
if mimeType != "" {
w.Header().Set("Content-Type", mimeType)
}
- if filename != "" {
- contentDisposition := "inline"
- if r.FormValue("dl") != "" {
- if dl, _ := strconv.ParseBool(r.FormValue("dl")); dl {
- contentDisposition = "attachment"
- }
- }
- w.Header().Set("Content-Disposition", contentDisposition+`; filename="`+fileNameEscaper.Replace(filename)+`"`)
- }
w.Header().Set("Accept-Ranges", "bytes")
+
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
return nil
}
- rangeReq := r.Header.Get("Range")
- if rangeReq == "" {
- w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
- if _, e = rs.Seek(0, 0); e != nil {
- return e
- }
- _, e = io.Copy(w, rs)
- return e
- }
- //the rest is dealing with partial content request
- //mostly copy from src/pkg/net/http/fs.go
- ranges, err := parseRange(rangeReq, totalSize)
- if err != nil {
- http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
- return nil
- }
- if sumRangesSize(ranges) > totalSize {
- // The total number of bytes in all the ranges
- // is larger than the size of the file by
- // itself, so this is probably an attack, or a
- // dumb client. Ignore the range request.
- return nil
- }
- if len(ranges) == 0 {
- return nil
- }
- if len(ranges) == 1 {
- // RFC 2616, Section 14.16:
- // "When an HTTP message includes the content of a single
- // range (for example, a response to a request for a
- // single range, or to a request for a set of ranges
- // that overlap without any holes), this content is
- // transmitted with a Content-Range header, and a
- // Content-Length header showing the number of bytes
- // actually transferred.
- // ...
- // A response to a request for a single range MUST NOT
- // be sent using the multipart/byteranges media type."
- ra := ranges[0]
- w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
- w.Header().Set("Content-Range", ra.contentRange(totalSize))
- w.WriteHeader(http.StatusPartialContent)
- if _, e = rs.Seek(ra.start, 0); e != nil {
+ adjustHeadersAfterHEAD(w, r, filename)
+
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ if _, e = rs.Seek(offset, 0); e != nil {
return e
}
-
- _, e = io.CopyN(w, rs, ra.length)
+ _, e = io.CopyN(writer, rs, size)
return e
- }
- // process multiple ranges
- for _, ra := range ranges {
- if ra.start > totalSize {
- http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
- return nil
- }
- }
- sendSize := rangesMIMESize(ranges, mimeType, totalSize)
- pr, pw := io.Pipe()
- mw := multipart.NewWriter(pw)
- w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
- sendContent := pr
- defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
- go func() {
- for _, ra := range ranges {
- part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
- if e != nil {
- pw.CloseWithError(e)
- return
- }
- if _, e = rs.Seek(ra.start, 0); e != nil {
- pw.CloseWithError(e)
- return
- }
- if _, e = io.CopyN(part, rs, ra.length); e != nil {
- pw.CloseWithError(e)
- return
- }
- }
- mw.Close()
- pw.Close()
- }()
- if w.Header().Get("Content-Encoding") == "" {
- w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
- }
- w.WriteHeader(http.StatusPartialContent)
- _, e = io.CopyN(w, sendContent, sendSize)
- return e
+ })
+ return nil
}
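
A small usage sketch for the resize parameters handled by shouldResizeImages above: request an image through the volume server with width, height and mode query parameters. The host, file id and the "fit" mode value are placeholder assumptions; only net/http and io/ioutil are needed.

func fetchThumbnail() ([]byte, error) {
	resp, err := http.Get("http://127.0.0.1:8080/3,01637037d6.jpg?width=128&height=128&mode=fit")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return ioutil.ReadAll(resp.Body)
}
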
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index b3d9a21fd..8b2027e7b 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
ui "github.com/chrislusf/seaweedfs/weed/server/volume_server_ui"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -20,17 +21,30 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
ds = append(ds, stats.NewDiskStatus(dir))
}
}
+ volumeInfos := vs.store.VolumeInfos()
+ var normalVolumeInfos, remoteVolumeInfos []*storage.VolumeInfo
+ for _, vinfo := range volumeInfos {
+ if vinfo.IsRemote() {
+ remoteVolumeInfos = append(remoteVolumeInfos, vinfo)
+ } else {
+ normalVolumeInfos = append(normalVolumeInfos, vinfo)
+ }
+ }
args := struct {
- Version string
- Masters []string
- Volumes interface{}
- DiskStatuses interface{}
- Stats interface{}
- Counters *stats.ServerStats
+ Version string
+ Masters []string
+ Volumes interface{}
+ EcVolumes interface{}
+ RemoteVolumes interface{}
+ DiskStatuses interface{}
+ Stats interface{}
+ Counters *stats.ServerStats
}{
- util.VERSION,
- vs.MasterNodes,
- vs.store.Status(),
+ util.Version(),
+ vs.SeedMasterNodes,
+ normalVolumeInfos,
+ vs.store.EcVolumes(),
+ remoteVolumeInfos,
ds,
infos,
serverStats,
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index fd93142e1..5ece46ed0 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -10,56 +10,99 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
)
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
+
+ stats.VolumeServerRequestCounter.WithLabelValues("post").Inc()
+ start := time.Now()
+ defer func() {
+ stats.VolumeServerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
+ }()
+
if e := r.ParseForm(); e != nil {
glog.V(0).Infoln("form parse error:", e)
writeJsonError(w, r, http.StatusBadRequest, e)
return
}
- vid, _, _, _, _ := parseURLPath(r.URL.Path)
- volumeId, ve := storage.NewVolumeId(vid)
+
+ vid, fid, _, _, _ := parseURLPath(r.URL.Path)
+ volumeId, ve := needle.NewVolumeId(vid)
if ve != nil {
glog.V(0).Infoln("NewVolumeId error:", ve)
writeJsonError(w, r, http.StatusBadRequest, ve)
return
}
- needle, originalSize, ne := storage.CreateNeedleFromRequest(r, vs.FixJpgOrientation)
+
+ if !vs.maybeCheckJwtAuthorization(r, vid, fid, true) {
+ writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
+ return
+ }
+
+ reqNeedle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.fileSizeLimitBytes)
if ne != nil {
writeJsonError(w, r, http.StatusBadRequest, ne)
return
}
ret := operation.UploadResult{}
- _, errorStatus := topology.ReplicatedWrite(vs.GetMaster(),
- vs.store, volumeId, needle, r)
+ isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, reqNeedle, r)
+
+ // http 204 status code does not allow body
+ if writeError == nil && isUnchanged {
+ setEtag(w, reqNeedle.Etag())
+ w.WriteHeader(http.StatusNoContent)
+ return
+ }
+
httpStatus := http.StatusCreated
- if errorStatus != "" {
+ if writeError != nil {
httpStatus = http.StatusInternalServerError
- ret.Error = errorStatus
+ ret.Error = writeError.Error()
}
- if needle.HasName() {
- ret.Name = string(needle.Name)
+ if reqNeedle.HasName() {
+ ret.Name = string(reqNeedle.Name)
}
ret.Size = uint32(originalSize)
- ret.ETag = needle.Etag()
+ ret.ETag = reqNeedle.Etag()
+ ret.Mime = string(reqNeedle.Mime)
setEtag(w, ret.ETag)
writeJsonQuiet(w, r, httpStatus, ret)
}
func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
- n := new(storage.Needle)
+
+ stats.VolumeServerRequestCounter.WithLabelValues("delete").Inc()
+ start := time.Now()
+ defer func() {
+ stats.VolumeServerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
+ }()
+
+ n := new(needle.Needle)
vid, fid, _, _, _ := parseURLPath(r.URL.Path)
- volumeId, _ := storage.NewVolumeId(vid)
+ volumeId, _ := needle.NewVolumeId(vid)
n.ParsePath(fid)
+ if !vs.maybeCheckJwtAuthorization(r, vid, fid, true) {
+ writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
+ return
+ }
+
// glog.V(2).Infof("volume %s deleting %s", vid, n)
cookie := n.Cookie
+ ecVolume, hasEcVolume := vs.store.FindEcVolume(volumeId)
+
+ if hasEcVolume {
+ count, err := vs.store.DeleteEcShardNeedle(ecVolume, n, cookie)
+ writeDeleteResult(err, count, w, r)
+ return
+ }
+
_, ok := vs.store.ReadVolumeNeedle(volumeId, n)
if ok != nil {
m := make(map[string]uint32)
@@ -77,13 +120,13 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
count := int64(n.Size)
if n.IsChunkedManifest() {
- chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped())
+ chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsCompressed())
if e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Load chunks manifest error: %v", e))
return
}
// make sure all chunks had deleted before delete manifest
- if e := chunkManifest.DeleteChunks(vs.GetMaster()); e != nil {
+ if e := chunkManifest.DeleteChunks(vs.GetMaster(), false, vs.grpcDialOption); e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
return
}
@@ -100,6 +143,11 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
_, err := topology.ReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, r)
+ writeDeleteResult(err, count, w, r)
+
+}
+
+func writeDeleteResult(err error, count int64, w http.ResponseWriter, r *http.Request) {
if err == nil {
m := make(map[string]int64)
m["size"] = count
@@ -107,7 +155,6 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
} else {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Deletion Failed: %v", err))
}
-
}
func setEtag(w http.ResponseWriter, etag string) {
@@ -119,3 +166,11 @@ func setEtag(w http.ResponseWriter, etag string) {
}
}
}
+
+func getEtag(resp *http.Response) (etag string) {
+ etag = resp.Header.Get("ETag")
+ if strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") {
+ return etag[1 : len(etag)-1]
+ }
+ return
+}
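
A hypothetical upload sketch matching the PostHandler behavior above: POST a small multipart file to a volume server URL and decode the JSON result; an unchanged write comes back as 204 No Content with no body. The URL, form field name and the lower-cased JSON tags are assumptions, and the imports are the standard bytes, encoding/json, errors, mime/multipart and net/http packages.

func uploadBlob(url string, data []byte) error {
	body := &bytes.Buffer{}
	writer := multipart.NewWriter(body)
	part, err := writer.CreateFormFile("file", "hello.txt")
	if err != nil {
		return err
	}
	part.Write(data)
	writer.Close()

	resp, err := http.Post(url, writer.FormDataContentType(), body)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNoContent {
		return nil // content unchanged; the ETag is still set in the response header
	}
	var ret struct {
		Name  string `json:"name"`
		Size  uint32 `json:"size"`
		ETag  string `json:"eTag"`
		Error string `json:"error"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
		return err
	}
	if ret.Error != "" {
		return errors.New(ret.Error)
	}
	return nil
}
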
diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go
index b9740510f..8705bc088 100644
--- a/weed/server/volume_server_ui/templates.go
+++ b/weed/server/volume_server_ui/templates.go
@@ -1,11 +1,17 @@
package master_ui
import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
"html/template"
"strconv"
"strings"
)
+func percentFrom(total uint64, partOf uint64) string {
+ return fmt.Sprintf("%.2f", (float64(partOf)/float64(total))*100)
+}
+
func join(data []int64) string {
var ret []string
for _, d := range data {
@@ -15,7 +21,9 @@ func join(data []int64) string {
}
var funcMap = template.FuncMap{
- "join": join,
+ "join": join,
+ "bytesToHumanReadable": util.BytesToHumanReadable,
+ "percentFrom": percentFrom,
}
var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html>
@@ -57,13 +65,25 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<div class="row">
<div class="col-sm-6">
<h2>Disk Stats</h2>
- <table class="table table-condensed table-striped">
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Path</th>
+ <th>Total</th>
+ <th>Free</th>
+ <th>Usage</th>
+ </tr>
+ </thead>
+ <tbody>
{{ range .DiskStatuses }}
<tr>
- <th>{{ .Dir }}</th>
- <td>{{ .Free }} Bytes Free</td>
+ <td>{{ .Dir }}</td>
+ <td>{{ bytesToHumanReadable .All }}</td>
+ <td>{{ bytesToHumanReadable .Free }}</td>
+ <td>{{ percentFrom .All .Used}}%</td>
</tr>
{{ end }}
+ </tbody>
</table>
</div>
@@ -107,10 +127,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<tr>
<th>Id</th>
<th>Collection</th>
- <th>Size</th>
+ <th>Data Size</th>
<th>Files</th>
<th>Trash</th>
<th>TTL</th>
+ <th>ReadOnly</th>
</tr>
</thead>
<tbody>
@@ -118,10 +139,67 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<tr>
<td><code>{{ .Id }}</code></td>
<td>{{ .Collection }}</td>
- <td>{{ .Size }} Bytes</td>
+ <td>{{ bytesToHumanReadable .Size }}</td>
<td>{{ .FileCount }}</td>
- <td>{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes</td>
+ <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td>
<td>{{ .Ttl }}</td>
+ <td>{{ .ReadOnly }}</td>
+ </tr>
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+
+ <div class="row">
+ <h2>Remote Volumes</h2>
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Id</th>
+ <th>Collection</th>
+ <th>Size</th>
+ <th>Files</th>
+ <th>Trash</th>
+ <th>Remote</th>
+ <th>Key</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{ range .RemoteVolumes }}
+ <tr>
+ <td><code>{{ .Id }}</code></td>
+ <td>{{ .Collection }}</td>
+ <td>{{ bytesToHumanReadable .Size }}</td>
+ <td>{{ .FileCount }}</td>
+ <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td>
+ <td>{{ .RemoteStorageName }}</td>
+ <td>{{ .RemoteStorageKey }}</td>
+ </tr>
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+
+ <div class="row">
+ <h2>Erasure Coding Shards</h2>
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Id</th>
+ <th>Collection</th>
+ <th>Shard Size</th>
+ <th>Shards</th>
+ <th>CreatedAt</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{ range .EcVolumes }}
+ <tr>
+ <td><code>{{ .VolumeId }}</code></td>
+ <td>{{ .Collection }}</td>
+ <td>{{ bytesToHumanReadable .ShardSize }}</td>
+ <td>{{ .ShardIdList }}</td>
+ <td>{{ .CreatedAt.Format "02 Jan 06 15:04 -0700" }}</td>
</tr>
{{ end }}
</tbody>
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
new file mode 100644
index 000000000..e8bedd352
--- /dev/null
+++ b/weed/server/webdav_server.go
@@ -0,0 +1,573 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "math"
+ "os"
+ "path"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+ "golang.org/x/net/webdav"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/security"
+)
+
+type WebDavOption struct {
+ Filer string
+ FilerGrpcAddress string
+ DomainName string
+ BucketsPath string
+ GrpcDialOption grpc.DialOption
+ Collection string
+ Uid uint32
+ Gid uint32
+ Cipher bool
+ CacheDir string
+ CacheSizeMB int64
+}
+
+type WebDavServer struct {
+ option *WebDavOption
+ secret security.SigningKey
+ filer *filer2.Filer
+ grpcDialOption grpc.DialOption
+ Handler *webdav.Handler
+}
+
+func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
+
+ fs, _ := NewWebDavFileSystem(option)
+
+ ws = &WebDavServer{
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
+ Handler: &webdav.Handler{
+ FileSystem: fs,
+ LockSystem: webdav.NewMemLS(),
+ },
+ }
+
+ return ws, nil
+}
+
+// adapted from https://github.com/mattn/davfs/blob/master/plugin/mysql/mysql.go
+
+type WebDavFileSystem struct {
+ option *WebDavOption
+ secret security.SigningKey
+ filer *filer2.Filer
+ grpcDialOption grpc.DialOption
+ chunkCache *chunk_cache.ChunkCache
+}
+
+type FileInfo struct {
+ name string
+ size int64
+ mode os.FileMode
+ modifiledTime time.Time
+ isDirectory bool
+}
+
+func (fi *FileInfo) Name() string { return fi.name }
+func (fi *FileInfo) Size() int64 { return fi.size }
+func (fi *FileInfo) Mode() os.FileMode { return fi.mode }
+func (fi *FileInfo) ModTime() time.Time { return fi.modifiledTime }
+func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
+func (fi *FileInfo) Sys() interface{} { return nil }
+
+type WebDavFile struct {
+ fs *WebDavFileSystem
+ name string
+ isDirectory bool
+ off int64
+ entry *filer_pb.Entry
+ entryViewCache []filer2.VisibleInterval
+ reader io.ReaderAt
+}
+
+func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
+
+ chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB)
+ grace.OnInterrupt(func() {
+ chunkCache.Shutdown()
+ })
+ return &WebDavFileSystem{
+ option: option,
+ chunkCache: chunkCache,
+ }, nil
+}
+
+var _ = filer_pb.FilerClient(&WebDavFileSystem{})
+
+func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
+
+}
+func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string {
+ return hostAndPort
+}
+
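+// clearName normalizes a path with path.Clean while preserving any trailing slash,
+// and rejects paths that are not absolute.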
+func clearName(name string) (string, error) {
+ slashed := strings.HasSuffix(name, "/")
+ name = path.Clean(name)
+ if !strings.HasSuffix(name, "/") && slashed {
+ name += "/"
+ }
+ if !strings.HasPrefix(name, "/") {
+ return "", os.ErrInvalid
+ }
+ return name, nil
+}
+
+func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm os.FileMode) error {
+
+ glog.V(2).Infof("WebDavFileSystem.Mkdir %v", fullDirPath)
+
+ if !strings.HasSuffix(fullDirPath, "/") {
+ fullDirPath += "/"
+ }
+
+ var err error
+ if fullDirPath, err = clearName(fullDirPath); err != nil {
+ return err
+ }
+
+ _, err = fs.stat(ctx, fullDirPath)
+ if err == nil {
+ return os.ErrExist
+ }
+
+ return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ dir, name := util.FullPath(fullDirPath).DirAndName()
+ request := &filer_pb.CreateEntryRequest{
+ Directory: dir,
+ Entry: &filer_pb.Entry{
+ Name: name,
+ IsDirectory: true,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(perm | os.ModeDir),
+ Uid: fs.option.Uid,
+ Gid: fs.option.Gid,
+ },
+ },
+ }
+
+ glog.V(1).Infof("mkdir: %v", request)
+ if err := filer_pb.CreateEntry(client, request); err != nil {
+ return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
+ }
+
+ return nil
+ })
+}
+
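+// OpenFile maps WebDAV opens onto filer entries. With O_CREATE it replaces any existing
+// entry (unless O_EXCL is set, which returns os.ErrExist) and creates an empty entry;
+// without O_CREATE it only checks that the entry exists. Data transfer happens later
+// through WebDavFile.Read and WebDavFile.Write.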
+func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, flag int, perm os.FileMode) (webdav.File, error) {
+
+ glog.V(2).Infof("WebDavFileSystem.OpenFile %v %x", fullFilePath, flag)
+
+ var err error
+ if fullFilePath, err = clearName(fullFilePath); err != nil {
+ return nil, err
+ }
+
+ if flag&os.O_CREATE != 0 {
+ // file should not have / suffix.
+ if strings.HasSuffix(fullFilePath, "/") {
+ return nil, os.ErrInvalid
+ }
+ _, err = fs.stat(ctx, fullFilePath)
+ if err == nil {
+ if flag&os.O_EXCL != 0 {
+ return nil, os.ErrExist
+ }
+ fs.removeAll(ctx, fullFilePath)
+ }
+
+ dir, name := util.FullPath(fullFilePath).DirAndName()
+ err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+ Directory: dir,
+ Entry: &filer_pb.Entry{
+ Name: name,
+ IsDirectory: perm&os.ModeDir > 0,
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(perm),
+ Uid: fs.option.Uid,
+ Gid: fs.option.Gid,
+ Collection: fs.option.Collection,
+ Replication: "000",
+ TtlSec: 0,
+ },
+ },
+ }); err != nil {
+ return fmt.Errorf("create %s: %v", fullFilePath, err)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return &WebDavFile{
+ fs: fs,
+ name: fullFilePath,
+ isDirectory: false,
+ }, nil
+ }
+
+ fi, err := fs.stat(ctx, fullFilePath)
+ if err != nil {
+ return nil, os.ErrNotExist
+ }
+ if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
+ fullFilePath += "/"
+ }
+
+ return &WebDavFile{
+ fs: fs,
+ name: fullFilePath,
+ isDirectory: false,
+ }, nil
+
+}
+
+func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) error {
+ var err error
+ if fullFilePath, err = clearName(fullFilePath); err != nil {
+ return err
+ }
+
+ dir, name := util.FullPath(fullFilePath).DirAndName()
+
+ return filer_pb.Remove(fs, dir, name, true, false, false, false)
+
+}
+
+func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
+
+ glog.V(2).Infof("WebDavFileSystem.RemoveAll %v", name)
+
+ return fs.removeAll(ctx, name)
+}
+
+func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) error {
+
+ glog.V(2).Infof("WebDavFileSystem.Rename %v to %v", oldName, newName)
+
+ var err error
+ if oldName, err = clearName(oldName); err != nil {
+ return err
+ }
+ if newName, err = clearName(newName); err != nil {
+ return err
+ }
+
+ of, err := fs.stat(ctx, oldName)
+ if err != nil {
+		return os.ErrNotExist
+ }
+ if of.IsDir() {
+ if strings.HasSuffix(oldName, "/") {
+ oldName = strings.TrimRight(oldName, "/")
+ }
+ if strings.HasSuffix(newName, "/") {
+ newName = strings.TrimRight(newName, "/")
+ }
+ }
+
+ _, err = fs.stat(ctx, newName)
+ if err == nil {
+ return os.ErrExist
+ }
+
+ oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
+ newDir, newBaseName := util.FullPath(newName).DirAndName()
+
+ return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AtomicRenameEntryRequest{
+ OldDirectory: oldDir,
+ OldName: oldBaseName,
+ NewDirectory: newDir,
+ NewName: newBaseName,
+ }
+
+ _, err := client.AtomicRenameEntry(ctx, request)
+ if err != nil {
+ return fmt.Errorf("renaming %s/%s => %s/%s: %v", oldDir, oldBaseName, newDir, newBaseName, err)
+ }
+
+ return nil
+
+ })
+}
+
+func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.FileInfo, error) {
+ var err error
+ if fullFilePath, err = clearName(fullFilePath); err != nil {
+ return nil, err
+ }
+
+ fullpath := util.FullPath(fullFilePath)
+
+ var fi FileInfo
+ entry, err := filer_pb.GetEntry(fs, fullpath)
+ if entry == nil {
+ return nil, os.ErrNotExist
+ }
+ if err != nil {
+ return nil, err
+ }
+ fi.size = int64(filer2.TotalSize(entry.GetChunks()))
+ fi.name = string(fullpath)
+ fi.mode = os.FileMode(entry.Attributes.FileMode)
+	fi.modifiedTime = time.Unix(entry.Attributes.Mtime, 0)
+ fi.isDirectory = entry.IsDirectory
+
+ if fi.name == "/" {
+		fi.modifiedTime = time.Now()
+ fi.isDirectory = true
+ }
+ return &fi, nil
+}
+
+func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, error) {
+
+ glog.V(2).Infof("WebDavFileSystem.Stat %v", name)
+
+ return fs.stat(ctx, name)
+}
+
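+// Write stores each buffer as one new chunk: it asks the filer to assign a volume,
+// uploads the data to the returned volume server, appends the chunk at the current
+// offset, and persists the updated entry with UpdateEntry.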
+func (f *WebDavFile) Write(buf []byte) (int, error) {
+
+	glog.V(2).Infof("WebDavFile.Write %v", f.name)
+
+ dir, _ := util.FullPath(f.name).DirAndName()
+
+ var err error
+ ctx := context.Background()
+ if f.entry == nil {
+ f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
+ }
+
+ if f.entry == nil {
+ return 0, err
+ }
+ if err != nil {
+ return 0, err
+ }
+
+ var fileId, host string
+ var auth security.EncodedJwt
+ var collection, replication string
+
+ if err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ request := &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: "",
+ Collection: f.fs.option.Collection,
+ ParentPath: dir,
+ }
+
+ resp, err := client.AssignVolume(ctx, request)
+ if err != nil {
+ glog.V(0).Infof("assign volume failure %v: %v", request, err)
+ return err
+ }
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
+
+ fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ collection, replication = resp.Collection, resp.Replication
+
+ return nil
+ }); err != nil {
+ return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+ }
+
+ fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
+ uploadResult, err := operation.UploadData(fileUrl, f.name, f.fs.option.Cipher, buf, false, "", nil, auth)
+ if err != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
+ return 0, fmt.Errorf("upload data: %v", err)
+ }
+ if uploadResult.Error != "" {
+		glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, uploadResult.Error)
+ return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+
+ f.entry.Chunks = append(f.entry.Chunks, uploadResult.ToPbFileChunk(fileId, f.off))
+
+ err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ f.entry.Attributes.Mtime = time.Now().Unix()
+ f.entry.Attributes.Collection = collection
+ f.entry.Attributes.Replication = replication
+
+ request := &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: f.entry,
+ }
+
+ if _, err := client.UpdateEntry(ctx, request); err != nil {
+ return fmt.Errorf("update %s: %v", f.name, err)
+ }
+
+ return nil
+ })
+
+ if err == nil {
+		glog.V(3).Infof("WebDavFile.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
+ f.off += int64(len(buf))
+ }
+
+ return len(buf), err
+}
+
+func (f *WebDavFile) Close() error {
+
+	glog.V(2).Infof("WebDavFile.Close %v", f.name)
+
+ if f.entry != nil {
+ f.entry = nil
+ f.entryViewCache = nil
+ }
+
+ return nil
+}
+
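+// Read lazily resolves the entry's chunks into non-overlapping visible intervals and
+// reads through a chunk reader backed by the shared chunk cache; both are built once
+// and reused on subsequent reads.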
+func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
+
+	glog.V(2).Infof("WebDavFile.Read %v", f.name)
+
+ if f.entry == nil {
+ f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
+ }
+ if f.entry == nil {
+ return 0, err
+ }
+ if err != nil {
+ return 0, err
+ }
+ if len(f.entry.Chunks) == 0 {
+ return 0, io.EOF
+ }
+ if f.entryViewCache == nil {
+ f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks)
+ f.reader = nil
+ }
+ if f.reader == nil {
+ chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt32)
+ f.reader = filer2.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache)
+ }
+
+ readSize, err = f.reader.ReadAt(p, f.off)
+
+	glog.V(3).Infof("WebDavFile.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
+ f.off += int64(readSize)
+
+ if err == io.EOF {
+ err = nil
+ }
+
+ if err != nil {
+ glog.Errorf("file read %s: %v", f.name, err)
+ }
+
+ return
+
+}
+
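+// Readdir lists the directory via the filer and follows the os.File.Readdir contract:
+// with count > 0 it returns at most count entries per call, advancing f.off and
+// reporting io.EOF once the listing is exhausted; with count <= 0 it returns all
+// entries at once.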
+func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
+
+	glog.V(2).Infof("WebDavFile.Readdir %v count %d", f.name, count)
+
+ dir, _ := util.FullPath(f.name).DirAndName()
+
+ err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
+ fi := FileInfo{
+ size: int64(filer2.TotalSize(entry.GetChunks())),
+ name: entry.Name,
+ mode: os.FileMode(entry.Attributes.FileMode),
+			modifiedTime:  time.Unix(entry.Attributes.Mtime, 0),
+ isDirectory: entry.IsDirectory,
+ }
+
+ if !strings.HasSuffix(fi.name, "/") && fi.IsDir() {
+ fi.name += "/"
+ }
+ glog.V(4).Infof("entry: %v", fi.name)
+ ret = append(ret, &fi)
+ return nil
+ })
+
+ old := f.off
+ if old >= int64(len(ret)) {
+ if count > 0 {
+ return nil, io.EOF
+ }
+ return nil, nil
+ }
+ if count > 0 {
+ f.off += int64(count)
+ if f.off > int64(len(ret)) {
+ f.off = int64(len(ret))
+ }
+ } else {
+ f.off = int64(len(ret))
+ old = 0
+ }
+
+ return ret[old:f.off], nil
+}
+
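+// Seek resets the offset for whence 0 (io.SeekStart), stats the file to start from its
+// size for whence 2 (io.SeekEnd), and leaves the current offset untouched otherwise
+// (io.SeekCurrent), then adds the relative offset.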
+func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
+
+ glog.V(2).Infof("WebDavFile.Seek %v %v %v", f.name, offset, whence)
+
+ ctx := context.Background()
+
+ var err error
+ switch whence {
+ case 0:
+ f.off = 0
+ case 2:
+ if fi, err := f.fs.stat(ctx, f.name); err != nil {
+ return 0, err
+ } else {
+ f.off = fi.Size()
+ }
+ }
+ f.off += offset
+ return f.off, err
+}
+
+func (f *WebDavFile) Stat() (os.FileInfo, error) {
+
+ glog.V(2).Infof("WebDavFile.Stat %v", f.name)
+
+ ctx := context.Background()
+
+ return f.fs.stat(ctx, f.name)
+}