aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_server_handlers_write.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_server_handlers_write.go')
-rw-r--r--weed/server/filer_server_handlers_write.go298
1 files changed, 74 insertions, 224 deletions
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 236e7027d..95eba9d3d 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -2,26 +2,17 @@ package weed_server
import (
"context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "mime"
"net/http"
- "net/url"
"os"
- filenamePath "path"
- "strconv"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -32,271 +23,130 @@ var (
type FilerPostResult struct {
Name string `json:"name,omitempty"`
- Size uint32 `json:"size,omitempty"`
+ Size int64 `json:"size,omitempty"`
Error string `json:"error,omitempty"`
Fid string `json:"fid,omitempty"`
Url string `json:"url,omitempty"`
}
-func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
stats.FilerRequestCounter.WithLabelValues("assign").Inc()
start := time.Now()
defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }()
- ar := &operation.VolumeAssignRequest{
- Count: 1,
- Replication: replication,
- Collection: collection,
- Ttl: r.URL.Query().Get("ttl"),
- DataCenter: dataCenter,
- }
- var altRequest *operation.VolumeAssignRequest
- if dataCenter != "" {
- altRequest = &operation.VolumeAssignRequest{
- Count: 1,
- Replication: replication,
- Collection: collection,
- Ttl: r.URL.Query().Get("ttl"),
- DataCenter: "",
- }
- }
+ ar, altRequest := so.ToAssignRequests(1)
- assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
+ assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest)
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
- writeJsonError(w, r, http.StatusInternalServerError, ae)
err = ae
return
}
fileId = assignResult.Fid
urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
+ if so.Fsync {
+ urlLocation += "?fsync=true"
+ }
auth = assignResult.Auth
return
}
-func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
+func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
ctx := context.Background()
query := r.URL.Query()
- replication := query.Get("replication")
- if replication == "" {
- replication = fs.option.DefaultReplication
- }
- collection := query.Get("collection")
- if collection == "" {
- collection = fs.option.Collection
- }
- dataCenter := query.Get("dataCenter")
- if dataCenter == "" {
- dataCenter = fs.option.DataCenter
- }
+ so := fs.detectStorageOption0(r.RequestURI,
+ query.Get("collection"),
+ query.Get("replication"),
+ query.Get("ttl"),
+ query.Get("disk"),
+ query.Get("dataCenter"),
+ query.Get("rack"),
+ )
+
+ fs.autoChunk(ctx, w, r, contentLength, so)
+ util.CloseRequest(r)
- if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter); autoChunked {
- return
- }
+}
- fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
+// curl -X DELETE http://localhost:8888/path/to
+// curl -X DELETE http://localhost:8888/path/to?recursive=true
+// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
+// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
+func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
- if err != nil || fileId == "" || urlLocation == "" {
- glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
- return
+ isRecursive := r.FormValue("recursive") == "true"
+ if !isRecursive && fs.option.recursiveDelete {
+ if r.FormValue("recursive") != "false" {
+ isRecursive = true
+ }
}
+ ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
+ skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
- glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
-
- u, _ := url.Parse(urlLocation)
-
- // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off
- // because they need to provide FIDs instead of file paths...
- cm, _ := strconv.ParseBool(query.Get("cm"))
- if cm {
- q := u.Query()
- q.Set("cm", "true")
- u.RawQuery = q.Encode()
+ objectPath := r.URL.Path
+ if len(r.URL.Path) > 1 && strings.HasSuffix(objectPath, "/") {
+ objectPath = objectPath[0 : len(objectPath)-1]
}
- glog.V(4).Infoln("post to", u)
- ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
+ err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil)
if err != nil {
+ glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
+ httpStatus := http.StatusInternalServerError
+ if err == filer_pb.ErrNotFound {
+ httpStatus = http.StatusNoContent
+ }
+ writeJsonError(w, r, httpStatus, err)
return
}
- if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId); err != nil {
- return
- }
-
- // send back post result
- reply := FilerPostResult{
- Name: ret.Name,
- Size: ret.Size,
- Error: ret.Error,
- Fid: fileId,
- Url: urlLocation,
- }
- setEtag(w, ret.ETag)
- writeJsonQuiet(w, r, http.StatusCreated, reply)
+ w.WriteHeader(http.StatusNoContent)
}
-// update metadata in filer store
-func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter,
- replication string, collection string, ret operation.UploadResult, fileId string) (err error) {
-
- stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc()
- start := time.Now()
- defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds())
- }()
+func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType string, dataCenter, rack string) *operation.StorageOption {
+ collection := util.Nvl(qCollection, fs.option.Collection)
+ replication := util.Nvl(qReplication, fs.option.DefaultReplication)
- modeStr := r.URL.Query().Get("mode")
- if modeStr == "" {
- modeStr = "0660"
+ // required by buckets folder
+ bucketDefaultReplication, fsync := "", false
+ if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
+ collection = fs.filer.DetectBucket(util.FullPath(requestURI))
+ bucketDefaultReplication, fsync = fs.filer.ReadBucketOption(collection)
}
- mode, err := strconv.ParseUint(modeStr, 8, 32)
- if err != nil {
- glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
- mode = 0660
- }
-
- path := r.URL.Path
- if strings.HasSuffix(path, "/") {
- if ret.Name != "" {
- path += ret.Name
- }
- }
- existingEntry, err := fs.filer.FindEntry(ctx, filer2.FullPath(path))
- crTime := time.Now()
- if err == nil && existingEntry != nil {
- crTime = existingEntry.Crtime
- }
- entry := &filer2.Entry{
- FullPath: filer2.FullPath(path),
- Attr: filer2.Attr{
- Mtime: time.Now(),
- Crtime: crTime,
- Mode: os.FileMode(mode),
- Uid: OS_UID,
- Gid: OS_GID,
- Replication: replication,
- Collection: collection,
- TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)),
- },
- Chunks: []*filer_pb.FileChunk{{
- FileId: fileId,
- Size: uint64(ret.Size),
- Mtime: time.Now().UnixNano(),
- ETag: ret.ETag,
- }},
- }
- if ext := filenamePath.Ext(path); ext != "" {
- entry.Attr.Mime = mime.TypeByExtension(ext)
- }
- // glog.V(4).Infof("saving %s => %+v", path, entry)
- if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
- fs.filer.DeleteChunks(entry.Chunks)
- glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
- writeJsonError(w, r, http.StatusInternalServerError, dbErr)
- err = dbErr
- return
+ if replication == "" {
+ replication = bucketDefaultReplication
}
- return nil
-}
-
-// send request to volume server
-func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret operation.UploadResult, err error) {
-
- stats.FilerRequestCounter.WithLabelValues("postUpload").Inc()
- start := time.Now()
- defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }()
+ rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
- request := &http.Request{
- Method: r.Method,
- URL: u,
- Proto: r.Proto,
- ProtoMajor: r.ProtoMajor,
- ProtoMinor: r.ProtoMinor,
- Header: r.Header,
- Body: r.Body,
- Host: r.Host,
- ContentLength: r.ContentLength,
- }
- if auth != "" {
- request.Header.Set("Authorization", "BEARER "+string(auth))
- }
- resp, doErr := util.Do(request)
- if doErr != nil {
- glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method)
- writeJsonError(w, r, http.StatusInternalServerError, doErr)
- err = doErr
- return
- }
- defer func() {
- io.Copy(ioutil.Discard, resp.Body)
- resp.Body.Close()
- }()
- etag := resp.Header.Get("ETag")
- respBody, raErr := ioutil.ReadAll(resp.Body)
- if raErr != nil {
- glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error())
- writeJsonError(w, r, http.StatusInternalServerError, raErr)
- err = raErr
- return
- }
- glog.V(4).Infoln("post result", string(respBody))
- unmarshalErr := json.Unmarshal(respBody, &ret)
- if unmarshalErr != nil {
- glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody))
- writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr)
- err = unmarshalErr
- return
- }
- if ret.Error != "" {
- err = errors.New(ret.Error)
- glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
- writeJsonError(w, r, http.StatusInternalServerError, err)
- return
- }
- // find correct final path
- path := r.URL.Path
- if strings.HasSuffix(path, "/") {
- if ret.Name != "" {
- path += ret.Name
- } else {
- err = fmt.Errorf("can not to write to folder %s without a file name", path)
- fs.filer.DeleteFileByFileId(fileId)
- glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
- writeJsonError(w, r, http.StatusInternalServerError, err)
- return
+ if ttlSeconds == 0 {
+ ttl, err := needle.ReadTTL(rule.GetTtl())
+ if err != nil {
+ glog.Errorf("fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err)
}
+ ttlSeconds = int32(ttl.Minutes()) * 60
}
- if etag != "" {
- ret.ETag = etag
+
+ return &operation.StorageOption{
+ Replication: util.Nvl(replication, rule.Replication),
+ Collection: util.Nvl(collection, rule.Collection),
+ DataCenter: util.Nvl(dataCenter, fs.option.DataCenter),
+ Rack: util.Nvl(rack, fs.option.Rack),
+ TtlSeconds: ttlSeconds,
+ DiskType: util.Nvl(diskType, rule.DiskType),
+ Fsync: fsync || rule.Fsync,
+ VolumeGrowthCount: rule.VolumeGrowthCount,
}
- return
}
-// curl -X DELETE http://localhost:8888/path/to
-// curl -X DELETE http://localhost:8888/path/to?recursive=true
-// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
-// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true
-func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
-
- isRecursive := r.FormValue("recursive") == "true"
- ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
- skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
+func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) *operation.StorageOption {
- err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, !skipChunkDeletion)
+ ttl, err := needle.ReadTTL(qTtl)
if err != nil {
- glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error())
- httpStatus := http.StatusInternalServerError
- if err == filer2.ErrNotFound {
- httpStatus = http.StatusNotFound
- }
- writeJsonError(w, r, httpStatus, err)
- return
+ glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
}
- w.WriteHeader(http.StatusNoContent)
+ return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack)
}