Diffstat (limited to 'weed/server')
-rw-r--r--  weed/server/common.go                                 151
-rw-r--r--  weed/server/filer_grpc_server.go                      457
-rw-r--r--  weed/server/filer_grpc_server_kv.go                    42
-rw-r--r--  weed/server/filer_grpc_server_rename.go                77
-rw-r--r--  weed/server/filer_grpc_server_sub_meta.go             202
-rw-r--r--  weed/server/filer_server.go                           171
-rw-r--r--  weed/server/filer_server_handlers.go                   86
-rw-r--r--  weed/server/filer_server_handlers_proxy.go             67
-rw-r--r--  weed/server/filer_server_handlers_read.go             254
-rw-r--r--  weed/server/filer_server_handlers_read_dir.go          51
-rw-r--r--  weed/server/filer_server_handlers_tagging.go          102
-rw-r--r--  weed/server/filer_server_handlers_write.go            298
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go  374
-rw-r--r--  weed/server/filer_server_handlers_write_cipher.go      91
-rw-r--r--  weed/server/filer_server_handlers_write_upload.go     105
-rw-r--r--  weed/server/filer_server_rocksdb.go                     7
-rw-r--r--  weed/server/filer_ui/breadcrumb.go                      7
-rw-r--r--  weed/server/filer_ui/templates.go                      36
-rw-r--r--  weed/server/gateway_server.go                         106
-rw-r--r--  weed/server/master_grpc_server.go                     165
-rw-r--r--  weed/server/master_grpc_server_admin.go               143
-rw-r--r--  weed/server/master_grpc_server_collection.go            1
-rw-r--r--  weed/server/master_grpc_server_volume.go               22
-rw-r--r--  weed/server/master_server.go                          133
-rw-r--r--  weed/server/master_server_handlers.go                  15
-rw-r--r--  weed/server/master_server_handlers_admin.go            21
-rw-r--r--  weed/server/master_server_handlers_ui.go               17
-rw-r--r--  weed/server/master_ui/templates.go                     18
-rw-r--r--  weed/server/raft_server.go                            138
-rw-r--r--  weed/server/raft_server_handlers.go                    14
-rw-r--r--  weed/server/volume_grpc_admin.go                      136
-rw-r--r--  weed/server/volume_grpc_batch_delete.go                49
-rw-r--r--  weed/server/volume_grpc_client_to_master.go           117
-rw-r--r--  weed/server/volume_grpc_copy.go                        82
-rw-r--r--  weed/server/volume_grpc_copy_incremental.go             2
-rw-r--r--  weed/server/volume_grpc_erasure_coding.go             106
-rw-r--r--  weed/server/volume_grpc_query.go                        2
-rw-r--r--  weed/server/volume_grpc_read_write.go                  38
-rw-r--r--  weed/server/volume_grpc_tail.go                         4
-rw-r--r--  weed/server/volume_grpc_tier_download.go                4
-rw-r--r--  weed/server/volume_grpc_tier_upload.go                  2
-rw-r--r--  weed/server/volume_grpc_vacuum.go                       5
-rw-r--r--  weed/server/volume_server.go                           56
-rw-r--r--  weed/server/volume_server_handlers.go                  50
-rw-r--r--  weed/server/volume_server_handlers_admin.go            19
-rw-r--r--  weed/server/volume_server_handlers_read.go            197
-rw-r--r--  weed/server/volume_server_handlers_ui.go                7
-rw-r--r--  weed/server/volume_server_handlers_write.go            36
-rw-r--r--  weed/server/volume_server_tcp_handlers_write.go       137
-rw-r--r--  weed/server/volume_server_ui/templates.go              44
-rw-r--r--  weed/server/webdav_server.go                          285
51 files changed, 3333 insertions, 1416 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index 888ddec49..5c5f1b8eb 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -1,10 +1,11 @@
package weed_server
import (
- "bytes"
"encoding/json"
"errors"
"fmt"
+ "io"
+ "mime/multipart"
"net/http"
"path/filepath"
"strconv"
@@ -37,14 +38,22 @@ func init() {
func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) {
var bytes []byte
- if r.FormValue("pretty") != "" {
- bytes, err = json.MarshalIndent(obj, "", " ")
- } else {
- bytes, err = json.Marshal(obj)
+ if obj != nil {
+ if r.FormValue("pretty") != "" {
+ bytes, err = json.MarshalIndent(obj, "", " ")
+ } else {
+ bytes, err = json.Marshal(obj)
+ }
}
if err != nil {
return
}
+
+ if httpStatus >= 400 {
+ glog.V(0).Infof("response method:%s URL:%s with httpStatus:%d and JSON:%s",
+ r.Method, r.URL.String(), httpStatus, string(bytes))
+ }
+
callback := r.FormValue("callback")
if callback == "" {
w.Header().Set("Content-Type", "application/json")
@@ -77,7 +86,8 @@ func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj inter
// wrapper for writeJson - just logs errors
func writeJsonQuiet(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) {
if err := writeJson(w, r, httpStatus, obj); err != nil {
- glog.V(0).Infof("error writing JSON %+v status %d: %v", obj, httpStatus, err)
+ glog.V(0).Infof("error writing JSON status %d: %v", httpStatus, err)
+ glog.V(1).Infof("JSON content: %+v", obj)
}
}
func writeJsonError(w http.ResponseWriter, r *http.Request, httpStatus int, err error) {
@@ -90,7 +100,7 @@ func debug(params ...interface{}) {
glog.V(4).Infoln(params...)
}
-func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string, grpcDialOption grpc.DialOption) {
+func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption) {
m := make(map[string]interface{})
if r.Method != "POST" {
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
@@ -98,13 +108,13 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("parsing upload file...")
- fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.ParseUpload(r)
+ pu, pe := needle.ParseUpload(r, 256*1024*1024)
if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe)
return
}
- debug("assigning file id for", fname)
+ debug("assigning file id for", pu.FileName)
r.ParseForm()
count := uint64(1)
if r.FormValue("count") != "" {
@@ -117,32 +127,34 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
ar := &operation.VolumeAssignRequest{
Count: count,
DataCenter: r.FormValue("dataCenter"),
+ Rack: r.FormValue("rack"),
Replication: r.FormValue("replication"),
Collection: r.FormValue("collection"),
Ttl: r.FormValue("ttl"),
+ DiskType: r.FormValue("disk"),
}
- assignResult, ae := operation.Assign(masterUrl, grpcDialOption, ar)
+ assignResult, ae := operation.Assign(masterFn, grpcDialOption, ar)
if ae != nil {
writeJsonError(w, r, http.StatusInternalServerError, ae)
return
}
url := "http://" + assignResult.Url + "/" + assignResult.Fid
- if lastModified != 0 {
- url = url + "?ts=" + strconv.FormatUint(lastModified, 10)
+ if pu.ModifiedTime != 0 {
+ url = url + "?ts=" + strconv.FormatUint(pu.ModifiedTime, 10)
}
debug("upload file to store", url)
- uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, assignResult.Auth)
+ uploadResult, err := operation.UploadData(url, pu.FileName, false, pu.Data, pu.IsGzipped, pu.MimeType, pu.PairMap, assignResult.Auth)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
}
- m["fileName"] = fname
+ m["fileName"] = pu.FileName
m["fid"] = assignResult.Fid
m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid
- m["size"] = originalDataSize
+ m["size"] = pu.OriginalDataSize
m["eTag"] = uploadResult.ETag
writeJsonQuiet(w, r, http.StatusCreated, m)
return
@@ -183,19 +195,19 @@ func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly b
func statsHealthHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["Version"] = util.VERSION
+ m["Version"] = util.Version()
writeJsonQuiet(w, r, http.StatusOK, m)
}
func statsCounterHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["Version"] = util.VERSION
+ m["Version"] = util.Version()
m["Counters"] = serverStats
writeJsonQuiet(w, r, http.StatusOK, m)
}
func statsMemoryHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["Version"] = util.VERSION
+ m["Version"] = util.Version()
m["Memory"] = stats.MemStat()
writeJsonQuiet(w, r, http.StatusOK, m)
}
@@ -209,3 +221,106 @@ func handleStaticResources2(r *mux.Router) {
r.Handle("/favicon.ico", http.FileServer(statikFS))
r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(statikFS)))
}
+
+func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, filename string) {
+ if filename != "" {
+ contentDisposition := "inline"
+ if r.FormValue("dl") != "" {
+ if dl, _ := strconv.ParseBool(r.FormValue("dl")); dl {
+ contentDisposition = "attachment"
+ }
+ }
+ w.Header().Set("Content-Disposition", contentDisposition+`; filename="`+fileNameEscaper.Replace(filename)+`"`)
+ }
+}
+
+func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) {
+ rangeReq := r.Header.Get("Range")
+
+ if rangeReq == "" {
+ w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
+ if err := writeFn(w, 0, totalSize); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ return
+ }
+
+	// The rest deals with partial content requests,
+	// mostly copied from src/pkg/net/http/fs.go.
+ ranges, err := parseRange(rangeReq, totalSize)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
+ return
+ }
+ if sumRangesSize(ranges) > totalSize {
+ // The total number of bytes in all the ranges
+ // is larger than the size of the file by
+ // itself, so this is probably an attack, or a
+ // dumb client. Ignore the range request.
+ return
+ }
+ if len(ranges) == 0 {
+ return
+ }
+ if len(ranges) == 1 {
+ // RFC 2616, Section 14.16:
+ // "When an HTTP message includes the content of a single
+ // range (for example, a response to a request for a
+ // single range, or to a request for a set of ranges
+ // that overlap without any holes), this content is
+ // transmitted with a Content-Range header, and a
+ // Content-Length header showing the number of bytes
+ // actually transferred.
+ // ...
+ // A response to a request for a single range MUST NOT
+ // be sent using the multipart/byteranges media type."
+ ra := ranges[0]
+ w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
+ w.Header().Set("Content-Range", ra.contentRange(totalSize))
+
+ err = writeFn(w, ra.start, ra.length)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ return
+ }
+
+ // process multiple ranges
+ for _, ra := range ranges {
+ if ra.start > totalSize {
+ http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
+ return
+ }
+ }
+ sendSize := rangesMIMESize(ranges, mimeType, totalSize)
+ pr, pw := io.Pipe()
+ mw := multipart.NewWriter(pw)
+ w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
+ sendContent := pr
+ defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
+ go func() {
+ for _, ra := range ranges {
+ part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
+ if e != nil {
+ pw.CloseWithError(e)
+ return
+ }
+ if e = writeFn(part, ra.start, ra.length); e != nil {
+ pw.CloseWithError(e)
+ return
+ }
+ }
+ mw.Close()
+ pw.Close()
+ }()
+ if w.Header().Get("Content-Encoding") == "" {
+ w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
+ }
+ w.WriteHeader(http.StatusPartialContent)
+ if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
+ http.Error(w, "Internal Error", http.StatusInternalServerError)
+ return
+ }
+}
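
The multi-range branch of the new processRangeRequest streams a multipart/byteranges body through an io.Pipe: a goroutine writes each part while the response side copies from the other end, so a failed part write surfaces as CloseWithError on the reader. A standalone sketch of that pipe + multipart.Writer pattern, using toy data and stdout instead of an http.ResponseWriter:

```go
package main

import (
	"fmt"
	"io"
	"mime/multipart"
	"net/textproto"
	"os"
)

func main() {
	data := []byte("hello, range requests")
	ranges := []struct{ start, length int64 }{{0, 5}, {7, 5}}

	pr, pw := io.Pipe()
	mw := multipart.NewWriter(pw)

	go func() {
		for _, ra := range ranges {
			h := textproto.MIMEHeader{}
			h.Set("Content-Type", "text/plain")
			h.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d",
				ra.start, ra.start+ra.length-1, len(data)))
			part, err := mw.CreatePart(h)
			if err != nil {
				pw.CloseWithError(err) // reader sees the error mid-copy
				return
			}
			if _, err := part.Write(data[ra.start : ra.start+ra.length]); err != nil {
				pw.CloseWithError(err)
				return
			}
		}
		mw.Close()
		pw.Close()
	}()

	// In the server, this copy targets the http.ResponseWriter with
	// Content-Type: multipart/byteranges; boundary=<mw.Boundary()>.
	io.Copy(os.Stdout, pr)
}
```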
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index a84feec2d..3821de6a9 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -6,89 +6,98 @@ import (
"os"
"path/filepath"
"strconv"
- "strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
- entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))))
+ glog.V(4).Infof("LookupDirectoryEntry %s", filepath.Join(req.Directory, req.Name))
+
+ entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
+ if err == filer_pb.ErrNotFound {
+ return &filer_pb.LookupDirectoryEntryResponse{}, err
+ }
if err != nil {
+ glog.V(3).Infof("LookupDirectoryEntry %s: %+v, ", filepath.Join(req.Directory, req.Name), err)
return nil, err
}
return &filer_pb.LookupDirectoryEntryResponse{
Entry: &filer_pb.Entry{
- Name: req.Name,
- IsDirectory: entry.IsDirectory(),
- Attributes: filer2.EntryAttributeToPb(entry),
- Chunks: entry.Chunks,
- Extended: entry.Extended,
+ Name: req.Name,
+ IsDirectory: entry.IsDirectory(),
+ Attributes: filer.EntryAttributeToPb(entry),
+ Chunks: entry.Chunks,
+ Extended: entry.Extended,
+ HardLinkId: entry.HardLinkId,
+ HardLinkCounter: entry.HardLinkCounter,
+ Content: entry.Content,
},
}, nil
}
-func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error {
+func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) (err error) {
+
+ glog.V(4).Infof("ListEntries %v", req)
limit := int(req.Limit)
if limit == 0 {
limit = fs.option.DirListingLimit
}
- paginationLimit := filer2.PaginationSize
+ paginationLimit := filer.PaginationSize
if limit < paginationLimit {
paginationLimit = limit
}
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
+ var listErr error
for limit > 0 {
- entries, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
- if err != nil {
- return err
- }
- if len(entries) == 0 {
- return nil
- }
-
- includeLastFile = false
-
- for _, entry := range entries {
-
- lastFileName = entry.Name()
-
- if req.Prefix != "" {
- if !strings.HasPrefix(entry.Name(), req.Prefix) {
- continue
- }
- }
-
- if err := stream.Send(&filer_pb.ListEntriesResponse{
+ var hasEntries bool
+ lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool {
+ hasEntries = true
+ if err = stream.Send(&filer_pb.ListEntriesResponse{
Entry: &filer_pb.Entry{
- Name: entry.Name(),
- IsDirectory: entry.IsDirectory(),
- Chunks: entry.Chunks,
- Attributes: filer2.EntryAttributeToPb(entry),
- Extended: entry.Extended,
+ Name: entry.Name(),
+ IsDirectory: entry.IsDirectory(),
+ Chunks: entry.Chunks,
+ Attributes: filer.EntryAttributeToPb(entry),
+ Extended: entry.Extended,
+ HardLinkId: entry.HardLinkId,
+ HardLinkCounter: entry.HardLinkCounter,
+ Content: entry.Content,
},
}); err != nil {
- return err
+ return false
}
+
limit--
if limit == 0 {
- return nil
+ return false
}
- }
+ return true
+ })
- if len(entries) < paginationLimit {
- break
+ if listErr != nil {
+ return listErr
+ }
+ if err != nil {
+ return err
}
+ if !hasEntries {
+ return nil
+ }
+
+ includeLastFile = false
}
@@ -126,46 +135,75 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
return resp, nil
}
+func (fs *FilerServer) lookupFileId(fileId string) (targetUrls []string, err error) {
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ return nil, err
+ }
+ locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId))
+ if !found || len(locations) == 0 {
+ return nil, fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId)
+ }
+ for _, loc := range locations {
+ targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId))
+ }
+ return
+}
+
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
- fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name)))
- chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
+ glog.V(4).Infof("CreateEntry %v/%v", req.Directory, req.Entry.Name)
- if req.Entry.Attributes == nil {
- return nil, fmt.Errorf("can not create entry with empty attributes")
- }
+ resp = &filer_pb.CreateEntryResponse{}
- err = fs.filer.CreateEntry(ctx, &filer2.Entry{
- FullPath: fullpath,
- Attr: filer2.PbToEntryAttribute(req.Entry.Attributes),
- Chunks: chunks,
- })
+ chunks, garbage, err2 := fs.cleanupChunks(util.Join(req.Directory, req.Entry.Name), nil, req.Entry)
+ if err2 != nil {
+ return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
+ }
- if err == nil {
- fs.filer.DeleteChunks(garbages)
+ createErr := fs.filer.CreateEntry(ctx, &filer.Entry{
+ FullPath: util.JoinPath(req.Directory, req.Entry.Name),
+ Attr: filer.PbToEntryAttribute(req.Entry.Attributes),
+ Chunks: chunks,
+ Extended: req.Entry.Extended,
+ HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
+ HardLinkCounter: req.Entry.HardLinkCounter,
+ Content: req.Entry.Content,
+ }, req.OExcl, req.IsFromOtherCluster, req.Signatures)
+
+ if createErr == nil {
+ fs.filer.DeleteChunks(garbage)
+ } else {
+ glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
+ resp.Error = createErr.Error()
}
- return &filer_pb.CreateEntryResponse{}, err
+ return
}
func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) {
- fullpath := filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))
- entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(fullpath))
+ glog.V(4).Infof("UpdateEntry %v", req)
+
+ fullpath := util.Join(req.Directory, req.Entry.Name)
+ entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
if err != nil {
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
}
- // remove old chunks if not included in the new ones
- unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks)
-
- chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
+ chunks, garbage, err2 := fs.cleanupChunks(fullpath, entry, req.Entry)
+ if err2 != nil {
+ return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
+ }
- newEntry := &filer2.Entry{
- FullPath: filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))),
- Attr: entry.Attr,
- Extended: req.Entry.Extended,
- Chunks: chunks,
+ newEntry := &filer.Entry{
+ FullPath: util.JoinPath(req.Directory, req.Entry.Name),
+ Attr: entry.Attr,
+ Extended: req.Entry.Extended,
+ Chunks: chunks,
+ HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
+ HardLinkCounter: req.Entry.HardLinkCounter,
+ Content: req.Entry.Content,
}
glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v",
@@ -188,76 +226,166 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
- if filer2.EqualEntry(entry, newEntry) {
+ if filer.EqualEntry(entry, newEntry) {
return &filer_pb.UpdateEntryResponse{}, err
}
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
- fs.filer.DeleteChunks(unusedChunks)
- fs.filer.DeleteChunks(garbages)
- }
+ fs.filer.DeleteChunks(garbage)
+
+ fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster, req.Signatures)
- fs.filer.NotifyUpdateEvent(entry, newEntry, true)
+ } else {
+ glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
+ }
return &filer_pb.UpdateEntryResponse{}, err
}
-func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
- err = fs.filer.DeleteEntryMetaAndData(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData)
- return &filer_pb.DeleteEntryResponse{}, err
-}
+func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
-func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
+ // remove old chunks if not included in the new ones
+ if existingEntry != nil {
+ garbage, err = filer.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks)
+ if err != nil {
+ return newEntry.Chunks, nil, fmt.Errorf("MinusChunks: %v", err)
+ }
+ }
- ttlStr := ""
- if req.TtlSec > 0 {
- ttlStr = strconv.Itoa(int(req.TtlSec))
+ // files with manifest chunks are usually large and append only, skip calculating covered chunks
+ manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.Chunks)
+
+ chunks, coveredChunks := filer.CompactFileChunks(fs.lookupFileId, nonManifestChunks)
+ garbage = append(garbage, coveredChunks...)
+
+ if newEntry.Attributes != nil {
+ so := fs.detectStorageOption(fullpath,
+ newEntry.Attributes.Collection,
+ newEntry.Attributes.Replication,
+ newEntry.Attributes.TtlSec,
+ newEntry.Attributes.DiskType,
+ "",
+ "",
+ )
+ chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks)
+ if err != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("MaybeManifestize: %v", err)
+ }
}
- var altRequest *operation.VolumeAssignRequest
+ chunks = append(chunks, manifestChunks...)
+
+ return
+}
- dataCenter := req.DataCenter
- if dataCenter == "" {
- dataCenter = fs.option.DataCenter
+func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) {
+
+ glog.V(4).Infof("AppendToEntry %v", req)
+
+ fullpath := util.NewFullPath(req.Directory, req.EntryName)
+ var offset int64 = 0
+ entry, err := fs.filer.FindEntry(ctx, fullpath)
+ if err == filer_pb.ErrNotFound {
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ Attr: filer.Attr{
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Mode: os.FileMode(0644),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }
+ } else {
+ offset = int64(filer.TotalSize(entry.Chunks))
}
- assignRequest := &operation.VolumeAssignRequest{
- Count: uint64(req.Count),
- Replication: req.Replication,
- Collection: req.Collection,
- Ttl: ttlStr,
- DataCenter: dataCenter,
+ for _, chunk := range req.Chunks {
+ chunk.Offset = offset
+ offset += int64(chunk.Size)
}
- if dataCenter != "" {
- altRequest = &operation.VolumeAssignRequest{
- Count: uint64(req.Count),
- Replication: req.Replication,
- Collection: req.Collection,
- Ttl: ttlStr,
- DataCenter: "",
- }
+
+ entry.Chunks = append(entry.Chunks, req.Chunks...)
+ so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "")
+ entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.Chunks)
+ if err != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("MaybeManifestize: %v", err)
}
- assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest)
+
+ err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil)
+
+ return &filer_pb.AppendToEntryResponse{}, err
+}
+
+func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) {
+
+ glog.V(4).Infof("DeleteEntry %v", req)
+
+ err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures)
+ resp = &filer_pb.DeleteEntryResponse{}
+ if err != nil && err != filer_pb.ErrNotFound {
+ resp.Error = err.Error()
+ }
+ return resp, nil
+}
+
+func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
+
+ so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack)
+
+ assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
+
+ assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
if err != nil {
- return nil, fmt.Errorf("assign volume: %v", err)
+ glog.V(3).Infof("AssignVolume: %v", err)
+ return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
}
if assignResult.Error != "" {
- return nil, fmt.Errorf("assign volume result: %v", assignResult.Error)
+ glog.V(3).Infof("AssignVolume error: %v", assignResult.Error)
+ return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume result: %v", assignResult.Error)}, nil
}
return &filer_pb.AssignVolumeResponse{
- FileId: assignResult.Fid,
- Count: int32(assignResult.Count),
- Url: assignResult.Url,
- PublicUrl: assignResult.PublicUrl,
- Auth: string(assignResult.Auth),
- }, err
+ FileId: assignResult.Fid,
+ Count: int32(assignResult.Count),
+ Url: assignResult.Url,
+ PublicUrl: assignResult.PublicUrl,
+ Auth: string(assignResult.Auth),
+ Collection: so.Collection,
+ Replication: so.Replication,
+ }, nil
+}
+
+func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.CollectionListRequest) (resp *filer_pb.CollectionListResponse, err error) {
+
+ glog.V(4).Infof("CollectionList %v", req)
+ resp = &filer_pb.CollectionListResponse{}
+
+ err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ masterResp, err := client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
+ IncludeNormalVolumes: req.IncludeNormalVolumes,
+ IncludeEcVolumes: req.IncludeEcVolumes,
+ })
+ if err != nil {
+ return err
+ }
+ for _, c := range masterResp.Collections {
+ resp.Collections = append(resp.Collections, &filer_pb.Collection{Name: c.Name})
+ }
+ return nil
+ })
+
+ return
}
func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) {
- err = fs.filer.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error {
- _, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{
+ glog.V(4).Infof("DeleteCollection %v", req)
+
+ err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: req.GetCollection(),
})
return err
@@ -268,13 +396,23 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet
func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
- input := &master_pb.StatisticsRequest{
- Replication: req.Replication,
- Collection: req.Collection,
- Ttl: req.Ttl,
- }
+ var output *master_pb.StatisticsResponse
+
+ err = fs.filer.MasterClient.WithClient(func(masterClient master_pb.SeaweedClient) error {
+ grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{
+ Replication: req.Replication,
+ Collection: req.Collection,
+ Ttl: req.Ttl,
+ DiskType: req.DiskType,
+ })
+ if grpcErr != nil {
+ return grpcErr
+ }
+
+ output = grpcResponse
+ return nil
+ })
- output, err := operation.Statistics(fs.filer.GetMaster(), fs.grpcDialOption, input)
if err != nil {
return nil, err
}
@@ -288,10 +426,91 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
- return &filer_pb.GetFilerConfigurationResponse{
- Masters: fs.option.Masters,
- Collection: fs.option.Collection,
- Replication: fs.option.DefaultReplication,
- MaxMb: uint32(fs.option.MaxMB),
- }, nil
+ t := &filer_pb.GetFilerConfigurationResponse{
+ Masters: fs.option.Masters,
+ Collection: fs.option.Collection,
+ Replication: fs.option.DefaultReplication,
+ MaxMb: uint32(fs.option.MaxMB),
+ DirBuckets: fs.filer.DirBucketsPath,
+ Cipher: fs.filer.Cipher,
+ Signature: fs.filer.Signature,
+ MetricsAddress: fs.metricsAddress,
+ MetricsIntervalSec: int32(fs.metricsIntervalSec),
+ }
+
+ glog.V(4).Infof("GetFilerConfiguration: %v", t)
+
+ return t, nil
+}
+
+func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error {
+
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+
+ clientName := fmt.Sprintf("%s:%d", req.Name, req.GrpcPort)
+ m := make(map[string]bool)
+ for _, tp := range req.Resources {
+ m[tp] = true
+ }
+ fs.brokersLock.Lock()
+ fs.brokers[clientName] = m
+ glog.V(0).Infof("+ broker %v", clientName)
+ fs.brokersLock.Unlock()
+
+ defer func() {
+ fs.brokersLock.Lock()
+ delete(fs.brokers, clientName)
+ glog.V(0).Infof("- broker %v: %v", clientName, err)
+ fs.brokersLock.Unlock()
+ }()
+
+ for {
+ if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil {
+ glog.V(0).Infof("send broker %v: %+v", clientName, err)
+ return err
+ }
+ // println("replied")
+
+ if _, err := stream.Recv(); err != nil {
+ glog.V(0).Infof("recv broker %v: %v", clientName, err)
+ return err
+ }
+ // println("received")
+ }
+
+}
+
+func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) {
+
+ resp = &filer_pb.LocateBrokerResponse{}
+
+ fs.brokersLock.Lock()
+ defer fs.brokersLock.Unlock()
+
+ var localBrokers []*filer_pb.LocateBrokerResponse_Resource
+
+ for b, m := range fs.brokers {
+ if _, found := m[req.Resource]; found {
+ resp.Found = true
+ resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{
+ {
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ },
+ }
+ return
+ }
+ localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ })
+ }
+
+ resp.Resources = localBrokers
+
+ return resp, nil
+
}
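
The new AppendToEntry places each incoming chunk at the current end of the file: the starting offset is the total size of the existing chunks, and the offset advances by each chunk's size. A minimal sketch of that bookkeeping with simplified stand-in types (not the real filer_pb.FileChunk):

```go
package main

import "fmt"

type FileChunk struct {
	FileId string
	Offset int64
	Size   uint64
}

// totalSize mirrors filer.TotalSize: the largest end offset over all chunks.
func totalSize(chunks []*FileChunk) (size uint64) {
	for _, c := range chunks {
		if uint64(c.Offset)+c.Size > size {
			size = uint64(c.Offset) + c.Size
		}
	}
	return
}

// appendChunks assigns sequential offsets starting at the current file end.
func appendChunks(existing, incoming []*FileChunk) []*FileChunk {
	offset := int64(totalSize(existing))
	for _, c := range incoming {
		c.Offset = offset
		offset += int64(c.Size)
	}
	return append(existing, incoming...)
}

func main() {
	existing := []*FileChunk{{FileId: "3,01", Offset: 0, Size: 8}}
	incoming := []*FileChunk{{FileId: "3,02", Size: 4}, {FileId: "3,03", Size: 6}}
	for _, c := range appendChunks(existing, incoming) {
		fmt.Printf("%s at [%d,%d)\n", c.FileId, c.Offset, c.Offset+int64(c.Size))
	}
}
```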
diff --git a/weed/server/filer_grpc_server_kv.go b/weed/server/filer_grpc_server_kv.go
new file mode 100644
index 000000000..3cb47115e
--- /dev/null
+++ b/weed/server/filer_grpc_server_kv.go
@@ -0,0 +1,42 @@
+package weed_server
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func (fs *FilerServer) KvGet(ctx context.Context, req *filer_pb.KvGetRequest) (*filer_pb.KvGetResponse, error) {
+
+ value, err := fs.filer.Store.KvGet(ctx, req.Key)
+ if err == filer.ErrKvNotFound {
+ return &filer_pb.KvGetResponse{}, nil
+ }
+
+ if err != nil {
+ return &filer_pb.KvGetResponse{Error: err.Error()}, nil
+ }
+
+ return &filer_pb.KvGetResponse{
+ Value: value,
+ }, nil
+
+}
+
+// KvPut sets the key-value pair. If the value is empty, the kv entry is deleted.
+func (fs *FilerServer) KvPut(ctx context.Context, req *filer_pb.KvPutRequest) (*filer_pb.KvPutResponse, error) {
+
+ if len(req.Value) == 0 {
+ if err := fs.filer.Store.KvDelete(ctx, req.Key); err != nil {
+ return &filer_pb.KvPutResponse{Error: err.Error()}, nil
+ }
+ }
+
+ err := fs.filer.Store.KvPut(ctx, req.Key, req.Value)
+ if err != nil {
+ return &filer_pb.KvPutResponse{Error: err.Error()}, nil
+ }
+
+ return &filer_pb.KvPutResponse{}, nil
+
+}
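
The new KV endpoints treat a missing key as a normal empty result on get, and document an empty value on put as a delete. A toy in-memory model of that contract (assumed types, not the real FilerStore); note this sketch returns early after the delete, which is what the doc comment implies, whereas the patch itself falls through to KvPut afterwards:

```go
package main

import "fmt"

type kvStore struct{ m map[string][]byte }

func newKvStore() *kvStore { return &kvStore{m: make(map[string][]byte)} }

// Get mirrors KvGet: a missing key yields an empty value, not an error.
func (s *kvStore) Get(key []byte) []byte { return s.m[string(key)] }

// Put mirrors the documented KvPut contract: an empty value deletes the entry.
func (s *kvStore) Put(key, value []byte) {
	if len(value) == 0 {
		delete(s.m, string(key))
		return
	}
	s.m[string(key)] = value
}

func main() {
	s := newKvStore()
	s.Put([]byte("owner"), []byte("alice"))
	fmt.Printf("owner=%q\n", s.Get([]byte("owner")))
	s.Put([]byte("owner"), nil) // empty value acts as delete
	fmt.Printf("after delete, owner=%q\n", s.Get([]byte("owner")))
}
```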
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index dfa59e7fe..eadb970d5 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -3,61 +3,67 @@ package weed_server
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "path/filepath"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "path/filepath"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.AtomicRenameEntryRequest) (*filer_pb.AtomicRenameEntryResponse, error) {
glog.V(1).Infof("AtomicRenameEntry %v", req)
+ oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory))
+ newParent := util.FullPath(filepath.ToSlash(req.NewDirectory))
+
+ if err := fs.filer.CanRename(oldParent, newParent); err != nil {
+ return nil, err
+ }
+
ctx, err := fs.filer.BeginTransaction(ctx)
if err != nil {
return nil, err
}
- oldParent := filer2.FullPath(filepath.ToSlash(req.OldDirectory))
-
oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName))
if err != nil {
fs.filer.RollbackTransaction(ctx)
return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
}
- var events MoveEvents
- moveErr := fs.moveEntry(ctx, oldParent, oldEntry, filer2.FullPath(filepath.ToSlash(req.NewDirectory)), req.NewName, &events)
+ moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName)
if moveErr != nil {
fs.filer.RollbackTransaction(ctx)
- return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, err)
+ return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
} else {
if commitError := fs.filer.CommitTransaction(ctx); commitError != nil {
fs.filer.RollbackTransaction(ctx)
- return nil, fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, err)
+ return nil, fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, commitError)
}
}
- for _, entry := range events.newEntries {
- fs.filer.NotifyUpdateEvent(nil, entry, false)
- }
- for _, entry := range events.oldEntries {
- fs.filer.NotifyUpdateEvent(entry, nil, false)
- }
-
return &filer_pb.AtomicRenameEntryResponse{}, nil
}
-func (fs *FilerServer) moveEntry(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error {
- if entry.IsDirectory() {
- if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, events); err != nil {
- return err
+func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
+
+ if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
+ if entry.IsDirectory() {
+ if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName); err != nil {
+ return err
+ }
}
+ return nil
+ }); err != nil {
+ return fmt.Errorf("fail to move %s => %s: %v", oldParent.Child(entry.Name()), newParent.Child(newName), err)
}
- return fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events)
+
+ return nil
}
-func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
currentDirPath := oldParent.Child(entry.Name())
newDirPath := newParent.Child(newName)
@@ -68,7 +74,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer
includeLastFile := false
for {
- entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024)
+ entries, hasMore, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "", "", "")
if err != nil {
return err
}
@@ -78,19 +84,19 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer
for _, item := range entries {
lastFileName = item.Name()
// println("processing", lastFileName)
- err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), events)
+ err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
if err != nil {
return err
}
}
- if len(entries) < 1024 {
+ if !hasMore {
break
}
}
return nil
}
-func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error {
oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
@@ -102,29 +108,30 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent filer2.FullP
}
// add to new directory
- newEntry := &filer2.Entry{
+ newEntry := &filer.Entry{
FullPath: newPath,
Attr: entry.Attr,
Chunks: entry.Chunks,
+ Extended: entry.Extended,
+ Content: entry.Content,
}
- createErr := fs.filer.CreateEntry(ctx, newEntry)
+ createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, nil)
if createErr != nil {
return createErr
}
+ if moveFolderSubEntries != nil {
+ if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
+ return moveChildrenErr
+ }
+ }
+
// delete old entry
- deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false)
+ deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, nil)
if deleteErr != nil {
return deleteErr
}
- events.oldEntries = append(events.oldEntries, entry)
- events.newEntries = append(events.newEntries, newEntry)
return nil
}
-
-type MoveEvents struct {
- oldEntries []*filer2.Entry
- newEntries []*filer2.Entry
-}
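
The reshaped moveEntry creates the entry at its new path first, moves directory children through the moveFolderSubEntries callback while both paths still exist, and deletes the old entry last, so an interrupted rename leaves two copies rather than none. A sketch of that ordering over a toy tree, with Node standing in for filer.Entry:

```go
package main

import "fmt"

type Node struct {
	Name     string
	Children []*Node
}

func move(n *Node, oldPath, newPath string) {
	fmt.Println("create", newPath) // add to the new directory first
	for _, c := range n.Children { // move children while both paths exist
		move(c, oldPath+"/"+c.Name, newPath+"/"+c.Name)
	}
	fmt.Println("delete", oldPath) // delete the old entry last
}

func main() {
	root := &Node{Name: "a", Children: []*Node{
		{Name: "f1"},
		{Name: "d", Children: []*Node{{Name: "f2"}}},
	}}
	move(root, "/a", "/b")
}
```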
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
new file mode 100644
index 000000000..d9f91b125
--- /dev/null
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -0,0 +1,202 @@
+package weed_server
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+ "strings"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
+
+ peerAddress := findClientAddress(stream.Context(), 0)
+
+ clientName := fs.addClient(req.ClientName, peerAddress)
+
+ defer fs.deleteClient(clientName)
+
+ lastReadTime := time.Unix(0, req.SinceNs)
+ glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
+
+ eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
+
+ var processedTsNs int64
+ var err error
+
+ for {
+
+ processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+
+ lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.filer.MetaAggregator.ListenersLock.Lock()
+ fs.filer.MetaAggregator.ListenersCond.Wait()
+ fs.filer.MetaAggregator.ListenersLock.Unlock()
+ return true
+ }, eachLogEntryFn)
+ if err != nil {
+ if err == log_buffer.ResumeFromDiskError {
+ continue
+ }
+ glog.Errorf("processed to %v: %v", lastReadTime, err)
+ time.Sleep(3127 * time.Millisecond)
+ if err != log_buffer.ResumeError {
+ break
+ }
+ }
+ }
+
+ return err
+
+}
+
+func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeLocalMetadataServer) error {
+
+ peerAddress := findClientAddress(stream.Context(), 0)
+
+ clientName := fs.addClient(req.ClientName, peerAddress)
+
+ defer fs.deleteClient(clientName)
+
+ lastReadTime := time.Unix(0, req.SinceNs)
+ glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
+
+ eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
+
+ var processedTsNs int64
+ var err error
+
+ for {
+ // println("reading from persisted logs ...")
+ processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+ // glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+
+ // println("reading from in memory logs ...")
+
+ lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.listenersLock.Lock()
+ fs.listenersCond.Wait()
+ fs.listenersLock.Unlock()
+ return true
+ }, eachLogEntryFn)
+ if err != nil {
+ if err == log_buffer.ResumeFromDiskError {
+ continue
+ }
+ glog.Errorf("processed to %v: %v", lastReadTime, err)
+ time.Sleep(3127 * time.Millisecond)
+ if err != log_buffer.ResumeError {
+ break
+ }
+ }
+ }
+
+ return err
+
+}
+
+func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) func(logEntry *filer_pb.LogEntry) error {
+ return func(logEntry *filer_pb.LogEntry) error {
+ event := &filer_pb.SubscribeMetadataResponse{}
+ if err := proto.Unmarshal(logEntry.Data, event); err != nil {
+ glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ }
+
+ if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil {
+ return err
+ }
+
+ return nil
+ }
+}
+
+func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+ return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+
+ foundSelf := false
+ for _, sig := range eventNotification.Signatures {
+ if sig == clientSignature && clientSignature != 0 {
+ return nil
+ }
+ if sig == fs.filer.Signature {
+ foundSelf = true
+ }
+ }
+ if !foundSelf {
+ eventNotification.Signatures = append(eventNotification.Signatures, fs.filer.Signature)
+ }
+
+ // get complete path to the file or directory
+ var entryName string
+ if eventNotification.OldEntry != nil {
+ entryName = eventNotification.OldEntry.Name
+ } else if eventNotification.NewEntry != nil {
+ entryName = eventNotification.NewEntry.Name
+ }
+
+ fullpath := util.Join(dirPath, entryName)
+
+ // skip on filer internal meta logs
+ if strings.HasPrefix(fullpath, filer.SystemLogDir) {
+ return nil
+ }
+
+ if !strings.HasPrefix(fullpath, req.PathPrefix) {
+ if eventNotification.NewParentPath != "" {
+ newFullPath := util.Join(eventNotification.NewParentPath, entryName)
+ if !strings.HasPrefix(newFullPath, req.PathPrefix) {
+ return nil
+ }
+ } else {
+ return nil
+ }
+ }
+
+ message := &filer_pb.SubscribeMetadataResponse{
+ Directory: dirPath,
+ EventNotification: eventNotification,
+ TsNs: tsNs,
+ }
+ // println("sending", dirPath, entryName)
+ if err := stream.Send(message); err != nil {
+ glog.V(0).Infof("=> client %v: %+v", clientName, err)
+ return err
+ }
+ return nil
+ }
+}
+
+func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string) {
+ clientName = clientType + "@" + clientAddress
+ glog.V(0).Infof("+ listener %v", clientName)
+ return
+}
+
+func (fs *FilerServer) deleteClient(clientName string) {
+ glog.V(0).Infof("- listener %v", clientName)
+}
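
Both subscribe loops follow the same two-phase shape: catch up from the persisted log, then tail the in-memory buffer, blocking on a condition variable that writers broadcast when new metadata arrives. A simplified, runnable model of the tail phase (assumed types; the real LoopProcessLogData also tracks timestamps and the resume-from-disk errors shown above):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type logBuffer struct {
	mu      sync.Mutex
	cond    *sync.Cond
	entries []string
}

func newLogBuffer() *logBuffer {
	b := &logBuffer{}
	b.cond = sync.NewCond(&b.mu)
	return b
}

func (b *logBuffer) add(e string) {
	b.mu.Lock()
	b.entries = append(b.entries, e)
	b.mu.Unlock()
	b.cond.Broadcast() // like ListenersCond.Broadcast() on new metadata
}

// tail blocks on the cond once it has consumed everything, mirroring the
// wait function passed to LoopProcessLogData.
func (b *logBuffer) tail(from int, each func(string)) {
	for i := from; ; i++ {
		b.mu.Lock()
		for i >= len(b.entries) {
			b.cond.Wait()
		}
		e := b.entries[i]
		b.mu.Unlock()
		each(e)
	}
}

func main() {
	b := newLogBuffer()
	go func() {
		for i := 0; i < 3; i++ {
			b.add(fmt.Sprintf("event-%d", i))
			time.Sleep(10 * time.Millisecond)
		}
	}()
	done := make(chan struct{})
	go b.tail(0, func(e string) {
		fmt.Println("subscriber got", e)
		if e == "event-2" {
			close(done)
		}
	})
	<-done
}
```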
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 41ba81366..2734223ea 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -5,23 +5,35 @@ import (
"fmt"
"net/http"
"os"
+ "sync"
"time"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/util/grace"
+
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/tikv"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
@@ -30,46 +42,74 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
_ "github.com/chrislusf/seaweedfs/weed/notification/log"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/spf13/viper"
)
type FilerOption struct {
- Masters []string
- Collection string
- DefaultReplication string
- RedirectOnRead bool
- DisableDirListing bool
- MaxMB int
- DirListingLimit int
- DataCenter string
- DefaultLevelDbDir string
- DisableHttp bool
- Port int
+ Masters []string
+ Collection string
+ DefaultReplication string
+ DisableDirListing bool
+ MaxMB int
+ DirListingLimit int
+ DataCenter string
+ Rack string
+ DefaultLevelDbDir string
+ DisableHttp bool
+ Host string
+ Port uint32
+ recursiveDelete bool
+ Cipher bool
+ SaveToFilerLimit int64
+ Filers []string
+ ConcurrentUploadLimit int64
}
type FilerServer struct {
option *FilerOption
secret security.SigningKey
- filer *filer2.Filer
+ filer *filer.Filer
grpcDialOption grpc.DialOption
+
+ // metrics read from the master
+ metricsAddress string
+ metricsIntervalSec int
+
+ // notifying clients
+ listenersLock sync.Mutex
+ listenersCond *sync.Cond
+
+ brokers map[string]map[string]bool
+ brokersLock sync.Mutex
+
+ inFlightDataSize int64
+ inFlightDataLimitCond *sync.Cond
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
fs = &FilerServer{
- option: option,
- grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
+ brokers: make(map[string]map[string]bool),
+ inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
}
+ fs.listenersCond = sync.NewCond(&fs.listenersLock)
if len(option.Masters) == 0 {
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption)
+ fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
+ fs.listenersCond.Broadcast()
+ })
+ fs.filer.Cipher = option.Cipher
+
+ fs.checkWithMaster()
+ go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
go fs.filer.KeepConnectedToMaster()
- v := viper.GetViper()
+ v := util.GetViper()
if !util.LoadConfiguration("filer", false) {
v.Set("leveldb2.enabled", true)
v.Set("leveldb2.dir", option.DefaultLevelDbDir)
@@ -77,56 +117,73 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
if os.IsNotExist(err) {
os.MkdirAll(option.DefaultLevelDbDir, 0755)
}
+ glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
+ } else {
+ glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
}
util.LoadConfiguration("notification", false)
+ fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
+ v.SetDefault("filer.options.buckets_folder", "/buckets")
+ fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
+	// TODO deprecated, will be removed after 2020-12-31
+ // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
+ fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
fs.filer.LoadConfiguration(v)
- notification.LoadConfiguration(v.Sub("notification"))
+ notification.LoadConfiguration(v, "notification.")
handleStaticResources(defaultMux)
if !option.DisableHttp {
defaultMux.HandleFunc("/", fs.filerHandler)
}
if defaultMux != readonlyMux {
+ handleStaticResources(readonlyMux)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
- maybeStartMetrics(fs, option)
+ fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
+
+ fs.filer.LoadBuckets()
+
+ fs.filer.LoadFilerConf()
+
+ grace.OnInterrupt(func() {
+ fs.filer.Shutdown()
+ })
return fs, nil
}
-func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
+func (fs *FilerServer) checkWithMaster() {
+
+ for _, master := range fs.option.Masters {
+ _, err := pb.ParseServerToGrpcAddress(master)
+ if err != nil {
+ glog.Fatalf("invalid master address %s: %v", master, err)
+ }
+ }
+
isConnected := false
- var metricsAddress string
- var metricsIntervalSec int
- var readErr error
for !isConnected {
- metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, option.Masters[0])
- if readErr == nil {
- isConnected = true
- } else {
- time.Sleep(7 * time.Second)
+ for _, master := range fs.option.Masters {
+ readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", master, err)
+ }
+ fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ if fs.option.DefaultReplication == "" {
+ fs.option.DefaultReplication = resp.DefaultReplication
+ }
+ return nil
+ })
+ if readErr == nil {
+ isConnected = true
+ } else {
+ time.Sleep(7 * time.Second)
+ }
}
}
- if metricsAddress == "" && metricsIntervalSec <= 0 {
- return
- }
- go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
- func() (addr string, intervalSeconds int) {
- return metricsAddress, metricsIntervalSec
- })
-}
-func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
- err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
- resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
- if err != nil {
- return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err)
- }
- metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
- return nil
- })
- return
}
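
The new checkWithMaster replaces the old single-master configuration read: it loops over all configured masters until one answers, adopting the metrics address and, if unset, the default replication. A sketch of that retry loop, with fetchConfig as a hypothetical stand-in for the GetMasterConfiguration gRPC call:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type masterConfig struct {
	MetricsAddress     string
	DefaultReplication string
}

// fetchConfig is a stand-in: pretend only one master is reachable.
func fetchConfig(master string) (*masterConfig, error) {
	if master != "localhost:9333" {
		return nil, errors.New("connection refused")
	}
	return &masterConfig{MetricsAddress: "localhost:9091", DefaultReplication: "001"}, nil
}

// checkWithMasters keeps cycling through the masters until one responds,
// sleeping after each failure like the 7-second backoff in the patch.
func checkWithMasters(masters []string) *masterConfig {
	for {
		for _, m := range masters {
			if c, err := fetchConfig(m); err == nil {
				return c
			}
			time.Sleep(7 * time.Second)
		}
	}
}

func main() {
	c := checkWithMasters([]string{"localhost:9333"})
	fmt.Printf("metrics=%s replication=%s\n", c.MetricsAddress, c.DefaultReplication)
}
```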
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index b6bfc3b04..ed6bbb6f6 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -1,14 +1,36 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/util"
"net/http"
+ "strings"
+ "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/stats"
)
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
+
start := time.Now()
+
+ // proxy to volume servers
+ var fileId string
+ if strings.HasPrefix(r.RequestURI, "/?proxyChunkId=") {
+ fileId = r.RequestURI[len("/?proxyChunkId="):]
+ }
+ if fileId != "" {
+ stats.FilerRequestCounter.WithLabelValues("proxy").Inc()
+ fs.proxyToVolumeServer(w, r, fileId)
+ stats.FilerRequestHistogram.WithLabelValues("proxy").Observe(time.Since(start).Seconds())
+ return
+ }
+
+ w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
+ if r.Header.Get("Origin") != "" {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Credentials", "true")
+ }
switch r.Method {
case "GET":
stats.FilerRequestCounter.WithLabelValues("get").Inc()
@@ -20,20 +42,53 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
case "DELETE":
stats.FilerRequestCounter.WithLabelValues("delete").Inc()
- fs.DeleteHandler(w, r)
+ if _, ok := r.URL.Query()["tagging"]; ok {
+ fs.DeleteTaggingHandler(w, r)
+ } else {
+ fs.DeleteHandler(w, r)
+ }
stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
- case "PUT":
- stats.FilerRequestCounter.WithLabelValues("put").Inc()
- fs.PostHandler(w, r)
- stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
- case "POST":
- stats.FilerRequestCounter.WithLabelValues("post").Inc()
- fs.PostHandler(w, r)
- stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
+ case "POST", "PUT":
+
+ // wait until in flight data is less than the limit
+ contentLength := getContentLength(r)
+ fs.inFlightDataLimitCond.L.Lock()
+ for atomic.LoadInt64(&fs.inFlightDataSize) > fs.option.ConcurrentUploadLimit {
+ fs.inFlightDataLimitCond.Wait()
+ }
+ atomic.AddInt64(&fs.inFlightDataSize, contentLength)
+ fs.inFlightDataLimitCond.L.Unlock()
+ defer func() {
+ atomic.AddInt64(&fs.inFlightDataSize, -contentLength)
+ fs.inFlightDataLimitCond.Signal()
+ }()
+
+ if r.Method == "PUT" {
+ stats.FilerRequestCounter.WithLabelValues("put").Inc()
+ if _, ok := r.URL.Query()["tagging"]; ok {
+ fs.PutTaggingHandler(w, r)
+ } else {
+ fs.PostHandler(w, r, contentLength)
+ }
+ stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
+ } else { // method == "POST"
+ stats.FilerRequestCounter.WithLabelValues("post").Inc()
+ fs.PostHandler(w, r, contentLength)
+ stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
+ }
+ case "OPTIONS":
+ stats.FilerRequestCounter.WithLabelValues("options").Inc()
+ OptionsHandler(w, r, false)
+ stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
}
}
func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
+ if r.Header.Get("Origin") != "" {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Credentials", "true")
+ }
start := time.Now()
switch r.Method {
case "GET":
@@ -44,5 +99,18 @@ func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Reque
stats.FilerRequestCounter.WithLabelValues("head").Inc()
fs.GetOrHeadHandler(w, r, false)
stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
+ case "OPTIONS":
+ stats.FilerRequestCounter.WithLabelValues("options").Inc()
+ OptionsHandler(w, r, true)
+ stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
+ }
+}
+
+func OptionsHandler(w http.ResponseWriter, r *http.Request, isReadOnly bool) {
+ if isReadOnly {
+ w.Header().Add("Access-Control-Allow-Methods", "GET, OPTIONS")
+ } else {
+ w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
}
+ w.Header().Add("Access-Control-Allow-Headers", "*")
}
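
The merged POST/PUT branch throttles uploads by tracking in-flight bytes: a request waits on a sync.Cond while the running total exceeds ConcurrentUploadLimit, adds its Content-Length, and subtracts and signals on completion. A standalone sketch of that throttle with toy sizes; as in the patch, the limit is checked before adding, so one oversized request can still enter when the counter is at or under the limit:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type throttle struct {
	limit    int64
	inFlight int64
	cond     *sync.Cond
}

func newThrottle(limit int64) *throttle {
	return &throttle{limit: limit, cond: sync.NewCond(new(sync.Mutex))}
}

// acquire blocks while in-flight bytes exceed the limit, then registers n.
func (t *throttle) acquire(n int64) {
	t.cond.L.Lock()
	for atomic.LoadInt64(&t.inFlight) > t.limit {
		t.cond.Wait()
	}
	atomic.AddInt64(&t.inFlight, n)
	t.cond.L.Unlock()
}

// release deregisters n bytes and wakes one waiter, as the deferred
// Signal does in filerHandler.
func (t *throttle) release(n int64) {
	atomic.AddInt64(&t.inFlight, -n)
	t.cond.Signal()
}

func main() {
	t := newThrottle(100)
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			t.acquire(60) // two uploads of 60 exceed the limit of 100
			fmt.Println("upload", id, "started")
			time.Sleep(20 * time.Millisecond)
			t.release(60)
		}(i)
	}
	wg.Wait()
}
```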
diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go
new file mode 100644
index 000000000..b8b28790b
--- /dev/null
+++ b/weed/server/filer_server_handlers_proxy.go
@@ -0,0 +1,67 @@
+package weed_server
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "math/rand"
+ "net/http"
+)
+
+var (
+ client *http.Client
+)
+
+func init() {
+ client = &http.Client{Transport: &http.Transport{
+ MaxIdleConns: 1024,
+ MaxIdleConnsPerHost: 1024,
+ }}
+}
+
+func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Request, fileId string) {
+
+ urlStrings, err := fs.filer.MasterClient.GetLookupFileIdFunction()(fileId)
+ if err != nil {
+ glog.Errorf("locate %s: %v", fileId, err)
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+
+ if len(urlStrings) == 0 {
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+
+ proxyReq, err := http.NewRequest(r.Method, urlStrings[rand.Intn(len(urlStrings))], r.Body)
+ if err != nil {
+ glog.Errorf("NewRequest %s: %v", urlStrings[0], err)
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+
+ proxyReq.Header.Set("Host", r.Host)
+ proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
+
+ for header, values := range r.Header {
+ for _, value := range values {
+ proxyReq.Header.Add(header, value)
+ }
+ }
+
+ proxyResponse, postErr := client.Do(proxyReq)
+
+ if postErr != nil {
+ glog.Errorf("post to filer: %v", postErr)
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+ defer util.CloseResponse(proxyResponse)
+
+ for k, v := range proxyResponse.Header {
+ w.Header()[k] = v
+ }
+ w.WriteHeader(proxyResponse.StatusCode)
+ io.Copy(w, proxyResponse.Body)
+
+}
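The proxy defers util.CloseResponse before streaming the body to the caller; draining the body before closing is what lets the pooled Transport (MaxIdleConnsPerHost: 1024) reuse keep-alive connections. A sketch of what such a helper typically does (assumption: util.CloseResponse behaves along these lines; imports io, io/ioutil, net/http):

func closeResponse(resp *http.Response) {
	io.Copy(ioutil.Discard, resp.Body) // drain so the connection returns to the pool
	resp.Body.Close()
}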
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index ba21298ba..6bc09e953 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -1,19 +1,22 @@
package weed_server
import (
+ "bytes"
"context"
"io"
- "io/ioutil"
"mime"
- "mime/multipart"
"net/http"
"net/url"
- "path"
+ "path/filepath"
"strconv"
"strings"
+ "time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/images"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -26,13 +29,13 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
path = path[:len(path)-1]
}
- entry, err := fs.filer.FindEntry(context.Background(), filer2.FullPath(path))
+ entry, err := fs.filer.FindEntry(context.Background(), util.FullPath(path))
if err != nil {
if path == "/" {
fs.listDirectoryHandler(w, r)
return
}
- if err == filer2.ErrNotFound {
+ if err == filer_pb.ErrNotFound {
glog.V(1).Infof("Not found %s: %v", path, err)
stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc()
w.WriteHeader(http.StatusNotFound)
@@ -58,196 +61,107 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
return
}
- if len(entry.Chunks) == 0 {
- glog.V(1).Infof("no file chunks for %s, attr=%+v", path, entry.Attr)
- stats.FilerRequestCounter.WithLabelValues("read.nocontent").Inc()
- w.WriteHeader(http.StatusNoContent)
- return
- }
-
w.Header().Set("Accept-Ranges", "bytes")
- if r.Method == "HEAD" {
- w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
- w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat))
- setEtag(w, filer2.ETag(entry.Chunks))
- return
- }
-
- if len(entry.Chunks) == 1 {
- fs.handleSingleChunk(w, r, entry)
- return
- }
-
- fs.handleMultipleChunks(w, r, entry)
-
-}
-
-func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
-
- fileId := entry.Chunks[0].GetFileIdString()
-
- urlString, err := fs.filer.MasterClient.LookupFileId(fileId)
- if err != nil {
- glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err)
- w.WriteHeader(http.StatusNotFound)
- return
- }
-
- if fs.option.RedirectOnRead {
- stats.FilerRequestCounter.WithLabelValues("redirect").Inc()
- http.Redirect(w, r, urlString, http.StatusFound)
- return
- }
-
- u, _ := url.Parse(urlString)
- q := u.Query()
- for key, values := range r.URL.Query() {
- for _, value := range values {
- q.Add(key, value)
- }
- }
- u.RawQuery = q.Encode()
- request := &http.Request{
- Method: r.Method,
- URL: u,
- Proto: r.Proto,
- ProtoMajor: r.ProtoMajor,
- ProtoMinor: r.ProtoMinor,
- Header: r.Header,
- Body: r.Body,
- Host: r.Host,
- ContentLength: r.ContentLength,
- }
- glog.V(3).Infoln("retrieving from", u)
- resp, do_err := util.Do(request)
- if do_err != nil {
- glog.V(0).Infoln("failing to connect to volume server", do_err.Error())
- writeJsonError(w, r, http.StatusInternalServerError, do_err)
- return
- }
- defer func() {
- io.Copy(ioutil.Discard, resp.Body)
- resp.Body.Close()
- }()
- for k, v := range resp.Header {
- w.Header()[k] = v
- }
- if entry.Attr.Mime != "" {
- w.Header().Set("Content-Type", entry.Attr.Mime)
- }
- w.WriteHeader(resp.StatusCode)
- io.Copy(w, resp.Body)
-}
-
-func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) {
+ // mime type
mimeType := entry.Attr.Mime
if mimeType == "" {
- if ext := path.Ext(entry.Name()); ext != "" {
+ if ext := filepath.Ext(entry.Name()); ext != "" {
mimeType = mime.TypeByExtension(ext)
}
}
if mimeType != "" {
w.Header().Set("Content-Type", mimeType)
}
- setEtag(w, filer2.ETag(entry.Chunks))
- totalSize := int64(filer2.TotalSize(entry.Chunks))
+ // if modified since
+ if !entry.Attr.Mtime.IsZero() {
+ w.Header().Set("Last-Modified", entry.Attr.Mtime.UTC().Format(http.TimeFormat))
+ if r.Header.Get("If-Modified-Since") != "" {
+ if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil {
+ if !t.Before(entry.Attr.Mtime) {
+ w.WriteHeader(http.StatusNotModified)
+ return
+ }
+ }
+ }
+ }
- rangeReq := r.Header.Get("Range")
+ // set response headers from the entry's extended properties
+ for k, v := range entry.Extended {
+ w.Header().Set(k, string(v))
+ }
- if rangeReq == "" {
- w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
- if err := fs.writeContent(w, entry, 0, int(totalSize)); err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
+ // Seaweed custom headers are not visible to Vue or JavaScript by default
+ seaweedHeaders := []string{}
+ for header := range w.Header() {
+ if strings.HasPrefix(header, "Seaweed-") {
+ seaweedHeaders = append(seaweedHeaders, header)
}
- return
}
+ seaweedHeaders = append(seaweedHeaders, "Content-Disposition")
+ w.Header().Set("Access-Control-Expose-Headers", strings.Join(seaweedHeaders, ","))
- //the rest is dealing with partial content request
- //mostly copy from src/pkg/net/http/fs.go
- ranges, err := parseRange(rangeReq, totalSize)
- if err != nil {
- http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
- return
- }
- if sumRangesSize(ranges) > totalSize {
- // The total number of bytes in all the ranges
- // is larger than the size of the file by
- // itself, so this is probably an attack, or a
- // dumb client. Ignore the range request.
- return
+ // set tag count
+ if r.Method == "GET" {
+ tagCount := 0
+ for k := range entry.Extended {
+ if strings.HasPrefix(k, xhttp.AmzObjectTagging+"-") {
+ tagCount++
+ }
+ }
+ if tagCount > 0 {
+ w.Header().Set(xhttp.AmzTagCount, strconv.Itoa(tagCount))
+ }
}
- if len(ranges) == 0 {
+
+ // set etag
+ etag := filer.ETagEntry(entry)
+ if inm := r.Header.Get("If-None-Match"); inm == "\""+etag+"\"" {
+ w.WriteHeader(http.StatusNotModified)
return
}
- if len(ranges) == 1 {
- // RFC 2616, Section 14.16:
- // "When an HTTP message includes the content of a single
- // range (for example, a response to a request for a
- // single range, or to a request for a set of ranges
- // that overlap without any holes), this content is
- // transmitted with a Content-Range header, and a
- // Content-Length header showing the number of bytes
- // actually transferred.
- // ...
- // A response to a request for a single range MUST NOT
- // be sent using the multipart/byteranges media type."
- ra := ranges[0]
- w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
- w.Header().Set("Content-Range", ra.contentRange(totalSize))
- w.WriteHeader(http.StatusPartialContent)
-
- err = fs.writeContent(w, entry, ra.start, int(ra.length))
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
+ setEtag(w, etag)
+
+ filename := entry.Name()
+ filename = url.QueryEscape(filename)
+ adjustHeaderContentDisposition(w, r, filename)
+
+ totalSize := int64(entry.Size())
+
+ if r.Method == "HEAD" {
+ w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size, true)
+ })
return
}
- // process multiple ranges
- for _, ra := range ranges {
- if ra.start > totalSize {
- http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
- return
- }
- }
- sendSize := rangesMIMESize(ranges, mimeType, totalSize)
- pr, pw := io.Pipe()
- mw := multipart.NewWriter(pw)
- w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
- sendContent := pr
- defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
- go func() {
- for _, ra := range ranges {
- part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
- if e != nil {
- pw.CloseWithError(e)
- return
- }
- if e = fs.writeContent(part, entry, ra.start, int(ra.length)); e != nil {
- pw.CloseWithError(e)
+ if rangeReq := r.Header.Get("Range"); rangeReq == "" {
+ ext := filepath.Ext(filename)
+ width, height, mode, shouldResize := shouldResizeImages(ext, r)
+ if shouldResize {
+ data, err := filer.ReadAll(fs.filer.MasterClient, entry.Chunks)
+ if err != nil {
+ glog.Errorf("failed to read %s: %v", path, err)
+ w.WriteHeader(http.StatusInternalServerError)
return
}
+ rs, _, _ := images.Resized(ext, bytes.NewReader(data), width, height, mode)
+ io.Copy(w, rs)
+ return
}
- mw.Close()
- pw.Close()
- }()
- if w.Header().Get("Content-Encoding") == "" {
- w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
- }
- w.WriteHeader(http.StatusPartialContent)
- if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
- http.Error(w, "Internal Error", http.StatusInternalServerError)
- return
}
-}
-
-func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int64, size int) error {
-
- return filer2.StreamContent(fs.filer.MasterClient, w, entry.Chunks, offset, size)
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ if offset+size <= int64(len(entry.Content)) {
+ _, err := writer.Write(entry.Content[offset : offset+size])
+ if err != nil {
+ glog.Errorf("failed to write entry content: %v", err)
+ }
+ return err
+ }
+ return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size, false)
+ })
}
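The closing processRangeRequest call picks the data source per requested range: entries small enough to carry inline Content are served straight from metadata, everything else streams from the volume servers. A compact sketch of that decision, with streamFromChunks standing in for filer.StreamContent:

// writeRange serves [offset, offset+size) either from inline content
// or by streaming chunks, as the filer handler above does.
func writeRange(w io.Writer, content []byte, offset, size int64,
	streamFromChunks func(io.Writer, int64, int64) error) error {
	if offset+size <= int64(len(content)) {
		_, err := w.Write(content[offset : offset+size])
		return err
	}
	return streamFromChunks(w, offset, size)
}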
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index 87e864559..307c411b6 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -2,14 +2,17 @@ package weed_server
import (
"context"
+ "encoding/base64"
+ "fmt"
+ "github.com/skip2/go-qrcode"
"net/http"
"strconv"
"strings"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
// listDirectoryHandler lists directories and files under a directory
@@ -31,8 +34,10 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
}
lastFileName := r.FormValue("lastFileName")
+ namePattern := r.FormValue("namePattern")
+ namePatternExclude := r.FormValue("namePatternExclude")
- entries, err := fs.filer.ListDirectoryEntries(context.Background(), filer2.FullPath(path), lastFileName, false, limit)
+ entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude)
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
@@ -40,7 +45,6 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
return
}
- shouldDisplayLoadMore := len(entries) == limit
if path == "/" {
path = ""
}
@@ -65,21 +69,30 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
lastFileName,
shouldDisplayLoadMore,
})
- } else {
- ui.StatusTpl.Execute(w, struct {
- Path string
- Breadcrumbs []ui.Breadcrumb
- Entries interface{}
- Limit int
- LastFileName string
- ShouldDisplayLoadMore bool
- }{
- path,
- ui.ToBreadcrumb(path),
- entries,
- limit,
- lastFileName,
- shouldDisplayLoadMore,
- })
+ return
+ }
+
+ var qrImageString string
+ img, err := qrcode.Encode(fmt.Sprintf("http://%s:%d%s", fs.option.Host, fs.option.Port, r.URL.Path), qrcode.Medium, 128)
+ if err == nil {
+ qrImageString = base64.StdEncoding.EncodeToString(img)
}
+
+ ui.StatusTpl.Execute(w, struct {
+ Path string
+ Breadcrumbs []ui.Breadcrumb
+ Entries interface{}
+ Limit int
+ LastFileName string
+ ShouldDisplayLoadMore bool
+ QrImage string
+ }{
+ path,
+ ui.ToBreadcrumb(path),
+ entries,
+ limit,
+ lastFileName,
+ shouldDisplayLoadMore,
+ qrImageString,
+ })
}
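The listing page now embeds a QR code for the directory URL. A minimal sketch of the generation step using github.com/skip2/go-qrcode (host, port, and path are placeholders):

img, err := qrcode.Encode("http://localhost:8888/some/dir/", qrcode.Medium, 128)
if err == nil {
	qrImage := base64.StdEncoding.EncodeToString(img)
	_ = qrImage // rendered by the template as <img src="data:image/png;base64,{{.QrImage}}">
}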
diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go
new file mode 100644
index 000000000..50b3a2c06
--- /dev/null
+++ b/weed/server/filer_server_handlers_tagging.go
@@ -0,0 +1,102 @@
+package weed_server
+
+import (
+ "context"
+ "net/http"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// add or replace one file's Seaweed- prefixed attributes
+// curl -X PUT -H "Seaweed-Name1: value1" http://localhost:8888/path/to/a/file?tagging
+func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request) {
+
+ ctx := context.Background()
+
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ path = path[:len(path)-1]
+ }
+
+ existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+ if err != nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+ if existingEntry == nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+
+ if existingEntry.Extended == nil {
+ existingEntry.Extended = make(map[string][]byte)
+ }
+
+ for header, values := range r.Header {
+ if strings.HasPrefix(header, needle.PairNamePrefix) {
+ for _, value := range values {
+ existingEntry.Extended[header] = []byte(value)
+ }
+ }
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil {
+ glog.V(0).Infof("failing to update %s tagging : %v", path, dbErr)
+ writeJsonError(w, r, http.StatusInternalServerError, dbErr)
+ return
+ }
+
+ writeJsonQuiet(w, r, http.StatusAccepted, nil)
+ return
+}
+
+// remove all Seaweed- prefixed attributes
+// curl -X DELETE http://localhost:8888/path/to/a/file?tagging
+func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Request) {
+
+ ctx := context.Background()
+
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ path = path[:len(path)-1]
+ }
+
+ existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+ if err != nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+ if existingEntry == nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+
+ if existingEntry.Extended == nil {
+ existingEntry.Extended = make(map[string][]byte)
+ }
+
+ hasDeletion := false
+ for header := range existingEntry.Extended {
+ if strings.HasPrefix(header, needle.PairNamePrefix) {
+ delete(existingEntry.Extended, header)
+ hasDeletion = true
+ }
+ }
+
+ if !hasDeletion {
+ writeJsonQuiet(w, r, http.StatusNotModified, nil)
+ return
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil {
+ glog.V(0).Infof("failing to delete %s tagging : %v", path, dbErr)
+ writeJsonError(w, r, http.StatusInternalServerError, dbErr)
+ return
+ }
+
+ writeJsonQuiet(w, r, http.StatusAccepted, nil)
+ return
+}
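Matching the curl examples in the handler comments, a round trip against a local filer (localhost:8888 assumed) would be:

curl -X PUT -H "Seaweed-Color: blue" "http://localhost:8888/path/to/a/file?tagging"   # 202 Accepted
curl -X DELETE "http://localhost:8888/path/to/a/file?tagging"                         # 202 Accepted, or 304 if no tags existed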
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 236e7027d..95eba9d3d 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -2,26 +2,17 @@ package weed_server
import (
"context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "mime"
"net/http"
- "net/url"
"os"
- filenamePath "path"
- "strconv"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -32,271 +23,130 @@ var (
type FilerPostResult struct {
Name string `json:"name,omitempty"`
- Size uint32 `json:"size,omitempty"`
+ Size int64 `json:"size,omitempty"`
Error string `json:"error,omitempty"`
Fid string `json:"fid,omitempty"`
Url string `json:"url,omitempty"`
}
-func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
stats.FilerRequestCounter.WithLabelValues("assign").Inc()
start := time.Now()
defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }()
- ar := &operation.VolumeAssignRequest{
- Count: 1,
- Replication: replication,
- Collection: collection,
- Ttl: r.URL.Query().Get("ttl"),
- DataCenter: dataCenter,
- }
- var altRequest *operation.VolumeAssignRequest
- if dataCenter != "" {
- altRequest = &operation.VolumeAssignRequest{
- Count: 1,
- Replication: replication,
- Collection: collection,
- Ttl: r.URL.Query().Get("ttl"),
- DataCenter: "",
- }
- }
+ ar, altRequest := so.ToAssignRequests(1)
- assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
+ assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
- writeJsonError(w, r, http.StatusInternalServerError, ae)
err = ae
return
}
fileId = assignResult.Fid
urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
+ if so.Fsync {
+ urlLocation += "?fsync=true"
+ }
auth = assignResult.Auth
return
}
-func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
+func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
ctx := context.Background()
query := r.URL.Query()
- replication := query.Get("replication")
- if replication == "" {
- replication = fs.option.DefaultReplication
- }
- collection := query.Get("collection")
- if collection == "" {
- collection = fs.option.Collection
- }
- dataCenter := query.Get("dataCenter")
- if dataCenter == "" {
- dataCenter = fs.option.DataCenter
- }
+ so := fs.detectStorageOption0(r.RequestURI,
+ query.Get("collection"),
+ query.Get("replication"),
+ query.Get("ttl"),
+ query.Get("disk"),
+ query.Get("dataCenter"),
+ query.Get("rack"),
+ )
+
+ fs.autoChunk(ctx, w, r, contentLength, so)
+ util.CloseRequest(r)
- if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter); autoChunked {
- return
- }
+}
- fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
+// curl -X DELETE http://localhost:8888/path/to
+// curl -X DELETE http://localhost:8888/path/to?recursive=true
+// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
+// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
+func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
- if err != nil || fileId == "" || urlLocation == "" {
- glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
- return
+ isRecursive := r.FormValue("recursive") == "true"
+ if !isRecursive && fs.option.recursiveDelete {
+ if r.FormValue("recursive") != "false" {
+ isRecursive = true
+ }
}
+ ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
+ skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
- glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
-
- u, _ := url.Parse(urlLocation)
-
- // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off
- // because they need to provide FIDs instead of file paths...
- cm, _ := strconv.ParseBool(query.Get("cm"))
- if cm {
- q := u.Query()
- q.Set("cm", "true")
- u.RawQuery = q.Encode()
+ objectPath := r.URL.Path
+ if len(r.URL.Path) > 1 && strings.HasSuffix(objectPath, "/") {
+ objectPath = objectPath[0 : len(objectPath)-1]
}
- glog.V(4).Infoln("post to", u)
- ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
+ err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil)
if err != nil {
+ glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
+ httpStatus := http.StatusInternalServerError
+ if err == filer_pb.ErrNotFound {
+ httpStatus = http.StatusNoContent
+ }
+ writeJsonError(w, r, httpStatus, err)
return
}
- if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId); err != nil {
- return
- }
-
- // send back post result
- reply := FilerPostResult{
- Name: ret.Name,
- Size: ret.Size,
- Error: ret.Error,
- Fid: fileId,
- Url: urlLocation,
- }
- setEtag(w, ret.ETag)
- writeJsonQuiet(w, r, http.StatusCreated, reply)
+ w.WriteHeader(http.StatusNoContent)
}
-// update metadata in filer store
-func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter,
- replication string, collection string, ret operation.UploadResult, fileId string) (err error) {
-
- stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc()
- start := time.Now()
- defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds())
- }()
+func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType string, dataCenter, rack string) *operation.StorageOption {
+ collection := util.Nvl(qCollection, fs.option.Collection)
+ replication := util.Nvl(qReplication, fs.option.DefaultReplication)
- modeStr := r.URL.Query().Get("mode")
- if modeStr == "" {
- modeStr = "0660"
+ // paths under the buckets folder use bucket-specific defaults
+ bucketDefaultReplication, fsync := "", false
+ if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
+ collection = fs.filer.DetectBucket(util.FullPath(requestURI))
+ bucketDefaultReplication, fsync = fs.filer.ReadBucketOption(collection)
}
- mode, err := strconv.ParseUint(modeStr, 8, 32)
- if err != nil {
- glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
- mode = 0660
- }
-
- path := r.URL.Path
- if strings.HasSuffix(path, "/") {
- if ret.Name != "" {
- path += ret.Name
- }
- }
- existingEntry, err := fs.filer.FindEntry(ctx, filer2.FullPath(path))
- crTime := time.Now()
- if err == nil && existingEntry != nil {
- crTime = existingEntry.Crtime
- }
- entry := &filer2.Entry{
- FullPath: filer2.FullPath(path),
- Attr: filer2.Attr{
- Mtime: time.Now(),
- Crtime: crTime,
- Mode: os.FileMode(mode),
- Uid: OS_UID,
- Gid: OS_GID,
- Replication: replication,
- Collection: collection,
- TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)),
- },
- Chunks: []*filer_pb.FileChunk{{
- FileId: fileId,
- Size: uint64(ret.Size),
- Mtime: time.Now().UnixNano(),
- ETag: ret.ETag,
- }},
- }
- if ext := filenamePath.Ext(path); ext != "" {
- entry.Attr.Mime = mime.TypeByExtension(ext)
- }
- // glog.V(4).Infof("saving %s => %+v", path, entry)
- if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
- fs.filer.DeleteChunks(entry.Chunks)
- glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
- writeJsonError(w, r, http.StatusInternalServerError, dbErr)
- err = dbErr
- return
+ if replication == "" {
+ replication = bucketDefaultReplication
}
- return nil
-}
-
-// send request to volume server
-func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret operation.UploadResult, err error) {
-
- stats.FilerRequestCounter.WithLabelValues("postUpload").Inc()
- start := time.Now()
- defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }()
+ rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
- request := &http.Request{
- Method: r.Method,
- URL: u,
- Proto: r.Proto,
- ProtoMajor: r.ProtoMajor,
- ProtoMinor: r.ProtoMinor,
- Header: r.Header,
- Body: r.Body,
- Host: r.Host,
- ContentLength: r.ContentLength,
- }
- if auth != "" {
- request.Header.Set("Authorization", "BEARER "+string(auth))
- }
- resp, doErr := util.Do(request)
- if doErr != nil {
- glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method)
- writeJsonError(w, r, http.StatusInternalServerError, doErr)
- err = doErr
- return
- }
- defer func() {
- io.Copy(ioutil.Discard, resp.Body)
- resp.Body.Close()
- }()
- etag := resp.Header.Get("ETag")
- respBody, raErr := ioutil.ReadAll(resp.Body)
- if raErr != nil {
- glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error())
- writeJsonError(w, r, http.StatusInternalServerError, raErr)
- err = raErr
- return
- }
- glog.V(4).Infoln("post result", string(respBody))
- unmarshalErr := json.Unmarshal(respBody, &ret)
- if unmarshalErr != nil {
- glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody))
- writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr)
- err = unmarshalErr
- return
- }
- if ret.Error != "" {
- err = errors.New(ret.Error)
- glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
- writeJsonError(w, r, http.StatusInternalServerError, err)
- return
- }
- // find correct final path
- path := r.URL.Path
- if strings.HasSuffix(path, "/") {
- if ret.Name != "" {
- path += ret.Name
- } else {
- err = fmt.Errorf("can not to write to folder %s without a file name", path)
- fs.filer.DeleteFileByFileId(fileId)
- glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
- writeJsonError(w, r, http.StatusInternalServerError, err)
- return
+ if ttlSeconds == 0 {
+ ttl, err := needle.ReadTTL(rule.GetTtl())
+ if err != nil {
+ glog.Errorf("fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err)
}
+ ttlSeconds = int32(ttl.Minutes()) * 60
}
- if etag != "" {
- ret.ETag = etag
+
+ return &operation.StorageOption{
+ Replication: util.Nvl(replication, rule.Replication),
+ Collection: util.Nvl(collection, rule.Collection),
+ DataCenter: util.Nvl(dataCenter, fs.option.DataCenter),
+ Rack: util.Nvl(rack, fs.option.Rack),
+ TtlSeconds: ttlSeconds,
+ DiskType: util.Nvl(diskType, rule.DiskType),
+ Fsync: fsync || rule.Fsync,
+ VolumeGrowthCount: rule.VolumeGrowthCount,
}
- return
}
-// curl -X DELETE http://localhost:8888/path/to
-// curl -X DELETE http://localhost:8888/path/to?recursive=true
-// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
-// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
-func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
-
- isRecursive := r.FormValue("recursive") == "true"
- ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
- skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
+func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) *operation.StorageOption {
- err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, !skipChunkDeletion)
+ ttl, err := needle.ReadTTL(qTtl)
if err != nil {
- glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error())
- httpStatus := http.StatusInternalServerError
- if err == filer2.ErrNotFound {
- httpStatus = http.StatusNotFound
- }
- writeJsonError(w, r, httpStatus, err)
- return
+ glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
}
- w.WriteHeader(http.StatusNoContent)
+ return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack)
}
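detectStorageOption resolves each setting through a fallback chain: explicit query parameter, then bucket defaults, then the filer.conf rule matched by path, then the server-wide option. util.Nvl is presumably a first-non-empty helper along these lines (sketch, not the library source):

// nvl returns the first non-empty string; "" means "unset" at that level.
func nvl(values ...string) string {
	for _, v := range values {
		if v != "" {
			return v
		}
	}
	return ""
}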
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 8ff7ab2c0..c4f10d94e 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -1,31 +1,27 @@
package weed_server
import (
- "bytes"
"context"
+ "fmt"
"io"
- "io/ioutil"
"net/http"
+ "os"
"path"
"strconv"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/security"
+ xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
- replication string, collection string, dataCenter string) bool {
- if r.Method != "POST" {
- glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
- return false
- }
+func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) {
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
query := r.URL.Query()
@@ -35,174 +31,308 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
if maxMB <= 0 && fs.option.MaxMB > 0 {
maxMB = int32(fs.option.MaxMB)
}
- if maxMB <= 0 {
- glog.V(4).Infoln("AutoChunking not enabled")
- return false
- }
- glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)")
chunkSize := 1024 * 1024 * maxMB
- contentLength := int64(0)
- if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 {
- contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64)
- if contentLength <= int64(chunkSize) {
- glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.")
- return false
- }
- }
+ stats.FilerRequestCounter.WithLabelValues("chunk").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerRequestHistogram.WithLabelValues("chunk").Observe(time.Since(start).Seconds())
+ }()
- if contentLength <= 0 {
- glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.")
- return false
+ var reply *FilerPostResult
+ var err error
+ var md5bytes []byte
+ if r.Method == "POST" {
+ if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
+ reply, err = fs.mkdir(ctx, w, r)
+ } else {
+ reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, contentLength, so)
+ }
+ } else {
+ reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so)
}
-
- reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter)
if err != nil {
- writeJsonError(w, r, http.StatusInternalServerError, err)
+ if strings.HasPrefix(err.Error(), "read input:") {
+ writeJsonError(w, r, 499, err)
+ } else if strings.HasSuffix(err.Error(), "is a file") {
+ writeJsonError(w, r, http.StatusConflict, err)
+ } else {
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ }
} else if reply != nil {
+ if len(md5bytes) > 0 {
+ w.Header().Set("Content-MD5", util.Base64Encode(md5bytes))
+ }
writeJsonQuiet(w, r, http.StatusCreated, reply)
}
- return true
}
-func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
- contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) {
-
- stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
- start := time.Now()
- defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
- }()
+func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
multipartReader, multipartReaderErr := r.MultipartReader()
if multipartReaderErr != nil {
- return nil, multipartReaderErr
+ return nil, nil, multipartReaderErr
}
part1, part1Err := multipartReader.NextPart()
if part1Err != nil {
- return nil, part1Err
+ return nil, nil, part1Err
}
fileName := part1.FileName()
if fileName != "" {
fileName = path.Base(fileName)
}
+ contentType := part1.Header.Get("Content-Type")
+ if contentType == "application/octet-stream" {
+ contentType = ""
+ }
- var fileChunks []*filer_pb.FileChunk
+ fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
+ if err != nil {
+ return nil, nil, err
+ }
- totalBytesRead := int64(0)
- tmpBufferSize := int32(1024 * 1024)
- tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize))
- chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow
- chunkBufOffset := int32(0)
- chunkOffset := int64(0)
- writtenChunks := 0
+ md5bytes = md5Hash.Sum(nil)
+ filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
- filerResult = &FilerPostResult{
- Name: fileName,
- }
+ return
+}
- for totalBytesRead < contentLength {
- tmpBuffer.Reset()
- bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize))
- readFully := readErr != nil && readErr == io.EOF
- tmpBuf := tmpBuffer.Bytes()
- bytesToCopy := tmpBuf[0:int(bytesRead)]
+func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
- copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy)
- chunkBufOffset = chunkBufOffset + int32(bytesRead)
+ fileName := path.Base(r.URL.Path)
+ contentType := r.Header.Get("Content-Type")
+ if contentType == "application/octet-stream" {
+ contentType = ""
+ }
- if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) {
- writtenChunks = writtenChunks + 1
- fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
- if assignErr != nil {
- return nil, assignErr
- }
+ fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
+ if err != nil {
+ return nil, nil, err
+ }
- // upload the chunk to the volume server
- chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(len(fileChunks)+1), 10)
- uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "", fileId, auth)
- if uploadErr != nil {
- return nil, uploadErr
- }
+ md5bytes = md5Hash.Sum(nil)
+ filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
- // Save to chunk manifest structure
- fileChunks = append(fileChunks,
- &filer_pb.FileChunk{
- FileId: fileId,
- Offset: chunkOffset,
- Size: uint64(chunkBufOffset),
- Mtime: time.Now().UnixNano(),
- },
- )
-
- // reset variables for the next chunk
- chunkBufOffset = 0
- chunkOffset = totalBytesRead + int64(bytesRead)
- }
+ return
+}
- totalBytesRead = totalBytesRead + int64(bytesRead)
+func isAppend(r *http.Request) bool {
+ return r.URL.Query().Get("op") == "append"
+}
- if bytesRead == 0 || readFully {
- break
- }
+func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) {
- if readErr != nil {
- return nil, readErr
- }
+ // detect file mode
+ modeStr := r.URL.Query().Get("mode")
+ if modeStr == "" {
+ modeStr = "0660"
+ }
+ mode, err := strconv.ParseUint(modeStr, 8, 32)
+ if err != nil {
+ glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+ mode = 0660
}
+ // fix the path
path := r.URL.Path
if strings.HasSuffix(path, "/") {
if fileName != "" {
path += fileName
}
+ } else {
+ if fileName != "" {
+ if possibleDirEntry, findDirErr := fs.filer.FindEntry(ctx, util.FullPath(path)); findDirErr == nil {
+ if possibleDirEntry.IsDirectory() {
+ path += "/" + fileName
+ }
+ }
+ }
}
- glog.V(4).Infoln("saving", path)
- entry := &filer2.Entry{
- FullPath: filer2.FullPath(path),
- Attr: filer2.Attr{
- Mtime: time.Now(),
- Crtime: time.Now(),
- Mode: 0660,
- Uid: OS_UID,
- Gid: OS_GID,
- Replication: replication,
- Collection: collection,
- TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)),
- },
- Chunks: fileChunks,
+ var entry *filer.Entry
+ var mergedChunks []*filer_pb.FileChunk
+ // when it is an append
+ if isAppend(r) {
+ existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path))
+ if findErr != nil && findErr != filer_pb.ErrNotFound {
+ glog.V(0).Infof("failing to find %s: %v", path, findErr)
+ }
+ entry = existingEntry
+ }
+ if entry != nil {
+ entry.Mtime = time.Now()
+ entry.Md5 = nil
+ // adjust chunk offsets
+ for _, chunk := range fileChunks {
+ chunk.Offset += int64(entry.FileSize)
+ }
+ mergedChunks = append(entry.Chunks, fileChunks...)
+ entry.FileSize += uint64(chunkOffset)
+
+ // TODO: support appending to small files stored inline in entry.Content
+ if len(entry.Content) > 0 {
+ replyerr = fmt.Errorf("append to small file is not supported yet")
+ return
+ }
+
+ } else {
+ glog.V(4).Infoln("saving", path)
+ mergedChunks = fileChunks
+ entry = &filer.Entry{
+ FullPath: util.FullPath(path),
+ Attr: filer.Attr{
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: os.FileMode(mode),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ Replication: so.Replication,
+ Collection: so.Collection,
+ TtlSec: so.TtlSeconds,
+ DiskType: so.DiskType,
+ Mime: contentType,
+ Md5: md5bytes,
+ FileSize: uint64(chunkOffset),
+ },
+ Content: content,
+ }
}
- if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
- fs.filer.DeleteChunks(entry.Chunks)
+
+ // maybe compact entry chunks
+ mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks)
+ if replyerr != nil {
+ glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
+ return
+ }
+ entry.Chunks = mergedChunks
+
+ filerResult = &FilerPostResult{
+ Name: fileName,
+ Size: int64(entry.FileSize),
+ }
+
+ if entry.Extended == nil {
+ entry.Extended = make(map[string][]byte)
+ }
+
+ SaveAmzMetaData(r, entry.Extended, false)
+
+ for k, v := range r.Header {
+ if len(v) > 0 && strings.HasPrefix(k, needle.PairNamePrefix) {
+ entry.Extended[k] = []byte(v[0])
+ }
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
+ fs.filer.DeleteChunks(fileChunks)
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
+ }
+ return filerResult, replyerr
+}
+
+func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
+
+ return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) {
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(so)
+ if assignErr != nil {
+ return nil, "", "", assignErr
+ }
+
+ // upload the chunk to the volume server
+ uploadResult, uploadErr, _ := operation.Upload(urlLocation, name, fs.option.Cipher, reader, false, "", nil, auth)
+ if uploadErr != nil {
+ return nil, "", "", uploadErr
+ }
+
+ return uploadResult.ToPbFileChunk(fileId, offset), so.Collection, so.Replication, nil
+ }
+}
+
+func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http.Request) (filerResult *FilerPostResult, replyerr error) {
+
+ // detect file mode
+ modeStr := r.URL.Query().Get("mode")
+ if modeStr == "" {
+ modeStr = "0660"
+ }
+ mode, err := strconv.ParseUint(modeStr, 8, 32)
+ if err != nil {
+ glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+ mode = 0660
+ }
+
+ // fix the path
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ path = path[:len(path)-1]
+ }
+
+ existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+ if err == nil && existingEntry != nil {
+ replyerr = fmt.Errorf("dir %s already exists", path)
return
}
- return
+ glog.V(4).Infoln("mkdir", path)
+ entry := &filer.Entry{
+ FullPath: util.FullPath(path),
+ Attr: filer.Attr{
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: os.FileMode(mode) | os.ModeDir,
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }
+
+ filerResult = &FilerPostResult{
+ Name: util.FullPath(path).Name(),
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
+ replyerr = dbErr
+ filerResult.Error = dbErr.Error()
+ glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr)
+ }
+ return filerResult, replyerr
}
-func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request,
- chunkBuf []byte, fileName string, contentType string, fileId string, auth security.EncodedJwt) (err error) {
+func SaveAmzMetaData(r *http.Request, existing map[string][]byte, isReplace bool) (metadata map[string][]byte) {
- stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc()
- start := time.Now()
- defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds())
- }()
+ metadata = make(map[string][]byte)
+ if !isReplace {
+ for k, v := range existing {
+ metadata[k] = v
+ }
+ }
- ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf))
- uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, nil, auth)
- if uploadResult != nil {
- glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size)
+ if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" {
+ metadata[xhttp.AmzStorageClass] = []byte(sc)
}
- if uploadError != nil {
- err = uploadError
+
+ if tags := r.Header.Get(xhttp.AmzObjectTagging); tags != "" {
+ for _, v := range strings.Split(tags, "&") {
+ tag := strings.Split(v, "=")
+ if len(tag) == 2 {
+ metadata[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1])
+ }
+ }
}
+
+ for header, values := range r.Header {
+ if strings.HasPrefix(header, xhttp.AmzUserMetaPrefix) {
+ for _, value := range values {
+ metadata[header] = []byte(value)
+ }
+ }
+ }
+
return
+
}
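SaveAmzMetaData flattens an S3-style tagging header ("k1=v1&k2=v2") into one extended attribute per tag. A standalone sketch of that parsing, with the literal prefix standing in for the xhttp.AmzObjectTagging constant:

tags := "color=blue&owner=alice"
metadata := make(map[string][]byte)
for _, pair := range strings.Split(tags, "&") {
	kv := strings.Split(pair, "=")
	if len(kv) == 2 {
		metadata["X-Amz-Tagging-"+kv[0]] = []byte(kv[1])
	}
}
// metadata: X-Amz-Tagging-color=blue, X-Amz-Tagging-owner=alice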
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
new file mode 100644
index 000000000..8334d1618
--- /dev/null
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -0,0 +1,91 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// handling single chunk POST or PUT upload
+func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) (filerResult *FilerPostResult, err error) {
+
+ fileId, urlLocation, auth, err := fs.assignNewFileInfo(so)
+
+ if err != nil || fileId == "" || urlLocation == "" {
+ return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, so.Collection, so.DataCenter)
+ }
+
+ glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
+
+ // Note: the stored bytes are encrypt(gzip(data)): compress first, then encrypt
+
+ sizeLimit := int64(fs.option.MaxMB) * 1024 * 1024
+
+ pu, err := needle.ParseUpload(r, sizeLimit)
+ if err != nil {
+ return nil, fmt.Errorf("parse upload: %v", err)
+ }
+ uncompressedData := pu.Data
+ if pu.IsGzipped {
+ uncompressedData = pu.UncompressedData
+ }
+ if pu.MimeType == "" {
+ pu.MimeType = http.DetectContentType(uncompressedData)
+ // println("detect2 mimetype to", pu.MimeType)
+ }
+
+ uploadResult, uploadError := operation.UploadData(urlLocation, pu.FileName, true, uncompressedData, false, pu.MimeType, pu.PairMap, auth)
+ if uploadError != nil {
+ return nil, fmt.Errorf("upload to volume server: %v", uploadError)
+ }
+
+ // Save to chunk manifest structure
+ fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0)}
+
+ // fmt.Printf("uploaded: %+v\n", uploadResult)
+
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ if pu.FileName != "" {
+ path += pu.FileName
+ }
+ }
+
+ entry := &filer.Entry{
+ FullPath: util.FullPath(path),
+ Attr: filer.Attr{
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: 0660,
+ Uid: OS_UID,
+ Gid: OS_GID,
+ Replication: so.Replication,
+ Collection: so.Collection,
+ TtlSec: so.TtlSeconds,
+ DiskType: so.DiskType,
+ Mime: pu.MimeType,
+ Md5: util.Base64Md5ToBytes(pu.ContentMd5),
+ },
+ Chunks: fileChunks,
+ }
+
+ filerResult = &FilerPostResult{
+ Name: pu.FileName,
+ Size: int64(pu.OriginalDataSize),
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
+ fs.filer.DeleteChunks(entry.Chunks)
+ err = dbErr
+ filerResult.Error = dbErr.Error()
+ return
+ }
+
+ return
+}
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
new file mode 100644
index 000000000..3ab45453e
--- /dev/null
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -0,0 +1,105 @@
+package weed_server
+
+import (
+ "crypto/md5"
+ "hash"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
+ var fileChunks []*filer_pb.FileChunk
+
+ md5Hash := md5.New()
+ var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
+
+ chunkOffset := int64(0)
+ var smallContent []byte
+
+ for {
+ limitedReader := io.LimitReader(partReader, int64(chunkSize))
+
+ data, err := ioutil.ReadAll(limitedReader)
+ if err != nil {
+ return nil, nil, 0, err, nil
+ }
+ if chunkOffset == 0 && !isAppend(r) {
+ if len(data) < int(fs.option.SaveToFilerLimit) || (strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024) {
+ smallContent = data
+ chunkOffset += int64(len(data))
+ break
+ }
+ }
+ dataReader := util.NewBytesReader(data)
+
+ // retry to assign a different file id
+ var fileId, urlLocation string
+ var auth security.EncodedJwt
+ var assignErr, uploadErr error
+ var uploadResult *operation.UploadResult
+ for i := 0; i < 3; i++ {
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
+ if assignErr != nil {
+ return nil, nil, 0, assignErr, nil
+ }
+
+ // upload the chunk to the volume server
+ uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
+ if uploadErr != nil {
+ time.Sleep(251 * time.Millisecond)
+ continue
+ }
+ break
+ }
+ if uploadErr != nil {
+ return nil, nil, 0, uploadErr, nil
+ }
+
+ // if last chunk exhausted the reader exactly at the border
+ if uploadResult.Size == 0 {
+ break
+ }
+
+ // Save to chunk manifest structure
+ fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
+
+ glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
+
+ // reset variables for the next chunk
+ chunkOffset = chunkOffset + int64(uploadResult.Size)
+
+ // if last chunk was not at full chunk size, but already exhausted the reader
+ if int64(uploadResult.Size) < int64(chunkSize) {
+ break
+ }
+ }
+
+ return fileChunks, md5Hash, chunkOffset, nil, smallContent
+}
+
+func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
+
+ stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
+ }()
+
+ uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
+ if uploadResult != nil && uploadResult.RetryCount > 0 {
+ stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
+ }
+ return uploadResult, err, data
+}
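uploadReaderToChunks hashes and chunks in a single pass: a TeeReader copies every byte into the running MD5 while LimitReader slices the stream into chunk-sized reads. A runnable sketch of just that pattern:

package main

import (
	"crypto/md5"
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

func main() {
	src := strings.NewReader("hello chunked world")
	md5Hash := md5.New()
	tee := io.TeeReader(src, md5Hash) // every read also feeds the hash
	for {
		data, _ := ioutil.ReadAll(io.LimitReader(tee, 8)) // 8-byte chunks
		if len(data) == 0 {
			break
		}
		fmt.Printf("chunk %q\n", data)
	}
	fmt.Printf("md5 %x\n", md5Hash.Sum(nil))
}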
diff --git a/weed/server/filer_server_rocksdb.go b/weed/server/filer_server_rocksdb.go
new file mode 100644
index 000000000..5fcc7e88f
--- /dev/null
+++ b/weed/server/filer_server_rocksdb.go
@@ -0,0 +1,7 @@
+// +build rocksdb
+
+package weed_server
+
+import (
+ _ "github.com/chrislusf/seaweedfs/weed/filer/rocksdb"
+)
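The blank import registers the RocksDB filer store, and the build constraint keeps it out of default builds; it is compiled only with the rocksdb tag, e.g. go build -tags rocksdb ./weed (the exact invocation depends on the build setup).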
diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go
index 2f0df7f91..5016117a8 100644
--- a/weed/server/filer_ui/breadcrumb.go
+++ b/weed/server/filer_ui/breadcrumb.go
@@ -1,8 +1,9 @@
-package master_ui
+package filer_ui
import (
- "path/filepath"
"strings"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type Breadcrumb struct {
@@ -16,7 +17,7 @@ func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) {
for i := 0; i < len(parts); i++ {
crumb := Breadcrumb{
Name: parts[i] + " /",
- Link: "/" + filepath.ToSlash(filepath.Join(parts[0:i+1]...)),
+ Link: "/" + util.Join(parts[0:i+1]...),
}
if !strings.HasSuffix(crumb.Link, "/") {
crumb.Link += "/"
diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go
index e532b27e2..648b97f22 100644
--- a/weed/server/filer_ui/templates.go
+++ b/weed/server/filer_ui/templates.go
@@ -1,20 +1,31 @@
-package master_ui
+package filer_ui
import (
"github.com/dustin/go-humanize"
"html/template"
+ "net/url"
+ "strings"
)
+func printpath(parts ...string) string {
+ concat := strings.Join(parts, "")
+ escaped := url.PathEscape(concat)
+ return strings.ReplaceAll(escaped, "%2F", "/")
+}
+
var funcMap = template.FuncMap{
"humanizeBytes": humanize.Bytes,
+ "printpath": printpath,
}
var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html>
<html>
<head>
- <title>SeaweedFS Filer</title>
- <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
+ <title>SeaweedFS Filer</title>
+ <meta name="viewport" content="width=device-width, initial-scale=1">
+ <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
<style>
+body { padding-bottom: 128px; }
#drop-area {
border: 1px transparent;
}
@@ -37,6 +48,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
#fileElem {
display: none;
}
+.qrImage {
+ display: block;
+ margin-left: auto;
+ margin-right: auto;
+}
</style>
</head>
<body>
@@ -50,7 +66,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<div class="row">
<div>
{{ range $entry := .Breadcrumbs }}
- <a href="{{ $entry.Link }}" >
+ <a href="{{ printpath $entry.Link }}" >
{{ $entry.Name }}
</a>
{{ end }}
@@ -69,11 +85,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<td>
{{if $entry.IsDirectory}}
<img src="/seaweedfsstatic/images/folder.gif" width="20" height="23">
- <a href={{ print $path "/" $entry.Name "/"}} >
+ <a href="{{ printpath $path "/" $entry.Name "/"}}" >
{{ $entry.Name }}
</a>
{{else}}
- <a href={{ print $path "/" $entry.Name }} >
+ <a href="{{ printpath $path "/" $entry.Name }}" >
{{ $entry.Name }}
</a>
{{end}}
@@ -107,6 +123,14 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
</a>
</div>
{{end}}
+
+ <br/>
+ <br/>
+
+ <div class="navbar navbar-fixed-bottom">
+ <img src="data:image/png;base64,{{.QrImage}}" class="qrImage" />
+ </div>
+
</div>
</body>
<script type="text/javascript">
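printpath exists because entry names can contain characters that are unsafe inside href attributes; escaping the joined path and then restoring the slashes keeps links both valid and readable. For example (illustrative values):

printpath("/docs", "/", "report 2021#v2") // -> "/docs/report%202021%23v2"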
diff --git a/weed/server/gateway_server.go b/weed/server/gateway_server.go
new file mode 100644
index 000000000..608217ed7
--- /dev/null
+++ b/weed/server/gateway_server.go
@@ -0,0 +1,106 @@
+package weed_server
+
+import (
+ "math/rand"
+ "net/http"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "google.golang.org/grpc"
+
+ _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/log"
+ "github.com/chrislusf/seaweedfs/weed/security"
+)
+
+type GatewayOption struct {
+ Masters []string
+ Filers []string
+ MaxMB int
+ IsSecure bool
+}
+
+type GatewayServer struct {
+ option *GatewayOption
+ secret security.SigningKey
+ grpcDialOption grpc.DialOption
+}
+
+func NewGatewayServer(defaultMux *http.ServeMux, option *GatewayOption) (fs *GatewayServer, err error) {
+
+ fs = &GatewayServer{
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
+ }
+
+ if len(option.Masters) == 0 {
+ glog.Fatal("master list is required!")
+ }
+
+ defaultMux.HandleFunc("/blobs/", fs.blobsHandler)
+ defaultMux.HandleFunc("/files/", fs.filesHandler)
+ defaultMux.HandleFunc("/topics/", fs.topicsHandler)
+
+ return fs, nil
+}
+
+func (fs *GatewayServer) getMaster() string {
+ randMaster := rand.Intn(len(fs.option.Masters))
+ return fs.option.Masters[randMaster]
+}
+
+func (fs *GatewayServer) blobsHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "DELETE":
+ chunkId := r.URL.Path[len("/blobs/"):]
+ fullUrl, err := operation.LookupFileId(fs.getMaster, chunkId)
+ if err != nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+ var jwtAuthorization security.EncodedJwt
+ if fs.option.IsSecure {
+ jwtAuthorization = operation.LookupJwt(fs.getMaster(), chunkId)
+ }
+ body, statusCode, err := util.DeleteProxied(fullUrl, string(jwtAuthorization))
+ if err != nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+ w.WriteHeader(statusCode)
+ w.Write(body)
+ case "POST":
+ submitForClientHandler(w, r, fs.getMaster, fs.grpcDialOption)
+ }
+}
+
+func (fs *GatewayServer) filesHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "DELETE":
+ case "POST":
+ }
+}
+
+func (fs *GatewayServer) topicsHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "POST":
+ }
+}
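The gateway's blob API is thin: POST /blobs/ submits content through the master, and DELETE /blobs/<fileId> looks up the volume location and proxies the delete (with a JWT when IsSecure is set). For example, with an assumed gateway address and an illustrative file id:

curl -X DELETE "http://localhost:8084/blobs/3,01637037d6"

filesHandler and topicsHandler are placeholders for now; their method switches are empty.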
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index fcfd98f7b..3e6d9bb9e 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -1,29 +1,32 @@
package weed_server
import (
+ "context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"net"
"strings"
"time"
"github.com/chrislusf/raft"
+ "google.golang.org/grpc/peer"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
- "google.golang.org/grpc/peer"
)
func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
var dn *topology.DataNode
- t := ms.Topo
defer func() {
if dn != nil {
+ // if the volume server disconnects and reconnects quickly
+ // the unregister and register can race with each other
+ ms.Topo.UnRegisterDataNode(dn)
glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
- t.UnRegisterDataNode(dn)
message := &master_pb.VolumeLocation{
Url: dn.Url(),
@@ -58,39 +61,33 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
return err
}
- t.Sequence.SetMax(heartbeat.MaxFileKey)
+ ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
if dn == nil {
- if heartbeat.Ip == "" {
- if pr, ok := peer.FromContext(stream.Context()); ok {
- if pr.Addr != net.Addr(nil) {
- heartbeat.Ip = pr.Addr.String()[0:strings.LastIndex(pr.Addr.String(), ":")]
- glog.V(0).Infof("remote IP address is detected as %v", heartbeat.Ip)
- }
- }
- }
- dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
- dc := t.GetOrCreateDataCenter(dcName)
+ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+ dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
- dn = rack.GetOrCreateDataNode(heartbeat.Ip,
- int(heartbeat.Port), heartbeat.PublicUrl,
- int64(heartbeat.MaxVolumeCount))
+ dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts)
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
- VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
- MetricsAddress: ms.option.MetricsAddress,
- MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
- StorageBackends: backend.ToPbStorageBackends(),
+ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
}); err != nil {
glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err
}
}
+ dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts)
+
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
+ var dataCenter string
+ if dc := dn.GetDataCenter(); dc != nil {
+ dataCenter = string(dc.Id())
+ }
message := &master_pb.VolumeLocation{
- Url: dn.Url(),
- PublicUrl: dn.PublicUrl,
+ Url: dn.Url(),
+ PublicUrl: dn.PublicUrl,
+ DataCenter: dataCenter,
}
if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
// process delta volume ids if exists for fast volume id updates
@@ -101,12 +98,12 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
message.DeletedVids = append(message.DeletedVids, volInfo.Id)
}
// update master internal volume layouts
- t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
+ ms.Topo.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
}
if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
// process heartbeat.Volumes
- newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
+ newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)
for _, v := range newVolumes {
glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
@@ -121,7 +118,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
// update master internal volume layouts
- t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
+ ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
for _, s := range heartbeat.NewEcShards {
message.NewVids = append(message.NewVids, s.Id)
@@ -136,8 +133,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
- glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
- newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
+ glog.V(1).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
+ newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn)
// broadcast the ec vid changes to master clients
for _, s := range newShards {
@@ -151,7 +148,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
}
-
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
ms.clientChansLock.RLock()
for host, ch := range ms.clientChans {
@@ -162,7 +158,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
// tell the volume servers about the leader
- newLeader, err := t.Leader()
+ newLeader, err := ms.Topo.Leader()
if err != nil {
glog.Warningf("SendHeartbeat find leader: %v", err)
return err
@@ -189,35 +185,14 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
return ms.informNewLeader(stream)
}
- // remember client address
- ctx := stream.Context()
- // fmt.Printf("FromContext %+v\n", ctx)
- pr, ok := peer.FromContext(ctx)
- if !ok {
- glog.Error("failed to get peer from ctx")
- return fmt.Errorf("failed to get peer from ctx")
- }
- if pr.Addr == net.Addr(nil) {
- glog.Error("failed to get peer address")
- return fmt.Errorf("failed to get peer address")
- }
+ peerAddress := findClientAddress(stream.Context(), req.GrpcPort)
- clientName := req.Name + pr.Addr.String()
- glog.V(0).Infof("+ client %v", clientName)
+ // buffer by 1 so we don't end up getting stuck writing to stopChan forever
+ stopChan := make(chan bool, 1)
- messageChan := make(chan *master_pb.VolumeLocation)
- stopChan := make(chan bool)
+ clientName, messageChan := ms.addClient(req.Name, peerAddress)
- ms.clientChansLock.Lock()
- ms.clientChans[clientName] = messageChan
- ms.clientChansLock.Unlock()
-
- defer func() {
- glog.V(0).Infof("- client %v", clientName)
- ms.clientChansLock.Lock()
- delete(ms.clientChans, clientName)
- ms.clientChansLock.Unlock()
- }()
+ defer ms.deleteClient(clientName)
for _, message := range ms.Topo.ToVolumeLocations() {
if err := stream.Send(message); err != nil {
@@ -253,7 +228,6 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
}
}
- return nil
}
func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error {
@@ -269,3 +243,78 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe
}
return nil
}
+
+func (ms *MasterServer) addClient(clientType string, clientAddress string) (clientName string, messageChan chan *master_pb.VolumeLocation) {
+ clientName = clientType + "@" + clientAddress
+ glog.V(0).Infof("+ client %v", clientName)
+
+	// We buffer this channel to avoid a potential deadlock: if the
+	// KeepConnected loop stops reading from it while SendHeartbeat is blocked
+	// sending to it, SendHeartbeat still holds clientChansLock, so the
+	// channel can never be removed and the send never completes.
+	// A buffer of 100 is probably overkill.
+ messageChan = make(chan *master_pb.VolumeLocation, 100)
+
+ ms.clientChansLock.Lock()
+ ms.clientChans[clientName] = messageChan
+ ms.clientChansLock.Unlock()
+ return
+}
+
+func (ms *MasterServer) deleteClient(clientName string) {
+ glog.V(0).Infof("- client %v", clientName)
+ ms.clientChansLock.Lock()
+ delete(ms.clientChans, clientName)
+ ms.clientChansLock.Unlock()
+}
+
+func findClientAddress(ctx context.Context, grpcPort uint32) string {
+ // fmt.Printf("FromContext %+v\n", ctx)
+ pr, ok := peer.FromContext(ctx)
+ if !ok {
+ glog.Error("failed to get peer from ctx")
+ return ""
+ }
+ if pr.Addr == net.Addr(nil) {
+ glog.Error("failed to get peer address")
+ return ""
+ }
+ if grpcPort == 0 {
+ return pr.Addr.String()
+ }
+ if tcpAddr, ok := pr.Addr.(*net.TCPAddr); ok {
+ externalIP := tcpAddr.IP
+ return fmt.Sprintf("%s:%d", externalIP, grpcPort)
+ }
+ return pr.Addr.String()
+
+}
+
+func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.ListMasterClientsRequest) (*master_pb.ListMasterClientsResponse, error) {
+ resp := &master_pb.ListMasterClientsResponse{}
+ ms.clientChansLock.RLock()
+ defer ms.clientChansLock.RUnlock()
+
+ for k := range ms.clientChans {
+ if strings.HasPrefix(k, req.ClientType+"@") {
+ resp.GrpcAddresses = append(resp.GrpcAddresses, k[len(req.ClientType)+1:])
+ }
+ }
+ return resp, nil
+}
+
+func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) {
+
+ // tell the volume servers about the leader
+ leader, _ := ms.Topo.Leader()
+
+ resp := &master_pb.GetMasterConfigurationResponse{
+ MetricsAddress: ms.option.MetricsAddress,
+ MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ StorageBackends: backend.ToPbStorageBackends(),
+ DefaultReplication: ms.option.DefaultReplicaPlacement,
+ Leader: leader,
+ }
+
+ return resp, nil
+}
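
The 100-slot buffer in addClient is what keeps SendHeartbeat from blocking while it holds clientChansLock. Below is a standalone sketch of the same broadcast shape, using a hypothetical hub type; the non-blocking send is an extra safeguard for illustration, not something the master itself does.

```go
package main

import (
	"fmt"
	"sync"
)

// hub is a hypothetical stand-in for the master's clientChans map.
type hub struct {
	mu    sync.RWMutex
	chans map[string]chan string
}

func (h *hub) broadcast(msg string) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	for name, ch := range h.chans {
		select {
		case ch <- msg: // succeeds while the buffer has room
		default: // a stalled reader cannot wedge the broadcaster
			fmt.Printf("client %s is not draining, dropping message\n", name)
		}
	}
}

func main() {
	h := &hub{chans: map[string]chan string{
		// buffered, as in addClient above
		"volume@10.0.0.1:18080": make(chan string, 100),
	}}
	h.broadcast("new volume 7")
	fmt.Println(<-h.chans["volume@10.0.0.1:18080"])
}
```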
diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go
new file mode 100644
index 000000000..93c9e4e4e
--- /dev/null
+++ b/weed/server/master_grpc_server_admin.go
@@ -0,0 +1,143 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "math/rand"
+ "sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+)
+
+/*
+How the exclusive lock works
+-----------
+
+Shell
+------
+When the shell locks:
+ * lease an admin token (lockTime, token)
+ * start a goroutine to renew the admin token periodically
+
+When the shell unlocks:
+ * stop the renewal goroutine
+ * send a release lock request
+
+Master
+------
+The master maintains:
+ * randomNumber
+ * lastLockTime
+When the master receives a lease/renew request from the shell:
+ if lastLockTime is still fresh {
+ if this is a renewal and the token is valid {
+ // renew
+ generate the randomNumber => token
+ return
+ }
+ refuse
+ return
+ } else {
+ // fresh lease request
+ generate the randomNumber => token
+ return
+ }
+
+When the master receives a release request from the shell:
+ set lastLockTime to zero
+
+
+The volume server does not need to verify the token.
+This makes locking optional, in the advisory style that Go code usually relies on.
+
+*/
+
+const (
+ LockDuration = 10 * time.Second
+)
+
+type AdminLock struct {
+ accessSecret int64
+ accessLockTime time.Time
+ lastClient string
+}
+
+type AdminLocks struct {
+ locks map[string]*AdminLock
+ sync.RWMutex
+}
+
+func NewAdminLocks() *AdminLocks {
+ return &AdminLocks{
+ locks: make(map[string]*AdminLock),
+ }
+}
+
+func (locks *AdminLocks) isLocked(lockName string) (clientName string, isLocked bool) {
+ locks.RLock()
+ defer locks.RUnlock()
+ adminLock, found := locks.locks[lockName]
+ if !found {
+ return "", false
+ }
+ glog.V(4).Infof("isLocked %v", adminLock.lastClient)
+ return adminLock.lastClient, adminLock.accessLockTime.Add(LockDuration).After(time.Now())
+}
+
+func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64) bool {
+ locks.RLock()
+ defer locks.RUnlock()
+ adminLock, found := locks.locks[lockName]
+ if !found {
+ return false
+ }
+ return adminLock.accessLockTime.Equal(ts) && adminLock.accessSecret == token
+}
+
+func (locks *AdminLocks) generateToken(lockName string, clientName string) (ts time.Time, token int64) {
+ locks.Lock()
+ defer locks.Unlock()
+ lock := &AdminLock{
+ accessSecret: rand.Int63(),
+ accessLockTime: time.Now(),
+ lastClient: clientName,
+ }
+ locks.locks[lockName] = lock
+ return lock.accessLockTime, lock.accessSecret
+}
+
+func (locks *AdminLocks) deleteLock(lockName string) {
+ locks.Lock()
+ defer locks.Unlock()
+ delete(locks.locks, lockName)
+}
+
+func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.LeaseAdminTokenRequest) (*master_pb.LeaseAdminTokenResponse, error) {
+ resp := &master_pb.LeaseAdminTokenResponse{}
+
+ if lastClient, isLocked := ms.adminLocks.isLocked(req.LockName); isLocked {
+ glog.V(4).Infof("LeaseAdminToken %v", lastClient)
+ if req.PreviousToken != 0 && ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
+ // for renew
+ ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName)
+ resp.Token, resp.LockTsNs = token, ts.UnixNano()
+ return resp, nil
+ }
+ // refuse since still locked
+ return resp, fmt.Errorf("already locked by %s", lastClient)
+ }
+ // for fresh lease request
+ ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName)
+ resp.Token, resp.LockTsNs = token, ts.UnixNano()
+ return resp, nil
+}
+
+func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.ReleaseAdminTokenRequest) (*master_pb.ReleaseAdminTokenResponse, error) {
+ resp := &master_pb.ReleaseAdminTokenResponse{}
+ if ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
+ ms.adminLocks.deleteLock(req.LockName)
+ }
+ return resp, nil
+}
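
A hedged sketch of the shell-side loop the comment block above describes: lease once, then renew with the previous token and lock time. The master address, lock name, client name, and renewal interval are assumptions.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/security"
	"github.com/chrislusf/seaweedfs/weed/util"
)

const lockDuration = 10 * time.Second // mirrors LockDuration on the master

func main() {
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
	master := "localhost:19333" // assumption: the master's grpc address

	var token, lockTsNs int64
	lease := func() error {
		return operation.WithMasterServerClient(master, grpcDialOption, func(client master_pb.SeaweedClient) error {
			resp, err := client.LeaseAdminToken(context.Background(), &master_pb.LeaseAdminTokenRequest{
				PreviousToken:    token,
				PreviousLockTime: lockTsNs,
				LockName:         "admin", // assumption: the lock name the shell uses
				ClientName:       "shell",
			})
			if err != nil {
				return err
			}
			token, lockTsNs = resp.Token, resp.LockTsNs
			return nil
		})
	}

	if err := lease(); err != nil {
		fmt.Println("lease:", err)
		return
	}
	// renew at half the lease duration so the lock never goes stale
	for range time.Tick(lockDuration / 2) {
		if err := lease(); err != nil {
			fmt.Println("renew:", err)
			return
		}
	}
}
```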
diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go
index f8e0785f6..b92d6bcbe 100644
--- a/weed/server/master_grpc_server_collection.go
+++ b/weed/server/master_grpc_server_collection.go
@@ -4,6 +4,7 @@ import (
"context"
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 856c07890..156afd4a1 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -3,8 +3,8 @@ package weed_server
import (
"context"
"fmt"
-
"github.com/chrislusf/raft"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -61,11 +61,13 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if err != nil {
return nil, err
}
+ diskType := types.ToDiskType(req.DiskType)
option := &topology.VolumeGrowOption{
Collection: req.Collection,
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
+ DiskType: diskType,
Prealloacte: ms.preallocateSize,
DataCenter: req.DataCenter,
Rack: req.Rack,
@@ -74,8 +76,8 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
if !ms.Topo.HasWritableVolume(option) {
- if ms.Topo.FreeSpace() <= 0 {
- return nil, fmt.Errorf("No free volumes left!")
+ if ms.Topo.AvailableSpaceFor(option) <= 0 {
+ return nil, fmt.Errorf("no free volumes left for %s", option.String())
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {
@@ -118,9 +120,8 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
return nil, err
}
- volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl)
+ volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
stats := volumeLayout.Stats()
-
resp := &master_pb.StatisticsResponse{
TotalSize: stats.TotalSize,
UsedSize: stats.UsedSize,
@@ -177,12 +178,15 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku
return resp, nil
}
-func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) {
+func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumVolumeRequest) (*master_pb.VacuumVolumeResponse, error) {
- resp := &master_pb.GetMasterConfigurationResponse{
- MetricsAddress: ms.option.MetricsAddress,
- MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ if !ms.Topo.IsLeader() {
+ return nil, raft.NotLeaderError
}
+ resp := &master_pb.VacuumVolumeResponse{}
+
+ ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.preallocateSize)
+
return resp, nil
}
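
With DiskType now part of VolumeGrowOption, clients can target a disk type at assign time. A hedged sketch against the Assign RPC; the master address, collection name, the "ssd" value, and the insecure dial option are assumptions for brevity.

```go
package main

import (
	"context"
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"google.golang.org/grpc"
)

func main() {
	err := operation.WithMasterServerClient("localhost:19333", grpc.WithInsecure(), func(client master_pb.SeaweedClient) error {
		resp, err := client.Assign(context.Background(), &master_pb.AssignRequest{
			Count:      1,
			Collection: "pictures", // assumption
			DiskType:   "ssd",      // routed through types.ToDiskType on the master
		})
		if err != nil {
			return err
		}
		fmt.Println("assigned", resp.Fid, "on", resp.Url)
		return nil
	})
	if err != nil {
		fmt.Println("assign:", err)
	}
}
```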
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 33a5129da..e2b2df18d 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,19 +1,20 @@
package weed_server
import (
- "context"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"os"
"regexp"
- "strconv"
"strings"
"sync"
"time"
"github.com/chrislusf/raft"
+ "github.com/gorilla/mux"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -22,9 +23,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
- "github.com/gorilla/mux"
- "github.com/spf13/viper"
- "google.golang.org/grpc"
)
const (
@@ -33,11 +31,12 @@ const (
)
type MasterOption struct {
- Port int
- MetaFolder string
- VolumeSizeLimitMB uint
- VolumePreallocate bool
- PulseSeconds int
+ Host string
+ Port int
+ MetaFolder string
+ VolumeSizeLimitMB uint
+ VolumePreallocate bool
+ // PulseSeconds int
DefaultReplicaPlacement string
GarbageThreshold float64
WhiteList []string
@@ -56,7 +55,7 @@ type MasterServer struct {
vg *topology.VolumeGrowth
vgLock sync.Mutex
- bounedLeaderChan chan int
+ boundedLeaderChan chan int
// notifying clients
clientChansLock sync.RWMutex
@@ -65,11 +64,13 @@ type MasterServer struct {
grpcDialOption grpc.DialOption
MasterClient *wdclient.MasterClient
+
+ adminLocks *AdminLocks
}
func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
- v := viper.GetViper()
+ v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
v.SetDefault("jwt.signing.expires_after_seconds", 10)
expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
@@ -78,35 +79,39 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
+ v.SetDefault("master.replication.treat_replication_as_minimums", false)
+ replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums")
+
var preallocateSize int64
if option.VolumePreallocate {
preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
- grpcDialOption := security.LoadClientTLS(v.Sub("grpc"), "master")
+ grpcDialOption := security.LoadClientTLS(v, "grpc.master")
ms := &MasterServer{
option: option,
preallocateSize: preallocateSize,
clientChans: make(map[string]chan *master_pb.VolumeLocation),
grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers),
+ adminLocks: NewAdminLocks(),
}
- ms.bounedLeaderChan = make(chan int, 16)
+ ms.boundedLeaderChan = make(chan int, 16)
seq := ms.createSequencer(option)
if nil == seq {
glog.Fatalf("create sequencer failed.")
}
- ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
+ ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, 5, replicationAsMin)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
ms.guard = security.NewGuard(ms.option.WhiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
+ handleStaticResources2(r)
+ r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
+ r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
if !ms.option.DisableHttp {
- handleStaticResources2(r)
- r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
- r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
r.HandleFunc("/dir/lookup", ms.guard.WhiteList(ms.dirLookupHandler))
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
@@ -115,9 +120,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
- r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
- r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
- r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
+ /*
+ r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
+ r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
+ r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
+ */
r.HandleFunc("/{fileId}", ms.redirectHandler)
}
@@ -131,14 +138,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
ms.Topo.RaftServer = raftServer.raftServer
ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
- glog.V(0).Infof("event: %+v", e)
+ glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
if ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
}
})
- ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) {
- glog.V(0).Infof("state change: %+v", e)
- })
if ms.Topo.IsLeader() {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
} else {
@@ -148,13 +152,13 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
}
}
-func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
+func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
f(w, r)
} else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
- ms.bounedLeaderChan <- 1
- defer func() { <-ms.bounedLeaderChan }()
+ ms.boundedLeaderChan <- 1
+ defer func() { <-ms.boundedLeaderChan }()
targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError,
@@ -183,7 +187,7 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
func (ms *MasterServer) startAdminScripts() {
var err error
- v := viper.GetViper()
+ v := util.GetViper()
adminScripts := v.GetString("master.maintenance.scripts")
glog.V(0).Infof("adminScripts:\n%v", adminScripts)
if adminScripts == "" {
@@ -193,20 +197,25 @@ func (ms *MasterServer) startAdminScripts() {
v.SetDefault("master.maintenance.sleep_minutes", 17)
sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
- v.SetDefault("master.filer.default_filer_url", "http://localhost:8888/")
- filerURL := v.GetString("master.filer.default_filer_url")
+ v.SetDefault("master.filer.default", "localhost:8888")
+ filerHostPort := v.GetString("master.filer.default")
scriptLines := strings.Split(adminScripts, "\n")
+ if !strings.Contains(adminScripts, "lock") {
+ scriptLines = append([]string{"lock"}, scriptLines...)
+ scriptLines = append(scriptLines, "unlock")
+ }
- masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
+ masterAddress := fmt.Sprintf("%s:%d", ms.option.Host, ms.option.Port)
var shellOptions shell.ShellOptions
- shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master")
+ shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
shellOptions.Masters = &masterAddress
- shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, err = util.ParseFilerUrl(filerURL)
+ shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(filerHostPort)
+ shellOptions.Directory = "/"
if err != nil {
- glog.V(0).Infof("failed to parse master.filer.default_filer_urll=%s : %v\n", filerURL, err)
+ glog.V(0).Infof("failed to parse master.filer.default = %s : %v\n", filerHostPort, err)
return
}
@@ -220,27 +229,11 @@ func (ms *MasterServer) startAdminScripts() {
commandEnv.MasterClient.WaitUntilConnected()
c := time.Tick(time.Duration(sleepMinutes) * time.Minute)
- for _ = range c {
+ for range c {
if ms.Topo.IsLeader() {
for _, line := range scriptLines {
-
- cmds := reg.FindAllString(line, -1)
- if len(cmds) == 0 {
- continue
- }
- args := make([]string, len(cmds[1:]))
- for i := range args {
- args[i] = strings.Trim(string(cmds[1+i]), "\"'")
- }
- cmd := strings.ToLower(cmds[0])
-
- for _, c := range shell.Commands {
- if c.Name() == cmd {
- glog.V(0).Infof("executing: %s %v", cmd, args)
- if err := c.Do(args, commandEnv, os.Stdout); err != nil {
- glog.V(0).Infof("error: %v", err)
- }
- }
+ for _, c := range strings.Split(line, ";") {
+ processEachCmd(reg, c, commandEnv)
}
}
}
@@ -248,9 +241,30 @@ func (ms *MasterServer) startAdminScripts() {
}()
}
+func processEachCmd(reg *regexp.Regexp, line string, commandEnv *shell.CommandEnv) {
+ cmds := reg.FindAllString(line, -1)
+ if len(cmds) == 0 {
+ return
+ }
+ args := make([]string, len(cmds[1:]))
+ for i := range args {
+ args[i] = strings.Trim(string(cmds[1+i]), "\"'")
+ }
+ cmd := strings.ToLower(cmds[0])
+
+ for _, c := range shell.Commands {
+ if c.Name() == cmd {
+ glog.V(0).Infof("executing: %s %v", cmd, args)
+ if err := c.Do(args, commandEnv, os.Stdout); err != nil {
+ glog.V(0).Infof("error: %v", err)
+ }
+ }
+ }
+}
+
func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
var seq sequence.Sequencer
- v := viper.GetViper()
+ v := util.GetViper()
seqType := strings.ToLower(v.GetString(SequencerType))
glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
switch strings.ToLower(seqType) {
@@ -263,6 +277,13 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
glog.Error(err)
seq = nil
}
+ case "snowflake":
+ var err error
+ seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port))
+ if err != nil {
+ glog.Error(err)
+ seq = nil
+ }
default:
seq = sequence.NewMemorySequencer()
}
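
startAdminScripts now wraps the script in lock/unlock and splits each line on ';' before handing the pieces to processEachCmd. A standalone sketch of that tokenization; the regexp is an assumption standing in for the one compiled elsewhere in master_server.go.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

func main() {
	// assumption: matches double-quoted, single-quoted, or bare tokens
	reg := regexp.MustCompile(`"[^"]*"|'[^']*'|\S+`)
	line := `volume.balance -force; fs.configure -locationPrefix '/my dir' -apply`
	for _, cmdLine := range strings.Split(line, ";") {
		tokens := reg.FindAllString(cmdLine, -1)
		if len(tokens) == 0 {
			continue
		}
		cmd := strings.ToLower(tokens[0])
		args := make([]string, len(tokens[1:]))
		for i := range args {
			// strip the surrounding quotes, as processEachCmd does
			args[i] = strings.Trim(tokens[1+i], `"'`)
		}
		fmt.Printf("cmd=%s args=%q\n", cmd, args)
	}
}
```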
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index 514d86800..a9fecc5bd 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -45,7 +45,7 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
vid = fileId[0:commaSep]
}
}
- collection := r.FormValue("collection") //optional, but can be faster if too many collections
+ collection := r.FormValue("collection") // optional, but can be faster if too many collections
location := ms.findVolumeLocation(collection, vid)
httpStatus := http.StatusOK
if location.Error != "" || location.Locations == nil {
@@ -72,9 +72,6 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo
for _, loc := range machines {
locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
}
- if locations == nil {
- err = fmt.Errorf("volume id %s not found", vid)
- }
}
} else {
machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
@@ -83,6 +80,9 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo
}
err = getVidLocationsErr
}
+ if len(locations) == 0 && err == nil {
+ err = fmt.Errorf("volume id %s not found", vid)
+ }
ret := operation.LookupResult{
VolumeId: vid,
Locations: locations,
@@ -112,8 +112,8 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
}
if !ms.Topo.HasWritableVolume(option) {
- if ms.Topo.FreeSpace() <= 0 {
- writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left!"})
+ if ms.Topo.AvailableSpaceFor(option) <= 0 {
+ writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()})
return
}
ms.vgLock.Lock()
@@ -136,6 +136,9 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
}
func (ms *MasterServer) maybeAddJwtAuthorization(w http.ResponseWriter, fileId string, isWrite bool) {
+ if fileId == "" {
+ return
+ }
var encodedJwt security.EncodedJwt
if isWrite {
encodedJwt = security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fileId)
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 2965a4863..f24d4e924 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"math/rand"
"net/http"
"strconv"
@@ -44,7 +45,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["Version"] = util.VERSION
+ m["Version"] = util.Version()
m["Topology"] = ms.Topo.ToMap()
writeJsonQuiet(w, r, http.StatusOK, m)
}
@@ -61,7 +62,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
return
}
}
- glog.Infoln("garbageThreshold =", gcThreshold)
+ // glog.Infoln("garbageThreshold =", gcThreshold)
ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.preallocateSize)
ms.dirStatusHandler(w, r)
}
@@ -75,8 +76,8 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
}
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
- if ms.Topo.FreeSpace() < int64(count*option.ReplicaPlacement.GetCopyCount()) {
- err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.FreeSpace(), count*option.ReplicaPlacement.GetCopyCount())
+ if ms.Topo.AvailableSpaceFor(option) < int64(count*option.ReplicaPlacement.GetCopyCount()) {
+ err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), count*option.ReplicaPlacement.GetCopyCount())
} else {
count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
}
@@ -93,7 +94,7 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
m := make(map[string]interface{})
- m["Version"] = util.VERSION
+ m["Version"] = util.Version()
m["Volumes"] = ms.Topo.ToVolumeMap()
writeJsonQuiet(w, r, http.StatusOK, m)
}
@@ -110,7 +111,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
} else {
url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path
}
- http.Redirect(w, r, url, http.StatusMovedPermanently)
+ http.Redirect(w, r, url, http.StatusPermanentRedirect)
} else {
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error))
}
@@ -124,19 +125,19 @@ func (ms *MasterServer) selfUrl(r *http.Request) string {
}
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
- submitForClientHandler(w, r, ms.selfUrl(r), ms.grpcDialOption)
+ submitForClientHandler(w, r, func() string { return ms.selfUrl(r) }, ms.grpcDialOption)
} else {
masterUrl, err := ms.Topo.Leader()
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
- submitForClientHandler(w, r, masterUrl, ms.grpcDialOption)
+ submitForClientHandler(w, r, func() string { return masterUrl }, ms.grpcDialOption)
}
}
}
func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) bool {
- vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
+ vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
return vl.GetActiveVolumeCount(option) > 0
}
@@ -157,6 +158,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
+ diskType := types.ToDiskType(r.FormValue("disk"))
preallocate := ms.preallocateSize
if r.FormValue("preallocate") != "" {
@@ -169,6 +171,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
Collection: r.FormValue("collection"),
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
+ DiskType: diskType,
Prealloacte: preallocate,
DataCenter: r.FormValue("dataCenter"),
Rack: r.FormValue("rack"),
diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go
index f241df87f..3822c6113 100644
--- a/weed/server/master_server_handlers_ui.go
+++ b/weed/server/master_server_handlers_ui.go
@@ -2,6 +2,7 @@ package weed_server
import (
"net/http"
+ "time"
"github.com/chrislusf/raft"
ui "github.com/chrislusf/seaweedfs/weed/server/master_ui"
@@ -11,19 +12,21 @@ import (
func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) {
infos := make(map[string]interface{})
- infos["Version"] = util.VERSION
+ infos["Up Time"] = time.Now().Sub(startTime).String()
args := struct {
- Version string
- Topology interface{}
- RaftServer raft.Server
- Stats map[string]interface{}
- Counters *stats.ServerStats
+ Version string
+ Topology interface{}
+ RaftServer raft.Server
+ Stats map[string]interface{}
+ Counters *stats.ServerStats
+ VolumeSizeLimitMB uint
}{
- util.VERSION,
+ util.Version(),
ms.Topo.ToMap(),
ms.Topo.RaftServer,
infos,
serverStats,
+ ms.option.VolumeSizeLimitMB,
}
ui.StatusTpl.Execute(w, args)
}
diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go
index b674e3f82..31b6353e9 100644
--- a/weed/server/master_ui/templates.go
+++ b/weed/server/master_ui/templates.go
@@ -22,9 +22,13 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<div class="row">
<div class="col-sm-6">
<h2>Cluster status</h2>
- <table class="table">
+ <table class="table table-condensed table-striped">
<tbody>
<tr>
+ <th>Volume Size Limit</th>
+ <td>{{ .VolumeSizeLimitMB }}MB</td>
+ </tr>
+ <tr>
<th>Free</th>
<td>{{ .Topology.Free }}</td>
</tr>
@@ -38,8 +42,8 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td>
</tr>
<tr>
- <td class="col-sm-2 field-label"><label>Other Masters:</label></td>
- <td class="col-sm-10"><ul class="list-unstyled">
+ <th>Other Masters</th>
+ <td class="col-sm-5"><ul class="list-unstyled">
{{ range $k, $p := .Peers }}
<li><a href="http://{{ $p.Name }}/ui/index.html">{{ $p.Name }}</a></li>
{{ end }}
@@ -76,6 +80,7 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<th>Rack</th>
<th>RemoteAddr</th>
<th>#Volumes</th>
+ <th>Volume Ids</th>
<th>#ErasureCodingShards</th>
<th>Max</th>
</tr>
@@ -87,8 +92,13 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<tr>
<td><code>{{ $dc.Id }}</code></td>
<td>{{ $rack.Id }}</td>
- <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a></td>
+ <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a>
+ {{ if ne $dn.PublicUrl $dn.Url }}
+ / <a href="http://{{ $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a>
+ {{ end }}
+ </td>
<td>{{ $dn.Volumes }}</td>
+ <td>{{ $dn.VolumeIds}}</td>
<td>{{ $dn.EcShards }}</td>
<td>{{ $dn.Max }}</td>
</tr>
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 53289f1c1..85841e409 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,16 +2,18 @@ package weed_server
import (
"encoding/json"
- "github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "io/ioutil"
+ "math/rand"
"os"
"path"
- "reflect"
"sort"
"time"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -25,7 +27,31 @@ type RaftServer struct {
*raft.GrpcServer
}
-func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
+type StateMachine struct {
+ raft.StateMachine
+ topo *topology.Topology
+}
+
+func (s StateMachine) Save() ([]byte, error) {
+ state := topology.MaxVolumeIdCommand{
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
+ }
+ glog.V(1).Infof("Save raft state %+v", state)
+ return json.Marshal(state)
+}
+
+func (s StateMachine) Recovery(data []byte) error {
+ state := topology.MaxVolumeIdCommand{}
+ err := json.Unmarshal(data, &state)
+ if err != nil {
+ return err
+ }
+ glog.V(1).Infof("Recovery raft state %+v", state)
+ s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
+ return nil
+}
+
+func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) {
s := &RaftServer{
peers: peers,
serverAddr: serverAddr,
@@ -43,47 +69,66 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
transporter := raft.NewGrpcTransporter(grpcDialOption)
glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
- // Clear old cluster configurations if peers are changed
- if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
- glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
+ if !raftResumeState {
+ // always clear previous metadata
os.RemoveAll(path.Join(s.dataDir, "conf"))
os.RemoveAll(path.Join(s.dataDir, "log"))
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
}
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0700); err != nil { // 0700: the directory needs the execute bit to be usable
+ return nil, err
+ }
- s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "")
+ stateMachine := StateMachine{topo: topo}
+ s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "")
if err != nil {
glog.V(0).Infoln(err)
- return nil
+ return nil, err
+ }
+ s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond)
+ s.raftServer.SetElectionTimeout(10 * time.Second)
+ if err := s.raftServer.LoadSnapshot(); err != nil {
+ return nil, err
+ }
+ if err := s.raftServer.Start(); err != nil {
+ return nil, err
}
- s.raftServer.SetHeartbeatInterval(500 * time.Millisecond)
- s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond)
- s.raftServer.Start()
for _, peer := range s.peers {
- s.raftServer.AddPeer(peer, util.ServerToGrpcAddress(peer))
+ if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil {
+ return nil, err
+ }
+ }
+
+ // Remove peers that are no longer in the configured peer list
+ for existsPeerName := range s.raftServer.Peers() {
+ exists := false
+ for _, peer := range s.peers {
+ if pb.ServerToGrpcAddress(peer) == existsPeerName {
+ exists = true
+ break
+ }
+ }
+ if !exists {
+ if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
+ glog.V(0).Infoln(err)
+ return nil, err
+ } else {
+ glog.V(0).Infof("removing old peer %s", existsPeerName)
+ }
+ }
+ }
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) {
// Initialize the server by joining itself.
- glog.V(0).Infoln("Initializing new cluster")
-
- _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
- Name: s.raftServer.Name(),
- ConnectionString: util.ServerToGrpcAddress(s.serverAddr),
- })
-
- if err != nil {
- glog.V(0).Infoln(err)
- return nil
- }
+ // s.DoJoinCommand()
}
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
- return s
+ return s, nil
}
func (s *RaftServer) Peers() (members []string) {
@@ -96,34 +141,6 @@ func (s *RaftServer) Peers() (members []string) {
return
}
-func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, changed bool) {
- confPath := path.Join(dir, "conf")
- // open conf file
- b, err := ioutil.ReadFile(confPath)
- if err != nil {
- return oldPeers, true
- }
- conf := &raft.Config{}
- if err = json.Unmarshal(b, conf); err != nil {
- return oldPeers, true
- }
-
- for _, p := range conf.Peers {
- oldPeers = append(oldPeers, p.Name)
- }
- oldPeers = append(oldPeers, self)
-
- if len(peers) == 0 && len(oldPeers) <= 1 {
- return oldPeers, false
- }
-
- sort.Strings(peers)
- sort.Strings(oldPeers)
-
- return oldPeers, !reflect.DeepEqual(peers, oldPeers)
-
-}
-
func isTheFirstOne(self string, peers []string) bool {
sort.Strings(peers)
if len(peers) <= 0 {
@@ -131,3 +148,16 @@ func isTheFirstOne(self string, peers []string) bool {
}
return self == peers[0]
}
+
+func (s *RaftServer) DoJoinCommand() {
+
+ glog.V(0).Infoln("Initializing new cluster")
+
+ if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
+ Name: s.raftServer.Name(),
+ ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
+ }); err != nil {
+ glog.Errorf("fail to send join command: %v", err)
+ }
+
+}
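
The new StateMachine persists a single value, the max volume id, so a restarted master never hands out an already-used volume id. A self-contained round-trip sketch of Save and Recovery; the local struct is a stand-in for topology.MaxVolumeIdCommand.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stand-in for topology.MaxVolumeIdCommand
type maxVolumeIdCommand struct {
	MaxVolumeId uint32 `json:"MaxVolumeId"`
}

func main() {
	// Save: marshal the current max volume id into the snapshot payload
	saved, err := json.Marshal(maxVolumeIdCommand{MaxVolumeId: 42})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(saved)) // {"MaxVolumeId":42}

	// Recovery: unmarshal and re-apply it, as UpAdjustMaxVolumeId does
	var state maxVolumeIdCommand
	if err := json.Unmarshal(saved, &state); err != nil {
		panic(err)
	}
	fmt.Println("recovered max volume id:", state.MaxVolumeId)
}
```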
diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go
index fd38cb977..252570eab 100644
--- a/weed/server/raft_server_handlers.go
+++ b/weed/server/raft_server_handlers.go
@@ -1,20 +1,24 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"net/http"
)
type ClusterStatusResult struct {
- IsLeader bool `json:"IsLeader,omitempty"`
- Leader string `json:"Leader,omitempty"`
- Peers []string `json:"Peers,omitempty"`
+ IsLeader bool `json:"IsLeader,omitempty"`
+ Leader string `json:"Leader,omitempty"`
+ Peers []string `json:"Peers,omitempty"`
+ MaxVolumeId needle.VolumeId `json:"MaxVolumeId,omitempty"`
}
func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
ret := ClusterStatusResult{
- IsLeader: s.topo.IsLeader(),
- Peers: s.Peers(),
+ IsLeader: s.topo.IsLeader(),
+ Peers: s.Peers(),
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
}
+
if leader, e := s.topo.Leader(); e == nil {
ret.Leader = leader
}
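
A hedged sketch of reading the extended status, including the new MaxVolumeId field. The master address and the /cluster/status path are assumptions, and the struct is a local stand-in for ClusterStatusResult.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

type clusterStatus struct {
	IsLeader    bool     `json:"IsLeader,omitempty"`
	Leader      string   `json:"Leader,omitempty"`
	Peers       []string `json:"Peers,omitempty"`
	MaxVolumeId uint32   `json:"MaxVolumeId,omitempty"`
}

func main() {
	resp, err := http.Get("http://localhost:9333/cluster/status") // assumption
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	var status clusterStatus
	if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
		panic(err)
	}
	fmt.Printf("leader=%s maxVolumeId=%d\n", status.Leader, status.MaxVolumeId)
}
```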
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index c631d2535..2bc108a23 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -2,10 +2,15 @@ package weed_server
import (
"context"
+ "fmt"
+ "path/filepath"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
)
func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
@@ -36,6 +41,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p
req.Ttl,
req.Preallocate,
req.MemoryMapMaxSizeMb,
+ types.ToDiskType(req.DiskType),
)
if err != nil {
@@ -96,6 +102,41 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.
}
+func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_pb.VolumeConfigureRequest) (*volume_server_pb.VolumeConfigureResponse, error) {
+
+ resp := &volume_server_pb.VolumeConfigureResponse{}
+
+ // check replication format
+ if _, err := super_block.NewReplicaPlacementFromString(req.Replication); err != nil {
+ resp.Error = fmt.Sprintf("volume configure replication %v: %v", req, err)
+ return resp, nil
+ }
+
+ // unmount
+ if err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)); err != nil {
+ glog.Errorf("volume configure unmount %v: %v", req, err)
+ resp.Error = fmt.Sprintf("volume configure unmount %v: %v", req, err)
+ return resp, nil
+ }
+
+ // modify the volume info file
+ if err := vs.store.ConfigureVolume(needle.VolumeId(req.VolumeId), req.Replication); err != nil {
+ glog.Errorf("volume configure %v: %v", req, err)
+ resp.Error = fmt.Sprintf("volume configure %v: %v", req, err)
+ return resp, nil
+ }
+
+ // mount
+ if err := vs.store.MountVolume(needle.VolumeId(req.VolumeId)); err != nil {
+ glog.Errorf("volume configure mount %v: %v", req, err)
+ resp.Error = fmt.Sprintf("volume configure mount %v: %v", req, err)
+ return resp, nil
+ }
+
+ return resp, nil
+
+}
+
func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) {
resp := &volume_server_pb.VolumeMarkReadonlyResponse{}
@@ -109,5 +150,100 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
}
return resp, err
+}
+
+func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
+
+ resp := &volume_server_pb.VolumeMarkWritableResponse{}
+
+ err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
+
+ if err != nil {
+ glog.Errorf("volume mark writable %v: %v", req, err)
+ } else {
+ glog.V(2).Infof("volume mark writable %v", req)
+ }
+
+ return resp, err
+}
+
+func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) {
+
+ resp := &volume_server_pb.VolumeStatusResponse{}
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ resp.IsReadOnly = v.IsReadOnly()
+
+ return resp, nil
+}
+
+func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
+
+ resp := &volume_server_pb.VolumeServerStatusResponse{}
+
+ for _, loc := range vs.store.Locations {
+ if dir, e := filepath.Abs(loc.Directory); e == nil {
+ resp.DiskStatuses = append(resp.DiskStatuses, stats.NewDiskStatus(dir))
+ }
+ }
+
+ resp.MemoryStatus = stats.MemStat()
+
+ return resp, nil
+
+}
+
+func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) {
+
+ resp := &volume_server_pb.VolumeServerLeaveResponse{}
+
+ vs.StopHeartbeat()
+
+ return resp, nil
+
+}
+
+func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) {
+
+ resp := &volume_server_pb.VolumeNeedleStatusResponse{}
+
+ volumeId := needle.VolumeId(req.VolumeId)
+
+ n := &needle.Needle{
+ Id: types.NeedleId(req.NeedleId),
+ }
+
+ var count int
+ var err error
+ hasVolume := vs.store.HasVolume(volumeId)
+ if !hasVolume {
+ _, hasEcVolume := vs.store.FindEcVolume(volumeId)
+ if !hasEcVolume {
+ return nil, fmt.Errorf("volume not found %d", req.VolumeId)
+ }
+ count, err = vs.store.ReadEcShardNeedle(volumeId, n)
+ } else {
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil)
+ }
+ if err != nil {
+ return nil, err
+ }
+ if count < 0 {
+ return nil, fmt.Errorf("needle not found %d", n.Id)
+ }
+
+ resp.NeedleId = uint64(n.Id)
+ resp.Cookie = uint32(n.Cookie)
+ resp.Size = uint32(n.Size)
+ resp.LastModified = n.LastModified
+ resp.Crc = n.Checksum.Value()
+ if n.HasTtl() {
+ resp.Ttl = n.Ttl.String()
+ }
+ return resp, nil
}
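
VolumeConfigure applies a new replication setting with an unmount, rewrite, mount sequence so no restart is needed. A hedged client-side sketch; the volume server address, volume id, replication value, and insecure dial option are assumptions.

```go
package main

import (
	"context"
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

func main() {
	err := operation.WithVolumeServerClient("localhost:18080", grpc.WithInsecure(), func(client volume_server_pb.VolumeServerClient) error {
		resp, err := client.VolumeConfigure(context.Background(), &volume_server_pb.VolumeConfigureRequest{
			VolumeId:    7,     // assumption
			Replication: "001", // one extra copy on the same rack
		})
		if err != nil {
			return err
		}
		// configuration failures come back in resp.Error, not err
		if resp.Error != "" {
			return fmt.Errorf("volume configure: %s", resp.Error)
		}
		return nil
	})
	if err != nil {
		fmt.Println(err)
	}
}
```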
diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go
index fdb7937d2..8e84dc2a8 100644
--- a/weed/server/volume_grpc_batch_delete.go
+++ b/weed/server/volume_grpc_batch_delete.go
@@ -8,6 +8,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
)
func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) {
@@ -28,16 +29,34 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
n := new(needle.Needle)
volumeId, _ := needle.NewVolumeId(vid)
- n.ParsePath(id_cookie)
-
- cookie := n.Cookie
- if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
- resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
- FileId: fid,
- Status: http.StatusNotFound,
- Error: err.Error(),
- })
- continue
+ if req.SkipCookieCheck {
+ n.Id, err = types.ParseNeedleId(id_cookie)
+ if err != nil {
+ resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+ FileId: fid,
+ Status: http.StatusBadRequest,
+ Error: err.Error()})
+ continue
+ }
+ } else {
+ n.ParsePath(id_cookie)
+ cookie := n.Cookie
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
+ resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+ FileId: fid,
+ Status: http.StatusNotFound,
+ Error: err.Error(),
+ })
+ continue
+ }
+ if n.Cookie != cookie {
+ resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
+ FileId: fid,
+ Status: http.StatusBadRequest,
+ Error: "File Random Cookie does not match.",
+ })
+ break
+ }
}
if n.IsChunkedManifest() {
@@ -49,14 +68,6 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
continue
}
- if n.Cookie != cookie {
- resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
- FileId: fid,
- Status: http.StatusBadRequest,
- Error: "File Random Cookie does not match.",
- })
- break
- }
n.LastModified = now
if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
@@ -68,7 +79,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusAccepted,
- Size: size},
+ Size: uint32(size)},
)
}
}
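
BatchDelete now honors SkipCookieCheck, parsing the id_cookie part of the file id as a bare needle id instead of verifying the embedded cookie. A small sketch of how a file id splits into the pieces this handler works with; the example fid is made up.

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	fid := "3,01637037d6" // assumption: "<volume id>,<needle id + cookie>"
	commaIndex := strings.Index(fid, ",")
	if commaIndex < 0 {
		fmt.Println("invalid fid:", fid)
		return
	}
	vid, idCookie := fid[:commaIndex], fid[commaIndex+1:]
	// with SkipCookieCheck, idCookie is parsed as the needle id alone;
	// otherwise ParsePath also extracts and verifies the cookie
	fmt.Printf("volume id=%s, needle id+cookie=%s\n", vid, idCookie)
}
```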
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 6038752d2..f8875169f 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,59 +2,104 @@ package weed_server
import (
"fmt"
- "net"
+ "github.com/chrislusf/seaweedfs/weed/operation"
"time"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
- "github.com/spf13/viper"
- "google.golang.org/grpc"
+
+ "golang.org/x/net/context"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "golang.org/x/net/context"
)
func (vs *VolumeServer) GetMaster() string {
return vs.currentMaster
}
+
+func (vs *VolumeServer) checkWithMaster() (err error) {
+ isConnected := false
+ for !isConnected {
+ for _, master := range vs.SeedMasterNodes {
+ err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", master, err)
+ }
+ vs.metricsAddress, vs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ backend.LoadFromPbStorageBackends(resp.StorageBackends)
+ return nil
+ })
+ if err == nil {
+ return
+ } else {
+ glog.V(0).Infof("checkWithMaster %s: %v", master, err)
+ }
+ }
+ time.Sleep(1790 * time.Millisecond)
+ }
+ return
+}
+
func (vs *VolumeServer) heartbeat() {
glog.V(0).Infof("Volume server start with seed master nodes: %v", vs.SeedMasterNodes)
vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack)
- grpcDialOption := security.LoadClientTLS(viper.Sub("grpc"), "volume")
+ grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.volume")
var err error
var newLeader string
- for {
+ for vs.isHeartbeating {
for _, master := range vs.SeedMasterNodes {
if newLeader != "" {
+ // the new leader may actually be the same master,
+ // so wait a bit before reconnecting to it
+ time.Sleep(3 * time.Second)
master = newLeader
}
- masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master)
+ masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(master)
if parseErr != nil {
glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr)
continue
}
vs.store.MasterAddress = master
- newLeader, err = vs.doHeartbeat(context.Background(), master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
+ newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
if err != nil {
glog.V(0).Infof("heartbeat error: %v", err)
time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
newLeader = ""
vs.store.MasterAddress = ""
}
+ if !vs.isHeartbeating {
+ break
+ }
}
}
}
-func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
+func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
+ if !vs.isHeartbeating {
+ return true
+ }
+ vs.isHeartbeating = false
+ close(vs.stopChan)
+ return false
+}
+
+func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
- grpcConection, err := util.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
+ grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
}
@@ -78,22 +123,20 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
doneChan <- err
return
}
- if in.GetVolumeSizeLimit() != 0 {
+ if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
- }
- if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) {
- glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
+ if vs.store.MaybeAdjustVolumeMax() {
+ if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
+ }
+ }
+ }
+ if in.GetLeader() != "" && vs.currentMaster != in.GetLeader() {
+ glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
newLeader = in.GetLeader()
doneChan <- nil
return
}
- if in.GetMetricsAddress() != "" && vs.MetricsAddress != in.GetMetricsAddress() {
- vs.MetricsAddress = in.GetMetricsAddress()
- vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds())
- }
- if len(in.StorageBackends) > 0 {
- backend.LoadFromPbStorageBackends(in.StorageBackends)
- }
}
}()
@@ -160,6 +203,7 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
}
case <-volumeTickChan:
glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
+ vs.store.MaybeAdjustVolumeMax()
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
return "", err
@@ -172,19 +216,24 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA
}
case err = <-doneChan:
return
+ case <-vs.stopChan:
+ var volumeMessages []*master_pb.VolumeInformationMessage
+ emptyBeat := &master_pb.Heartbeat{
+ Ip: vs.store.Ip,
+ Port: uint32(vs.store.Port),
+ PublicUrl: vs.store.PublicUrl,
+ MaxFileKey: uint64(0),
+ DataCenter: vs.store.GetDataCenter(),
+ Rack: vs.store.GetRack(),
+ Volumes: volumeMessages,
+ HasNoVolumes: len(volumeMessages) == 0,
+ }
+ glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port)
+ if err = stream.Send(emptyBeat); err != nil {
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ return "", err
+ }
+ return
}
}
}
-
-func isSameIP(ip string, host string) bool {
- ips, err := net.LookupIP(host)
- if err != nil {
- return false
- }
- for _, t := range ips {
- if ip == t.String() {
- return true
- }
- }
- return false
-}
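The retry loop above now exits once isHeartbeating is cleared, and the stopChan case sends one final empty heartbeat so the master forgets this server's volumes. A minimal sketch of how a shutdown path might drive this handshake, assuming a *VolumeServer built by NewVolumeServer (the function name gracefulStop is illustrative, not part of this commit):

func gracefulStop(vs *VolumeServer) {
	if alreadyStopping := vs.StopHeartbeat(); alreadyStopping {
		return // some other caller has already initiated the stop
	}
	// Closing stopChan wakes the select in doHeartbeat, which sends the
	// empty heartbeat and returns; the outer loop then sees
	// isHeartbeating == false and breaks instead of re-dialing a master.
}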
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index a54a1e343..cf9f9f777 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -3,10 +3,11 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
+ "io/ioutil"
"math"
"os"
- "path"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -20,17 +21,20 @@ import (
const BufferSizeLimit = 1024 * 1024 * 2
-// VolumeCopy copy the .idx .dat files, and mount the volume
+// VolumeCopy copies the .idx .dat .vif files, and mounts the volume
func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v != nil {
- return nil, fmt.Errorf("volume %d already exists", req.VolumeId)
- }
- location := vs.store.FindFreeLocation()
- if location == nil {
- return nil, fmt.Errorf("no space left")
+ glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId)
+
+ err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
+ if err != nil {
+ return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
+ }
+
+ glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
}
// the master will not start compaction for read-only volumes, so it is safe to just copy files directly
@@ -40,10 +44,10 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
// send .dat file
// confirm size and timestamp
var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
- var volumeFileName, idxFileName, datFileName string
+ var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
var err error
- volFileInfoResp, err = client.ReadVolumeFileStatus(ctx,
+ volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(),
&volume_server_pb.ReadVolumeFileStatusRequest{
VolumeId: req.VolumeId,
})
@@ -51,33 +55,55 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
return fmt.Errorf("read volume file status failed, %v", err)
}
- volumeFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
+ diskType := volFileInfoResp.DiskType
+ if req.DiskType != "" {
+ diskType = req.DiskType
+ }
+ location := vs.store.FindFreeLocation(types.ToDiskType(diskType))
+ if location == nil {
+ return fmt.Errorf("no space left for disk type %s", types.ToDiskType(diskType).ReadableString())
+ }
+
+ dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
+ indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId))
+
+ ioutil.WriteFile(dataBaseFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755)
// println("source:", volFileInfoResp.String())
- // copy ecx file
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil {
+ return err
+ }
+
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil {
return err
}
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil {
return err
}
+ os.Remove(dataBaseFileName + ".note")
+
return nil
})
- idxFileName = volumeFileName + ".idx"
- datFileName = volumeFileName + ".dat"
+ if err != nil {
+ return nil, err
+ }
+ if dataBaseFileName == "" {
+ return nil, fmt.Errorf("not found volume %d file", req.VolumeId)
+ }
+
+ idxFileName = indexBaseFileName + ".idx"
+ datFileName = dataBaseFileName + ".dat"
- if err != nil && volumeFileName != "" {
- if idxFileName != "" {
+ defer func() {
+ if err != nil && dataBaseFileName != "" {
os.Remove(idxFileName)
- }
- if datFileName != "" {
os.Remove(datFileName)
+ os.Remove(dataBaseFileName + ".vif")
}
- return nil, err
- }
+ }()
if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
return nil, err
@@ -94,10 +120,9 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}, err
}
-func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32,
- compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool, ignoreSourceFileNotFound bool) error {
+func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error {
- copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
+ copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: vid,
Ext: ext,
CompactionRevision: compactRevision,
@@ -186,6 +211,7 @@ func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_se
resp.FileCount = v.FileCount()
resp.CompactionRevision = uint32(v.CompactionRevision)
resp.Collection = v.Collection
+ resp.DiskType = string(v.DiskType())
return resp, nil
}
@@ -204,11 +230,15 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 {
return fmt.Errorf("volume %d is compacted", req.VolumeId)
}
- fileName = v.FileName() + req.Ext
+ fileName = v.FileName(req.Ext)
} else {
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
for _, location := range vs.store.Locations {
- tName := path.Join(location.Directory, baseFileName)
+ tName := util.Join(location.Directory, baseFileName)
+ if util.FileExists(tName) {
+ fileName = tName
+ }
+ tName = util.Join(location.IdxDirectory, baseFileName)
if util.FileExists(tName) {
fileName = tName
}
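Taken together, the hunks above change the calling convention for VolumeCopy: the target now deletes any stale local copy itself, picks a location by disk type, and drops a .note marker while the transfer is in flight. A hedged sketch of a caller, reusing only request fields visible above (the literal values are illustrative):

err := operation.WithVolumeServerClient(targetServer, grpcDialOption,
	func(client volume_server_pb.VolumeServerClient) error {
		// DiskType overrides the source's disk type; "" keeps it.
		_, copyErr := client.VolumeCopy(context.Background(), &volume_server_pb.VolumeCopyRequest{
			VolumeId:       uint32(vid),
			Collection:     collection,
			SourceDataNode: sourceServer,
			DiskType:       "hdd",
		})
		return copyErr
	})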
diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go
index 6d6c3daa3..82b143c3d 100644
--- a/weed/server/volume_grpc_copy_incremental.go
+++ b/weed/server/volume_grpc_copy_incremental.go
@@ -27,7 +27,7 @@ func (vs *VolumeServer) VolumeIncrementalCopy(req *volume_server_pb.VolumeIncrem
return nil
}
- startOffset := foundOffset.ToAcutalOffset()
+ startOffset := foundOffset.ToActualOffset()
buf := make([]byte, 1024*1024*2)
return sendFileContent(v.DataBackend, buf, startOffset, int64(stopOffset), stream)
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 4bca9948e..452c2766e 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -8,7 +8,6 @@ import (
"math"
"os"
"path"
- "path/filepath"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -27,7 +26,7 @@ import (
Steps to apply erasure coding to .dat .idx files
0. ensure the volume is readonly
1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
-2. client ask master for possible servers to hold the ec files, at least 4 servers
+2. client asks the master for possible servers to hold the ec files
3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
4. target servers report the new ec files to the master
5. master stores vid -> [14]*DataNode
@@ -38,26 +37,28 @@ Steps to apply erasure coding to .dat .idx files
// VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
+ glog.V(0).Infof("VolumeEcShardsGenerate: %v", req)
+
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
}
- baseFileName := v.FileName()
+ baseFileName := v.DataFileName()
if v.Collection != req.Collection {
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
}
- // write .ecx file
- if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
- return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
- }
-
// write .ec00 ~ .ec13 files
if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
}
+ // write .ecx file
+ if err := erasure_coding.WriteSortedFileFromIdx(v.IndexFileName(), ".ecx"); err != nil {
+ return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", v.IndexFileName(), err)
+ }
+
// write .vif files
if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
@@ -69,22 +70,25 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
// VolumeEcShardsRebuild generates any of the missing .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
+ glog.V(0).Infof("VolumeEcShardsRebuild: %v", req)
+
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
var rebuiltShardIds []uint32
for _, location := range vs.store.Locations {
- if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
+ if util.FileExists(path.Join(location.IdxDirectory, baseFileName+".ecx")) {
// write .ec00 ~ .ec13 files
- baseFileName = path.Join(location.Directory, baseFileName)
- if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
- return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
+ dataBaseFileName := path.Join(location.Directory, baseFileName)
+ if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName); err != nil {
+ return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err)
} else {
rebuiltShardIds = generatedShardIds
}
- if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil {
- return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err)
+ indexBaseFileName := path.Join(location.IdxDirectory, baseFileName)
+ if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil {
+ return nil, fmt.Errorf("RebuildEcxFile %s: %v", dataBaseFileName, err)
}
break
@@ -99,18 +103,21 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s
// VolumeEcShardsCopy copies the .ecx file and some ec data slices
func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
- location := vs.store.FindFreeLocation()
+ glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
+
+ location := vs.store.FindFreeLocation(types.HardDriveType)
if location == nil {
return nil, fmt.Errorf("no space left")
}
- baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
+ dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
+ indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId))
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy ec data slices
for _, shardId := range req.ShardIds {
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
return err
}
}
@@ -118,7 +125,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
if req.CopyEcxFile {
// copy ecx file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false); err != nil {
return err
}
return nil
@@ -126,14 +133,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
if req.CopyEcjFile {
// copy ecj file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true); err != nil {
return err
}
}
if req.CopyVifFile {
// copy vif file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true); err != nil {
return err
}
}
@@ -151,17 +158,19 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
// the shard should not be mounted before calling this.
func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
- baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
+ bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
found := false
+ var indexBaseFilename, dataBaseFilename string
for _, location := range vs.store.Locations {
- if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
+ if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) {
found = true
- baseFilename = path.Join(location.Directory, baseFilename)
+ indexBaseFilename = path.Join(location.IdxDirectory, bName)
+ dataBaseFilename = path.Join(location.Directory, bName)
for _, shardId := range req.ShardIds {
- os.Remove(baseFilename + erasure_coding.ToExt(int(shardId)))
+ os.Remove(dataBaseFilename + erasure_coding.ToExt(int(shardId)))
}
break
}
@@ -173,19 +182,30 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
// check whether to delete the .ecx and .ecj file also
hasEcxFile := false
+ hasIdxFile := false
existingShardCount := 0
- bName := filepath.Base(baseFilename)
for _, location := range vs.store.Locations {
fileInfos, err := ioutil.ReadDir(location.Directory)
if err != nil {
continue
}
+ if location.IdxDirectory != location.Directory {
+ idxFileInfos, err := ioutil.ReadDir(location.IdxDirectory)
+ if err != nil {
+ continue
+ }
+ fileInfos = append(fileInfos, idxFileInfos...)
+ }
for _, fileInfo := range fileInfos {
if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
hasEcxFile = true
continue
}
+ if fileInfo.Name() == bName+".idx" {
+ hasIdxFile = true
+ continue
+ }
if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
existingShardCount++
}
@@ -193,12 +213,14 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
}
if hasEcxFile && existingShardCount == 0 {
- if err := os.Remove(baseFilename + ".ecx"); err != nil {
- return nil, err
- }
- if err := os.Remove(baseFilename + ".ecj"); err != nil {
+ if err := os.Remove(indexBaseFilename + ".ecx"); err != nil {
return nil, err
}
+ os.Remove(indexBaseFilename + ".ecj")
+ }
+ if !hasIdxFile {
+ // .vif is used for ec volumes and normal volumes
+ os.Remove(dataBaseFilename + ".vif")
}
return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
@@ -206,6 +228,8 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
+ glog.V(0).Infof("VolumeEcShardsMount: %v", req)
+
for _, shardId := range req.ShardIds {
err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
@@ -225,6 +249,8 @@ func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_ser
func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
+ glog.V(0).Infof("VolumeEcShardsUnmount: %v", req)
+
for _, shardId := range req.ShardIds {
err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
@@ -255,7 +281,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
if req.FileKey != 0 {
_, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
- if size == types.TombstoneFileSize {
+ if size.IsDeleted() {
return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
IsDeleted: true,
})
@@ -312,6 +338,8 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
+ glog.V(0).Infof("VolumeEcBlobDelete: %v", req)
+
resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
for _, location := range vs.store.Locations {
@@ -321,7 +349,7 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv
if err != nil {
return nil, fmt.Errorf("locate in local ec volume: %v", err)
}
- if size == types.TombstoneFileSize {
+ if size.IsDeleted() {
return resp, nil
}
@@ -340,30 +368,32 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv
// VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
+ glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
+
v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
if !found {
return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
}
- baseFileName := v.FileName()
if v.Collection != req.Collection {
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
}
+ dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
// calculate .dat file size
- datFileSize, err := erasure_coding.FindDatFileSize(baseFileName)
+ datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
if err != nil {
- return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err)
+ return nil, fmt.Errorf("FindDatFileSize %s: %v", dataBaseFileName, err)
}
// write .dat file from .ec00 ~ .ec09 files
- if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil {
- return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", dataBaseFileName, err)
}
// write .idx file from .ecx and .ecj files
- if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil {
- return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err)
+ if err := erasure_coding.WriteIdxFileFromEcIndex(indexBaseFileName); err != nil {
+ return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", v.IndexBaseFileName(), err)
}
return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go
index 767e28e7b..2f4fab96a 100644
--- a/weed/server/volume_grpc_query.go
+++ b/weed/server/volume_grpc_query.go
@@ -24,7 +24,7 @@ func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_
n.ParsePath(id_cookie)
cookie := n.Cookie
- if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
return err
}
diff --git a/weed/server/volume_grpc_read_write.go b/weed/server/volume_grpc_read_write.go
new file mode 100644
index 000000000..988e9e145
--- /dev/null
+++ b/weed/server/volume_grpc_read_write.go
@@ -0,0 +1,38 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func (vs *VolumeServer) ReadNeedleBlob(ctx context.Context, req *volume_server_pb.ReadNeedleBlobRequest) (resp *volume_server_pb.ReadNeedleBlobResponse, err error) {
+ resp = &volume_server_pb.ReadNeedleBlobResponse{}
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ resp.NeedleBlob, err = v.ReadNeedleBlob(req.Offset, types.Size(req.Size))
+ if err != nil {
+ return nil, fmt.Errorf("read needle blob offset %d size %d: %v", req.Offset, req.Size, err)
+ }
+
+ return resp, nil
+}
+
+func (vs *VolumeServer) WriteNeedleBlob(ctx context.Context, req *volume_server_pb.WriteNeedleBlobRequest) (resp *volume_server_pb.WriteNeedleBlobResponse, err error) {
+ resp = &volume_server_pb.WriteNeedleBlobResponse{}
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ if err = v.WriteNeedleBlob(types.NeedleId(req.NeedleId), req.NeedleBlob, types.Size(req.Size)); err != nil {
+ return nil, fmt.Errorf("write blob needle %d size %d: %v", req.NeedleId, req.Size, err)
+ }
+
+ return resp, nil
+}
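A sketch of driving the two new RPCs from a client, assuming the request fields the handlers above read (VolumeId/Offset/Size for reads, VolumeId/NeedleId/Size/NeedleBlob for writes); vid, offset, size, and needleId are assumed locals of the matching proto field types, and in practice offset and size would come from the source volume's needle map:

err := operation.WithVolumeServerClient(server, grpcDialOption,
	func(client volume_server_pb.VolumeServerClient) error {
		readResp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{
			VolumeId: vid, Offset: offset, Size: size,
		})
		if err != nil {
			return err
		}
		// replay the raw blob onto another server holding the same volume
		_, err = client.WriteNeedleBlob(context.Background(), &volume_server_pb.WriteNeedleBlobRequest{
			VolumeId: vid, NeedleId: needleId, Size: size, NeedleBlob: readResp.NeedleBlob,
		})
		return err
	})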
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index c26d6ed8f..6c039ebf5 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -72,7 +72,7 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe
stream: stream,
}
- err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToAcutalOffset(), scanner)
+ err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, foundOffset.ToActualOffset(), scanner)
return scanner.lastProcessedTimestampNs, err
@@ -90,7 +90,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
- _, _, err := vs.store.WriteVolumeNeedle(v.Id, n)
+ _, err := vs.store.WriteVolumeNeedle(v.Id, n, false)
return err
})
diff --git a/weed/server/volume_grpc_tier_download.go b/weed/server/volume_grpc_tier_download.go
index 7b3982e40..73d8ae7cb 100644
--- a/weed/server/volume_grpc_tier_download.go
+++ b/weed/server/volume_grpc_tier_download.go
@@ -58,9 +58,9 @@ func (vs *VolumeServer) VolumeTierMoveDatFromRemote(req *volume_server_pb.Volume
})
}
// copy the data file
- _, err := backendStorage.DownloadFile(v.FileName()+".dat", storageKey, fn)
+ _, err := backendStorage.DownloadFile(v.FileName(".dat"), storageKey, fn)
if err != nil {
- return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName()+".dat", err)
+ return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName(".dat"), err)
}
if req.KeepRemoteDatFile {
diff --git a/weed/server/volume_grpc_tier_upload.go b/weed/server/volume_grpc_tier_upload.go
index c9694df59..e51de5f1d 100644
--- a/weed/server/volume_grpc_tier_upload.go
+++ b/weed/server/volume_grpc_tier_upload.go
@@ -93,7 +93,7 @@ func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTi
}
if !req.KeepLocalDatFile {
- os.Remove(v.FileName() + ".dat")
+ os.Remove(v.FileName(".dat"))
}
return nil
diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go
index 24f982241..b87de4b5b 100644
--- a/weed/server/volume_grpc_vacuum.go
+++ b/weed/server/volume_grpc_vacuum.go
@@ -51,6 +51,11 @@ func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_serv
} else {
glog.V(1).Infof("commit volume %d", req.VolumeId)
}
+ if err == nil {
+ if v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)); v != nil && v.IsReadOnly() {
+ resp.IsReadOnly = true
+ }
+ }
return resp, err
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 6cf654738..e11d607a4 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -2,15 +2,18 @@ package weed_server
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"net/http"
+ "sync"
- "github.com/chrislusf/seaweedfs/weed/stats"
"google.golang.org/grpc"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
- "github.com/spf13/viper"
)
type VolumeServer struct {
@@ -23,27 +26,37 @@ type VolumeServer struct {
guard *security.Guard
grpcDialOption grpc.DialOption
- needleMapKind storage.NeedleMapType
+ needleMapKind storage.NeedleMapKind
FixJpgOrientation bool
ReadRedirect bool
compactionBytePerSecond int64
- MetricsAddress string
- MetricsIntervalSec int
+ metricsAddress string
+ metricsIntervalSec int
+ fileSizeLimitBytes int64
+ isHeartbeating bool
+ stopChan chan bool
+
+ inFlightDataSize int64
+ inFlightDataLimitCond *sync.Cond
+ concurrentUploadLimit int64
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string,
- folders []string, maxCounts []int,
- needleMapKind storage.NeedleMapType,
+ folders []string, maxCounts []int, minFreeSpacePercents []float32, diskTypes []types.DiskType,
+ idxFolder string,
+ needleMapKind storage.NeedleMapKind,
masterNodes []string, pulseSeconds int,
dataCenter string, rack string,
whiteList []string,
fixJpgOrientation bool,
readRedirect bool,
compactionMBPerSecond int,
+ fileSizeLimitMB int,
+ concurrentUploadLimit int64,
) *VolumeServer {
- v := viper.GetViper()
+ v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
v.SetDefault("jwt.signing.expires_after_seconds", 10)
expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
@@ -60,22 +73,31 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
needleMapKind: needleMapKind,
FixJpgOrientation: fixJpgOrientation,
ReadRedirect: readRedirect,
- grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "volume"),
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
+ fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
+ isHeartbeating: true,
+ stopChan: make(chan bool),
+ inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
+ concurrentUploadLimit: concurrentUploadLimit,
}
vs.SeedMasterNodes = masterNodes
- vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, vs.needleMapKind)
+ vs.checkWithMaster()
+
+ vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, idxFolder, vs.needleMapKind, diskTypes)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux)
+ adminMux.HandleFunc("/status", vs.statusHandler)
if signingKey == "" || enableUiAccess {
// only expose the volume server details for safe environments
adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
- adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
- adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
- adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
- adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
+ /*
+ adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
+ adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
+ adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
+ */
}
adminMux.HandleFunc("/", vs.privateStoreHandler)
if publicMux != adminMux {
@@ -85,11 +107,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
}
go vs.heartbeat()
- hostAddress := fmt.Sprintf("%s:%d", ip, port)
- go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather,
- func() (addr string, intervalSeconds int) {
- return vs.MetricsAddress, vs.MetricsIntervalSec
- })
+ go stats.LoopPushingMetric("volumeServer", fmt.Sprintf("%s:%d", ip, port), vs.metricsAddress, vs.metricsIntervalSec)
return vs
}
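NewVolumeServer gained per-location disk types, minimum-free-space thresholds, a separate index folder, a file size limit, and an in-flight upload limit. A sketch of a call site under assumed flag values (every literal below is illustrative, not a documented default):

vs := NewVolumeServer(adminMux, publicMux, "127.0.0.1",
	8080, "127.0.0.1:8080",
	[]string{"/data"}, []int{7}, []float32{1}, []types.DiskType{types.HardDriveType},
	"", // idxFolder: empty keeps index files next to the data files
	storage.NeedleMapInMemory,
	[]string{"localhost:9333"}, 5, // masters, pulseSeconds
	"dc1", "rack1",
	nil,           // whiteList
	true,          // fixJpgOrientation
	true,          // readRedirect
	4,             // compactionMBPerSecond
	256,           // fileSizeLimitMB
	256*1024*1024, // concurrentUploadLimit: bytes of in-flight upload data
)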
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 14ad27d42..4527add44 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -2,7 +2,11 @@ package weed_server
import (
"net/http"
+ "strconv"
"strings"
+ "sync/atomic"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -25,6 +29,11 @@ security settings:
*/
func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
+ if r.Header.Get("Origin") != "" {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Credentials", "true")
+ }
switch r.Method {
case "GET", "HEAD":
stats.ReadRequest()
@@ -33,12 +42,49 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
stats.DeleteRequest()
vs.guard.WhiteList(vs.DeleteHandler)(w, r)
case "PUT", "POST":
+
+ // wait until in flight data is less than the limit
+ contentLength := getContentLength(r)
+ vs.inFlightDataLimitCond.L.Lock()
+ for atomic.LoadInt64(&vs.inFlightDataSize) > vs.concurrentUploadLimit {
+ vs.inFlightDataLimitCond.Wait()
+ }
+ atomic.AddInt64(&vs.inFlightDataSize, contentLength)
+ vs.inFlightDataLimitCond.L.Unlock()
+ defer func() {
+ atomic.AddInt64(&vs.inFlightDataSize, -contentLength)
+ vs.inFlightDataLimitCond.Signal()
+ }()
+
+ // process uploads
stats.WriteRequest()
vs.guard.WhiteList(vs.PostHandler)(w, r)
+
+ case "OPTIONS":
+ stats.ReadRequest()
+ w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
+ w.Header().Add("Access-Control-Allow-Headers", "*")
}
}
+func getContentLength(r *http.Request) int64 {
+ contentLength := r.Header.Get("Content-Length")
+ if contentLength != "" {
+ length, err := strconv.ParseInt(contentLength, 10, 64)
+ if err != nil {
+ return 0
+ }
+ return length
+ }
+ return 0
+}
+
func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
+ if r.Header.Get("Origin") != "" {
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Allow-Credentials", "true")
+ }
switch r.Method {
case "GET":
stats.ReadRequest()
@@ -46,6 +92,10 @@ func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Req
case "HEAD":
stats.ReadRequest()
vs.GetOrHeadHandler(w, r)
+ case "OPTIONS":
+ stats.ReadRequest()
+ w.Header().Add("Access-Control-Allow-Methods", "GET, OPTIONS")
+ w.Header().Add("Access-Control-Allow-Headers", "*")
}
}
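The PUT/POST branch above gates uploads on total in-flight bytes rather than request count: a sync.Cond guards admission while an atomic counter tracks usage. The pattern in isolation (a generic sketch, not a copy of the handler; cond, inFlight, limit, and requestSize are assumed locals):

// wait until in-flight data drops to the limit, then claim capacity
cond.L.Lock()
for atomic.LoadInt64(&inFlight) > limit {
	cond.Wait() // releases the lock while blocked, reacquires on wake
}
atomic.AddInt64(&inFlight, requestSize)
cond.L.Unlock()

defer func() {
	atomic.AddInt64(&inFlight, -requestSize)
	cond.Signal() // wake one waiter now that capacity has freed up
}()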
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 1938a34c4..7e6c06871 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -10,19 +10,32 @@ import (
)
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
m := make(map[string]interface{})
- m["Version"] = util.VERSION
+ m["Version"] = util.Version()
+ var ds []*volume_server_pb.DiskStatus
+ for _, loc := range vs.store.Locations {
+ if dir, e := filepath.Abs(loc.Directory); e == nil {
+ newDiskStatus := stats.NewDiskStatus(dir)
+ newDiskStatus.DiskType = loc.DiskType.String()
+ ds = append(ds, newDiskStatus)
+ }
+ }
+ m["DiskStatuses"] = ds
m["Volumes"] = vs.store.VolumeInfos()
writeJsonQuiet(w, r, http.StatusOK, m)
}
func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
m := make(map[string]interface{})
- m["Version"] = util.VERSION
+ m["Version"] = util.Version()
var ds []*volume_server_pb.DiskStatus
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {
- ds = append(ds, stats.NewDiskStatus(dir))
+ newDiskStatus := stats.NewDiskStatus(dir)
+ newDiskStatus.DiskType = loc.DiskType.String()
+ ds = append(ds, newDiskStatus)
}
}
m["DiskStatuses"] = ds
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index cd11356b9..3e977cfd4 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -2,33 +2,33 @@ package weed_server
import (
"bytes"
- "context"
+ "encoding/json"
"errors"
"fmt"
"io"
"mime"
- "mime/multipart"
"net/http"
"net/url"
- "path"
+ "path/filepath"
"strconv"
"strings"
"time"
- "encoding/json"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
-var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
+var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
+ glog.V(9).Info(r.Method + " " + r.URL.Path + " " + r.Header.Get("Range"))
+
stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
start := time.Now()
defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
@@ -43,18 +43,18 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
volumeId, err := needle.NewVolumeId(vid)
if err != nil {
- glog.V(2).Infoln("parsing error:", err, r.URL.Path)
+ glog.V(2).Infof("parsing vid %s: %v", r.URL.Path, err)
w.WriteHeader(http.StatusBadRequest)
return
}
err = n.ParsePath(fid)
if err != nil {
- glog.V(2).Infoln("parsing fid error:", err, r.URL.Path)
+ glog.V(2).Infof("parsing fid %s: %v", r.URL.Path, err)
w.WriteHeader(http.StatusBadRequest)
return
}
- glog.V(4).Infoln("volume", volumeId, "reading", n)
+ // glog.V(4).Infoln("volume", volumeId, "reading", n)
hasVolume := vs.store.HasVolume(volumeId)
_, hasEcVolume := vs.store.FindEcVolume(volumeId)
if !hasVolume && !hasEcVolume {
@@ -63,7 +63,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotFound)
return
}
- lookupResult, err := operation.Lookup(vs.GetMaster(), volumeId.String())
+ lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 {
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
@@ -82,15 +82,24 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
cookie := n.Cookie
+
+ readOption := &storage.ReadOption{
+ ReadDeleted: r.FormValue("readDeleted") == "true",
+ }
+
var count int
if hasVolume {
- count, err = vs.store.ReadVolumeNeedle(volumeId, n)
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption)
} else if hasEcVolume {
- count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n)
+ count, err = vs.store.ReadEcShardNeedle(volumeId, n)
+ }
+ if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume {
+ glog.V(4).Infof("read needle: %v", err)
+ // TODO: start repairing the needle from other replicas, since it is not marked deleted, the volume exists locally, and this is not a replication request
}
- glog.V(4).Infoln("read bytes", count, "error", err)
+ // glog.V(4).Infoln("read bytes", count, "error", err)
if err != nil || count < 0 {
- glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
+ glog.V(3).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
w.WriteHeader(http.StatusNotFound)
return
}
@@ -114,11 +123,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotModified)
return
}
- if r.Header.Get("ETag-MD5") == "True" {
- setEtag(w, n.MD5())
- } else {
- setEtag(w, n.Etag())
- }
+ setEtag(w, n.Etag())
if n.HasPairs() {
pairMap := make(map[string]string)
@@ -131,14 +136,14 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- if vs.tryHandleChunkedFile(n, filename, w, r) {
+ if vs.tryHandleChunkedFile(n, filename, ext, w, r) {
return
}
if n.NameSize > 0 && filename == "" {
filename = string(n.Name)
if ext == "" {
- ext = path.Ext(filename)
+ ext = filepath.Ext(filename)
}
}
mtype := ""
@@ -149,14 +154,18 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- if ext != ".gz" {
- if n.IsGzipped() {
- if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
- w.Header().Set("Content-Encoding", "gzip")
- } else {
- if n.Data, err = util.UnGzipData(n.Data); err != nil {
- glog.V(0).Infoln("ungzip error:", err, r.URL.Path)
- }
+ if n.IsCompressed() {
+ if _, _, _, shouldResize := shouldResizeImages(ext, r); shouldResize {
+ if n.Data, err = util.DecompressData(n.Data); err != nil {
+ glog.V(0).Infoln("ungzip error:", err, r.URL.Path)
+ }
+ // } else if strings.Contains(r.Header.Get("Accept-Encoding"), "zstd") && util.IsZstdContent(n.Data) {
+ // w.Header().Set("Content-Encoding", "zstd")
+ } else if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && util.IsGzippedContent(n.Data) {
+ w.Header().Set("Content-Encoding", "gzip")
+ } else {
+ if n.Data, err = util.DecompressData(n.Data); err != nil {
+ glog.V(0).Infoln("uncompress error:", err, r.URL.Path)
}
}
}
@@ -168,12 +177,12 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
-func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) {
+func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, ext string, w http.ResponseWriter, r *http.Request) (processed bool) {
if !n.IsChunkedManifest() || r.URL.Query().Get("cm") == "false" {
return false
}
- chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped())
+ chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsCompressed())
if e != nil {
glog.V(0).Infof("load chunked manifest (%s) error: %v", r.URL.Path, e)
return false
@@ -182,7 +191,9 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string,
fileName = chunkManifest.Name
}
- ext := path.Ext(fileName)
+ if ext == "" {
+ ext = filepath.Ext(fileName)
+ }
mType := ""
if chunkManifest.Mime != "" {
@@ -194,10 +205,7 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string,
w.Header().Set("X-File-Store", "chunked")
- chunkedFileReader := &operation.ChunkedFileReader{
- Manifest: chunkManifest,
- Master: vs.GetMaster(),
- }
+ chunkedFileReader := operation.NewChunkedFileReader(chunkManifest.Chunks, vs.GetMaster())
defer chunkedFileReader.Close()
rs := conditionallyResizeImages(chunkedFileReader, ext, r)
@@ -213,129 +221,52 @@ func conditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext strin
if len(ext) > 0 {
ext = strings.ToLower(ext)
}
+ width, height, mode, shouldResize := shouldResizeImages(ext, r)
+ if shouldResize {
+ rs, _, _ = images.Resized(ext, originalDataReaderSeeker, width, height, mode)
+ }
+ return rs
+}
+
+func shouldResizeImages(ext string, r *http.Request) (width, height int, mode string, shouldResize bool) {
if ext == ".png" || ext == ".jpg" || ext == ".jpeg" || ext == ".gif" {
- width, height := 0, 0
if r.FormValue("width") != "" {
width, _ = strconv.Atoi(r.FormValue("width"))
}
if r.FormValue("height") != "" {
height, _ = strconv.Atoi(r.FormValue("height"))
}
- rs, _, _ = images.Resized(ext, originalDataReaderSeeker, width, height, r.FormValue("mode"))
}
- return rs
+ mode = r.FormValue("mode")
+ shouldResize = width > 0 || height > 0
+ return
}
func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.ResponseWriter, r *http.Request) error {
totalSize, e := rs.Seek(0, 2)
if mimeType == "" {
- if ext := path.Ext(filename); ext != "" {
+ if ext := filepath.Ext(filename); ext != "" {
mimeType = mime.TypeByExtension(ext)
}
}
if mimeType != "" {
w.Header().Set("Content-Type", mimeType)
}
- if filename != "" {
- contentDisposition := "inline"
- if r.FormValue("dl") != "" {
- if dl, _ := strconv.ParseBool(r.FormValue("dl")); dl {
- contentDisposition = "attachment"
- }
- }
- w.Header().Set("Content-Disposition", contentDisposition+`; filename="`+fileNameEscaper.Replace(filename)+`"`)
- }
w.Header().Set("Accept-Ranges", "bytes")
+
+ adjustHeaderContentDisposition(w, r, filename)
+
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
return nil
}
- rangeReq := r.Header.Get("Range")
- if rangeReq == "" {
- w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
- if _, e = rs.Seek(0, 0); e != nil {
- return e
- }
- _, e = io.Copy(w, rs)
- return e
- }
- //the rest is dealing with partial content request
- //mostly copy from src/pkg/net/http/fs.go
- ranges, err := parseRange(rangeReq, totalSize)
- if err != nil {
- http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
- return nil
- }
- if sumRangesSize(ranges) > totalSize {
- // The total number of bytes in all the ranges
- // is larger than the size of the file by
- // itself, so this is probably an attack, or a
- // dumb client. Ignore the range request.
- return nil
- }
- if len(ranges) == 0 {
- return nil
- }
- if len(ranges) == 1 {
- // RFC 2616, Section 14.16:
- // "When an HTTP message includes the content of a single
- // range (for example, a response to a request for a
- // single range, or to a request for a set of ranges
- // that overlap without any holes), this content is
- // transmitted with a Content-Range header, and a
- // Content-Length header showing the number of bytes
- // actually transferred.
- // ...
- // A response to a request for a single range MUST NOT
- // be sent using the multipart/byteranges media type."
- ra := ranges[0]
- w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
- w.Header().Set("Content-Range", ra.contentRange(totalSize))
- w.WriteHeader(http.StatusPartialContent)
- if _, e = rs.Seek(ra.start, 0); e != nil {
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ if _, e = rs.Seek(offset, 0); e != nil {
return e
}
-
- _, e = io.CopyN(w, rs, ra.length)
+ _, e = io.CopyN(writer, rs, size)
return e
- }
- // process multiple ranges
- for _, ra := range ranges {
- if ra.start > totalSize {
- http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
- return nil
- }
- }
- sendSize := rangesMIMESize(ranges, mimeType, totalSize)
- pr, pw := io.Pipe()
- mw := multipart.NewWriter(pw)
- w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
- sendContent := pr
- defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
- go func() {
- for _, ra := range ranges {
- part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize))
- if e != nil {
- pw.CloseWithError(e)
- return
- }
- if _, e = rs.Seek(ra.start, 0); e != nil {
- pw.CloseWithError(e)
- return
- }
- if _, e = io.CopyN(part, rs, ra.length); e != nil {
- pw.CloseWithError(e)
- return
- }
- }
- mw.Close()
- pw.Close()
- }()
- if w.Header().Get("Content-Encoding") == "" {
- w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
- }
- w.WriteHeader(http.StatusPartialContent)
- _, e = io.CopyN(w, sendContent, sendSize)
- return e
+ })
+ return nil
}
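writeResponseContent now hands all Range-header parsing to processRangeRequest and supplies only a copier callback. Judging from the call above, the callback contract is simply "write size bytes starting at offset"; a sketch of the same shape for an in-memory payload (data and mimeType are assumed locals):

writeFn := func(writer io.Writer, offset int64, size int64) error {
	if offset < 0 || offset+size > int64(len(data)) {
		return fmt.Errorf("range [%d,%d) out of bounds", offset, offset+size)
	}
	_, err := writer.Write(data[offset : offset+size])
	return err
}
processRangeRequest(r, w, int64(len(data)), mimeType, writeFn)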
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index 8d35c9c8b..437e5c45d 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -13,12 +13,15 @@ import (
)
func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
infos := make(map[string]interface{})
infos["Up Time"] = time.Now().Sub(startTime).String()
var ds []*volume_server_pb.DiskStatus
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {
- ds = append(ds, stats.NewDiskStatus(dir))
+ newDiskStatus := stats.NewDiskStatus(dir)
+ newDiskStatus.DiskType = loc.DiskType.String()
+ ds = append(ds, newDiskStatus)
}
}
volumeInfos := vs.store.VolumeInfos()
@@ -40,7 +43,7 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
Stats interface{}
Counters *stats.ServerStats
}{
- util.VERSION,
+ util.Version(),
vs.SeedMasterNodes,
normalVolumeInfos,
vs.store.EcVolumes(),
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 05e21612b..3d752eda6 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -1,7 +1,6 @@
package weed_server
import (
- "context"
"errors"
"fmt"
"net/http"
@@ -43,18 +42,19 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation)
+ reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
if ne != nil {
writeJsonError(w, r, http.StatusBadRequest, ne)
return
}
ret := operation.UploadResult{}
- _, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
+ isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster, vs.store, volumeId, reqNeedle, r)
- // http 304 status code does not allow body
+ // http 204 status code does not allow body
if writeError == nil && isUnchanged {
- w.WriteHeader(http.StatusNotModified)
+ setEtag(w, reqNeedle.Etag())
+ w.WriteHeader(http.StatusNoContent)
return
}
@@ -63,12 +63,14 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
httpStatus = http.StatusInternalServerError
ret.Error = writeError.Error()
}
- if needle.HasName() {
- ret.Name = string(needle.Name)
+ if reqNeedle.HasName() {
+ ret.Name = string(reqNeedle.Name)
}
ret.Size = uint32(originalSize)
- ret.ETag = needle.Etag()
+ ret.ETag = reqNeedle.Etag()
+ ret.Mime = string(reqNeedle.Mime)
setEtag(w, ret.ETag)
+ w.Header().Set("Content-MD5", contentMd5)
writeJsonQuiet(w, r, httpStatus, ret)
}
@@ -97,12 +99,12 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
ecVolume, hasEcVolume := vs.store.FindEcVolume(volumeId)
if hasEcVolume {
- count, err := vs.store.DeleteEcShardNeedle(context.Background(), ecVolume, n, cookie)
+ count, err := vs.store.DeleteEcShardNeedle(ecVolume, n, cookie)
writeDeleteResult(err, count, w, r)
return
}
- _, ok := vs.store.ReadVolumeNeedle(volumeId, n)
+ _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil)
if ok != nil {
m := make(map[string]uint32)
m["size"] = 0
@@ -119,13 +121,13 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
count := int64(n.Size)
if n.IsChunkedManifest() {
- chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped())
+ chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsCompressed())
if e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Load chunks manifest error: %v", e))
return
}
// make sure all chunks had deleted before delete manifest
- if e := chunkManifest.DeleteChunks(vs.GetMaster(), vs.grpcDialOption); e != nil {
+ if e := chunkManifest.DeleteChunks(vs.GetMaster, false, vs.grpcDialOption); e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e))
return
}
@@ -140,7 +142,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
}
}
- _, err := topology.ReplicatedDelete(vs.GetMaster(), vs.store, volumeId, n, r)
+ _, err := topology.ReplicatedDelete(vs.GetMaster, vs.store, volumeId, n, r)
writeDeleteResult(err, count, w, r)
@@ -165,3 +167,11 @@ func setEtag(w http.ResponseWriter, etag string) {
}
}
}
+
+func getEtag(resp *http.Response) (etag string) {
+ etag = resp.Header.Get("ETag")
+ if strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") {
+ return etag[1 : len(etag)-1]
+ }
+ return
+}
diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go
new file mode 100644
index 000000000..a009611da
--- /dev/null
+++ b/weed/server/volume_server_tcp_handlers_write.go
@@ -0,0 +1,137 @@
+package weed_server
+
+import (
+ "bufio"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "net"
+ "strings"
+)
+
+func (vs *VolumeServer) HandleTcpConnection(c net.Conn) {
+ defer c.Close()
+
+ glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String())
+
+ bufReader := bufio.NewReaderSize(c, 1024*1024)
+ bufWriter := bufio.NewWriterSize(c, 1024*1024)
+
+ for {
+ cmd, err := bufReader.ReadString('\n')
+ if err != nil {
+ if err != io.EOF {
+ glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err)
+ }
+ return
+ }
+ cmd = cmd[:len(cmd)-1]
+ if len(cmd) == 0 {
+ continue // skip empty command lines to avoid indexing cmd[0] on ""
+ }
+ switch cmd[0] {
+ case '+':
+ fileId := cmd[1:]
+ err = vs.handleTcpPut(fileId, bufReader)
+ if err == nil {
+ bufWriter.Write([]byte("+OK\n"))
+ } else {
+ bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
+ }
+ case '-':
+ fileId := cmd[1:]
+ err = vs.handleTcpDelete(fileId)
+ if err == nil {
+ bufWriter.Write([]byte("+OK\n"))
+ } else {
+ bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
+ }
+ case '?':
+ fileId := cmd[1:]
+ if err = vs.handleTcpGet(fileId, bufWriter); err != nil {
+ glog.Errorf("get %s from %s: %v", fileId, c.RemoteAddr().String(), err)
+ }
+ case '!':
+ bufWriter.Flush()
+ }
+
+ }
+
+}
+
+func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ volume := vs.store.GetVolume(volumeId)
+ if volume == nil {
+ return fmt.Errorf("volume %d not found", volumeId)
+ }
+
+ err = volume.StreamRead(n, writer)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ volume := vs.store.GetVolume(volumeId)
+ if volume == nil {
+ return fmt.Errorf("volume %d not found", volumeId)
+ }
+
+ sizeBuf := make([]byte, 4)
+ if _, err = io.ReadFull(bufReader, sizeBuf); err != nil { // Read may return short; ReadFull guarantees all 4 length bytes
+ return err
+ }
+ dataSize := util.BytesToUint32(sizeBuf)
+
+ err = volume.StreamWrite(n, bufReader, dataSize)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ _, err = vs.store.DeleteVolumeNeedle(volumeId, n)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) {
+
+ commaIndex := strings.LastIndex(fileId, ",")
+ if commaIndex <= 0 {
+ return 0, nil, fmt.Errorf("unknown fileId %s", fileId)
+ }
+
+ vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:]
+
+ volumeId, ve := needle.NewVolumeId(vid)
+ if ve != nil {
+ return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId)
+ }
+
+ n := new(needle.Needle)
+ if err := n.ParsePath(fid); err != nil {
+ return 0, nil, fmt.Errorf("cannot parse fid in fileId %s: %v", fileId, err)
+ }
+ return volumeId, n, nil
+}
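The wire protocol above is line-oriented: "+<fid>\n" followed by a 4-byte length prefix and the payload stores a needle, "-<fid>\n" deletes one, "?<fid>\n" streams one back, and "!" flushes buffered replies. A hedged client sketch for the put path; util.Uint32toBytes is assumed to be the codebase's counterpart of the util.BytesToUint32 call used by the server:

func tcpPut(conn net.Conn, fileId string, data []byte) error {
	w := bufio.NewWriter(conn)
	if _, err := w.WriteString("+" + fileId + "\n"); err != nil {
		return err
	}
	sizeBuf := make([]byte, 4)
	util.Uint32toBytes(sizeBuf, uint32(len(data)))
	if _, err := w.Write(sizeBuf); err != nil {
		return err
	}
	if _, err := w.Write(data); err != nil {
		return err
	}
	if _, err := w.WriteString("!\n"); err != nil { // ask the server to flush its "+OK"/"-ERR" reply
		return err
	}
	return w.Flush() // the caller would then read the status line from conn
}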
diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go
index 81496b1de..ee4c2e31d 100644
--- a/weed/server/volume_server_ui/templates.go
+++ b/weed/server/volume_server_ui/templates.go
@@ -1,11 +1,17 @@
-package master_ui
+package volume_server_ui
import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
"html/template"
"strconv"
"strings"
)
+func percentFrom(total uint64, partOf uint64) string {
+ return fmt.Sprintf("%.2f", (float64(partOf)/float64(total))*100)
+}
+
func join(data []int64) string {
var ret []string
for _, d := range data {
@@ -15,7 +21,9 @@ func join(data []int64) string {
}
var funcMap = template.FuncMap{
- "join": join,
+ "join": join,
+ "bytesToHumanReadable": util.BytesToHumanReadable,
+ "percentFrom": percentFrom,
}
var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html>
@@ -57,13 +65,27 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<div class="row">
<div class="col-sm-6">
<h2>Disk Stats</h2>
- <table class="table table-condensed table-striped">
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Path</th>
+ <th>Disk</th>
+ <th>Total</th>
+ <th>Free</th>
+ <th>Usage</th>
+ </tr>
+ </thead>
+ <tbody>
{{ range .DiskStatuses }}
<tr>
- <th>{{ .Dir }}</th>
- <td>{{ .Free }} Bytes Free</td>
+ <td>{{ .Dir }}</td>
+ <td>{{ .DiskType }}</td>
+ <td>{{ bytesToHumanReadable .All }}</td>
+ <td>{{ bytesToHumanReadable .Free }}</td>
+ <td>{{ percentFrom .All .Used}}%</td>
</tr>
{{ end }}
+ </tbody>
</table>
</div>
@@ -107,6 +129,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<tr>
<th>Id</th>
<th>Collection</th>
+ <th>Disk</th>
<th>Data Size</th>
<th>Files</th>
<th>Trash</th>
@@ -119,9 +142,10 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<tr>
<td><code>{{ .Id }}</code></td>
<td>{{ .Collection }}</td>
- <td>{{ .Size }} Bytes</td>
+ <td>{{ .DiskType }}</td>
+ <td>{{ bytesToHumanReadable .Size }}</td>
<td>{{ .FileCount }}</td>
- <td>{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes</td>
+ <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td>
<td>{{ .Ttl }}</td>
<td>{{ .ReadOnly }}</td>
</tr>
@@ -149,9 +173,9 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<tr>
<td><code>{{ .Id }}</code></td>
<td>{{ .Collection }}</td>
- <td>{{ .Size }} Bytes</td>
+ <td>{{ bytesToHumanReadable .Size }}</td>
<td>{{ .FileCount }}</td>
- <td>{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes</td>
+ <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td>
<td>{{ .RemoteStorageName }}</td>
<td>{{ .RemoteStorageKey }}</td>
</tr>
@@ -177,7 +201,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<tr>
<td><code>{{ .VolumeId }}</code></td>
<td>{{ .Collection }}</td>
- <td>{{ .ShardSize }} Bytes</td>
+ <td>{{ bytesToHumanReadable .ShardSize }}</td>
<td>{{ .ShardIdList }}</td>
<td>{{ .CreatedAt.Format "02 Jan 06 15:04 -0700" }}</td>
</tr>
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index abd0b66eb..c3f68fdee 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -5,21 +5,23 @@ import (
"context"
"fmt"
"io"
+ "math"
"os"
"path"
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/util/buffered_writer"
"golang.org/x/net/webdav"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
- "github.com/spf13/viper"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
)
@@ -31,14 +33,19 @@ type WebDavOption struct {
BucketsPath string
GrpcDialOption grpc.DialOption
Collection string
+ Replication string
+ DiskType string
Uid uint32
Gid uint32
+ Cipher bool
+ CacheDir string
+ CacheSizeMB int64
}
type WebDavServer struct {
option *WebDavOption
secret security.SigningKey
- filer *filer2.Filer
+ filer *filer.Filer
grpcDialOption grpc.DialOption
Handler *webdav.Handler
}
@@ -49,7 +56,7 @@ func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
ws = &WebDavServer{
option: option,
- grpcDialOption: security.LoadClientTLS(viper.Sub("grpc"), "filer"),
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
Handler: &webdav.Handler{
FileSystem: fs,
LockSystem: webdav.NewMemLS(),
@@ -64,8 +71,10 @@ func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
type WebDavFileSystem struct {
option *WebDavOption
secret security.SigningKey
- filer *filer2.Filer
+ filer *filer.Filer
grpcDialOption grpc.DialOption
+ chunkCache *chunk_cache.TieredChunkCache
+ signature int32
}
type FileInfo struct {
@@ -89,23 +98,40 @@ type WebDavFile struct {
isDirectory bool
off int64
entry *filer_pb.Entry
- entryViewCache []filer2.VisibleInterval
+ entryViewCache []filer.VisibleInterval
+ reader io.ReaderAt
+ bufWriter *buffered_writer.BufferedWriteCloser
+ collection string
+ replication string
}
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
+
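+ // key the chunk cache directory by filer address and build version so a
+ // different filer or an upgraded binary starts from a fresh cache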
+ cacheUniqueId := util.Md5String([]byte("webdav" + option.FilerGrpcAddress + util.Version()))[0:8]
+ cacheDir := path.Join(option.CacheDir, cacheUniqueId)
+
+ if err := os.MkdirAll(cacheDir, os.FileMode(0755)); err != nil {
+ glog.Warningf("webdav create cache dir %s: %v", cacheDir, err)
+ }
+ chunkCache := chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024)
return &WebDavFileSystem{
- option: option,
+ option: option,
+ chunkCache: chunkCache,
+ signature: util.RandomInt32(),
}, nil
}
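+// compile-time check that *WebDavFileSystem implements filer_pb.FilerClient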
-func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(filer_pb.SeaweedFilerClient) error) error {
+var _ = filer_pb.FilerClient(&WebDavFileSystem{})
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
}
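+// AdjustedUrl implements filer_pb.FilerClient; the WebDAV server uses
+// volume locations exactly as the filer returns them.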
+func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
+}
func clearName(name string) (string, error) {
slashed := strings.HasSuffix(name, "/")
@@ -137,8 +163,8 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
return os.ErrExist
}
- return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- dir, name := filer2.FullPath(fullDirPath).DirAndName()
+ return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ dir, name := util.FullPath(fullDirPath).DirAndName()
request := &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: &filer_pb.Entry{
@@ -152,10 +178,11 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
Gid: fs.option.Gid,
},
},
+ Signatures: []int32{fs.signature},
}
glog.V(1).Infof("mkdir: %v", request)
- if _, err := client.CreateEntry(ctx, request); err != nil {
+ if err := filer_pb.CreateEntry(client, request); err != nil {
return fmt.Errorf("mkdir %s/%s: %v", dir, name, err)
}
@@ -185,9 +212,9 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
fs.removeAll(ctx, fullFilePath)
}
- dir, name := filer2.FullPath(fullFilePath).DirAndName()
- err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- if _, err := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
+ dir, name := util.FullPath(fullFilePath).DirAndName()
+ err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: &filer_pb.Entry{
Name: name,
@@ -199,10 +226,11 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
Uid: fs.option.Uid,
Gid: fs.option.Gid,
Collection: fs.option.Collection,
- Replication: "000",
+ Replication: fs.option.Replication,
TtlSec: 0,
},
},
+ Signatures: []int32{fs.signature},
}); err != nil {
return fmt.Errorf("create %s: %v", fullFilePath, err)
}
@@ -215,6 +243,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
fs: fs,
name: fullFilePath,
isDirectory: false,
+ bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024), // buffer up to 4MB before flushing a chunk
}, nil
}
@@ -230,6 +259,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
fs: fs,
name: fullFilePath,
isDirectory: false,
+ bufWriter: buffered_writer.NewBufferedWriteCloser(4 * 1024 * 1024), // buffer up to 4MB before flushing a chunk
}, nil
}
@@ -240,34 +270,10 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string)
return err
}
- fi, err := fs.stat(ctx, fullFilePath)
- if err != nil {
- return err
- }
+ dir, name := util.FullPath(fullFilePath).DirAndName()
- if fi.IsDir() {
- //_, err = fs.db.Exec(`delete from filesystem where fullFilePath like $1 escape '\'`, strings.Replace(fullFilePath, `%`, `\%`, -1)+`%`)
- } else {
- //_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath)
- }
- dir, name := filer2.FullPath(fullFilePath).DirAndName()
- err = fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
-
- request := &filer_pb.DeleteEntryRequest{
- Directory: dir,
- Name: name,
- IsDeleteData: true,
- }
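+ // delete the entry and its data in one call (the old request set IsDeleteData: true);
+ // the recursive and cross-cluster flags are left off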
+ return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
- glog.V(3).Infof("removing entry: %v", request)
- _, err := client.DeleteEntry(ctx, request)
- if err != nil {
- return fmt.Errorf("remove %s: %v", fullFilePath, err)
- }
-
- return nil
- })
- return err
}
func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error {
@@ -307,10 +313,10 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string)
return os.ErrExist
}
- oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName()
- newDir, newBaseName := filer2.FullPath(newName).DirAndName()
+ oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
+ newDir, newBaseName := util.FullPath(newName).DirAndName()
- return fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AtomicRenameEntryRequest{
OldDirectory: oldDir,
@@ -335,23 +341,23 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F
return nil, err
}
+ fullpath := util.FullPath(fullFilePath)
+
var fi FileInfo
- entry, err := filer2.GetEntry(ctx, fs, fullFilePath)
+ entry, err := filer_pb.GetEntry(fs, fullpath)
if entry == nil {
return nil, os.ErrNotExist
}
if err != nil {
return nil, err
}
- fi.size = int64(filer2.TotalSize(entry.GetChunks()))
- fi.name = fullFilePath
+ fi.size = int64(filer.FileSize(entry))
+ fi.name = string(fullpath)
fi.mode = os.FileMode(entry.Attributes.FileMode)
fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
fi.isDirectory = entry.IsDirectory
- _, fi.name = path.Split(path.Clean(fi.name))
- if fi.name == "" {
- fi.name = "/"
+ if fi.name == "/" {
fi.modifiledTime = time.Now()
fi.isDirectory = true
}
@@ -365,32 +371,21 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
return fs.stat(ctx, name)
}
-func (f *WebDavFile) Write(buf []byte) (int, error) {
-
- glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
-
- var err error
- ctx := context.Background()
- if f.entry == nil {
- f.entry, err = filer2.GetEntry(ctx, f.fs, f.name)
- }
-
- if f.entry == nil {
- return 0, err
- }
- if err != nil {
- return 0, err
- }
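+// saveDataAsChunk asks the filer to assign a volume, uploads the data to
+// that volume server, and returns the chunk to be appended to the entry.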
+func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
var fileId, host string
var auth security.EncodedJwt
- if err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
+ if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx := context.Background()
request := &filer_pb.AssignVolumeRequest{
Count: 1,
- Replication: "000",
+ Replication: f.fs.option.Replication,
Collection: f.fs.option.Collection,
+ DiskType: f.fs.option.DiskType,
+ Path: name,
}
resp, err := client.AssignVolume(ctx, request)
@@ -398,79 +393,126 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
glog.V(0).Infof("assign volume failure %v: %v", request, err)
return err
}
+ if resp.Error != "" {
+ return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
+ }
fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ f.collection, f.replication = resp.Collection, resp.Replication
return nil
- }); err != nil {
- return 0, fmt.Errorf("filerGrpcAddress assign volume: %v", err)
+ }); flushErr != nil {
+ return nil, f.collection, f.replication, fmt.Errorf("filerGrpcAddress assign volume: %v", flushErr)
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- bufReader := bytes.NewReader(buf)
- uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "", nil, auth)
- if err != nil {
- glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err)
- return 0, fmt.Errorf("upload data: %v", err)
+ uploadResult, flushErr, _ := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, reader, false, "", nil, auth)
+ if flushErr != nil {
+ glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
+ return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
}
if uploadResult.Error != "" {
- glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, err)
- return 0, fmt.Errorf("upload result: %v", uploadResult.Error)
+ glog.V(0).Infof("upload failure %v to %s: %v", f.name, fileUrl, flushErr)
+ return nil, f.collection, f.replication, fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+ return uploadResult.ToPbFileChunk(fileId, offset), f.collection, f.replication, nil
+}
+
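+// Write buffers data in memory; the buffered writer flushes each full
+// buffer as a chunk via saveDataAsChunk and persists the entry on Close.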
+func (f *WebDavFile) Write(buf []byte) (int, error) {
+
+ glog.V(2).Infof("WebDavFileSystem.Write %v", f.name)
+
+ dir, _ := util.FullPath(f.name).DirAndName()
+
+ var getErr error
+ ctx := context.Background()
+ if f.entry == nil {
+ f.entry, getErr = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
}
- chunk := &filer_pb.FileChunk{
- FileId: fileId,
- Offset: f.off,
- Size: uint64(len(buf)),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
+ if f.entry == nil {
+ if getErr == nil {
+ getErr = os.ErrNotExist
+ }
+ return 0, getErr
+ }
+ if getErr != nil {
+ return 0, getErr
}
- f.entry.Chunks = append(f.entry.Chunks, chunk)
- dir, _ := filer2.FullPath(f.name).DirAndName()
+ if f.bufWriter.FlushFunc == nil {
+ f.bufWriter.FlushFunc = func(data []byte, offset int64) (flushErr error) {
- err = f.fs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
- f.entry.Attributes.Mtime = time.Now().Unix()
+ var chunk *filer_pb.FileChunk
+ chunk, f.collection, f.replication, flushErr = f.saveDataAsChunk(bytes.NewReader(data), f.name, offset)
- request := &filer_pb.UpdateEntryRequest{
- Directory: dir,
- Entry: f.entry,
+ if flushErr != nil {
+ return fmt.Errorf("%s upload result: %v", f.name, flushErr)
+ }
+
+ f.entry.Content = nil
+ f.entry.Chunks = append(f.entry.Chunks, chunk)
+
+ return flushErr
}
+ f.bufWriter.CloseFunc = func() error {
+
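+ // fold a long flat chunk list into manifest chunks before saving the entry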
+ manifestedChunks, manifestErr := filer.MaybeManifestize(f.saveDataAsChunk, f.entry.Chunks)
+ if manifestErr != nil {
+ // manifestization failed; keep the flat chunk list, which remains readable
+ glog.V(0).Infof("file %s close MaybeManifestize: %v", f.name, manifestErr)
+ } else {
+ f.entry.Chunks = manifestedChunks
+ }
- if _, err := client.UpdateEntry(ctx, request); err != nil {
- return fmt.Errorf("update %s: %v", f.name, err)
+ flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ f.entry.Attributes.Mtime = time.Now().Unix()
+ f.entry.Attributes.Collection = f.collection
+ f.entry.Attributes.Replication = f.replication
+
+ request := &filer_pb.UpdateEntryRequest{
+ Directory: dir,
+ Entry: f.entry,
+ Signatures: []int32{f.fs.signature},
+ }
+
+ if _, err := client.UpdateEntry(ctx, request); err != nil {
+ return fmt.Errorf("update %s: %v", f.name, err)
+ }
+
+ return nil
+ })
+ return flushErr
}
+ }
- return nil
- })
+ written, err := f.bufWriter.Write(buf)
if err == nil {
glog.V(3).Infof("WebDavFileSystem.Write %v: written [%d,%d)", f.name, f.off, f.off+int64(len(buf)))
- f.off += int64(len(buf))
+ f.off += int64(written)
}
- return len(buf), err
+ return written, err
}
func (f *WebDavFile) Close() error {
glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
+ err := f.bufWriter.Close()
+
if f.entry != nil {
f.entry = nil
f.entryViewCache = nil
}
- return nil
+ return err
}
func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
glog.V(2).Infof("WebDavFileSystem.Read %v", f.name)
- ctx := context.Background()
if f.entry == nil {
- f.entry, err = filer2.GetEntry(ctx, f.fs, f.name)
+ f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name))
}
if f.entry == nil {
return 0, err
@@ -478,43 +520,41 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
if err != nil {
return 0, err
}
- if len(f.entry.Chunks) == 0 {
+ fileSize := int64(filer.FileSize(f.entry))
+ if fileSize == 0 {
return 0, io.EOF
}
if f.entryViewCache == nil {
- f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks)
+ f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks)
+ f.reader = nil
}
- chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, f.off, len(p))
-
- totalRead, err := filer2.ReadIntoBuffer(ctx, f.fs, f.name, p, chunkViews, f.off)
- if err != nil {
- return 0, err
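+ // build the random-access chunk reader once, backed by the tiered chunk cache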
+ if f.reader == nil {
+ chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
+ f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
}
- readSize = int(totalRead)
- glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+totalRead)
+ readSize, err = f.reader.ReadAt(p, f.off)
- f.off += totalRead
- if readSize == 0 {
- return 0, io.EOF
+ glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
+ f.off += int64(readSize)
+
+ if err != nil && err != io.EOF {
+ glog.Errorf("file read %s: %v", f.name, err)
}
return
+
}
func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count)
- ctx := context.Background()
- dir := f.name
- if dir != "/" && strings.HasSuffix(dir, "/") {
- dir = dir[:len(dir)-1]
- }
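+ // directory names keep their trailing slash (see clearName), so
+ // DirAndName resolves to the directory itself rather than its parent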
+ dir, _ := util.FullPath(f.name).DirAndName()
- err = filer2.ReadDirAllEntries(ctx, f.fs, dir, "", func(entry *filer_pb.Entry, isLast bool) {
+ err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
fi := FileInfo{
- size: int64(filer2.TotalSize(entry.GetChunks())),
+ size: int64(filer.FileSize(entry)),
name: entry.Name,
mode: os.FileMode(entry.Attributes.FileMode),
modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
@@ -526,6 +566,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
}
glog.V(4).Infof("entry: %v", fi.name)
ret = append(ret, &fi)
+ return nil
})
old := f.off
@@ -556,9 +597,9 @@ func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
var err error
switch whence {
- case 0:
+ case io.SeekStart:
f.off = 0
- case 2:
+ case io.SeekEnd:
if fi, err := f.fs.stat(ctx, f.name); err != nil {
return 0, err
} else {