diff options
Diffstat (limited to 'weed/server/filer_server_handlers_write.go')
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 298 |
1 files changed, 74 insertions, 224 deletions
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 236e7027d..95eba9d3d 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -2,26 +2,17 @@ package weed_server import ( "context" - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "mime" "net/http" - "net/url" "os" - filenamePath "path" - "strconv" "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -32,271 +23,130 @@ var ( type FilerPostResult struct { Name string `json:"name,omitempty"` - Size uint32 `json:"size,omitempty"` + Size int64 `json:"size,omitempty"` Error string `json:"error,omitempty"` Fid string `json:"fid,omitempty"` Url string `json:"url,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, auth security.EncodedJwt, err error) { +func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) { stats.FilerRequestCounter.WithLabelValues("assign").Inc() start := time.Now() defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }() - ar := &operation.VolumeAssignRequest{ - Count: 1, - Replication: replication, - Collection: collection, - Ttl: r.URL.Query().Get("ttl"), - DataCenter: dataCenter, - } - var altRequest *operation.VolumeAssignRequest - if dataCenter != "" { - altRequest = &operation.VolumeAssignRequest{ - Count: 1, - Replication: replication, - Collection: collection, - Ttl: r.URL.Query().Get("ttl"), - DataCenter: "", - } - } + ar, altRequest := so.ToAssignRequests(1) - assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest) + assignResult, ae := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) - writeJsonError(w, r, http.StatusInternalServerError, ae) err = ae return } fileId = assignResult.Fid urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid + if so.Fsync { + urlLocation += "?fsync=true" + } auth = assignResult.Auth return } -func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { +func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) { ctx := context.Background() query := r.URL.Query() - replication := query.Get("replication") - if replication == "" { - replication = fs.option.DefaultReplication - } - collection := query.Get("collection") - if collection == "" { - collection = fs.option.Collection - } - dataCenter := query.Get("dataCenter") - if dataCenter == "" { - dataCenter = fs.option.DataCenter - } + so := fs.detectStorageOption0(r.RequestURI, + query.Get("collection"), + query.Get("replication"), + query.Get("ttl"), + query.Get("disk"), + query.Get("dataCenter"), + query.Get("rack"), + ) + + fs.autoChunk(ctx, w, r, contentLength, so) + util.CloseRequest(r) - if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter); autoChunked { - return - } +} - fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter) +// curl -X DELETE http://localhost:8888/path/to +// curl -X DELETE http://localhost:8888/path/to?recursive=true +// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true +// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true +func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { - if err != nil || fileId == "" || urlLocation == "" { - glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) - return + isRecursive := r.FormValue("recursive") == "true" + if !isRecursive && fs.option.recursiveDelete { + if r.FormValue("recursive") != "false" { + isRecursive = true + } } + ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true" + skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true" - glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation) - - u, _ := url.Parse(urlLocation) - - // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off - // because they need to provide FIDs instead of file paths... - cm, _ := strconv.ParseBool(query.Get("cm")) - if cm { - q := u.Query() - q.Set("cm", "true") - u.RawQuery = q.Encode() + objectPath := r.URL.Path + if len(r.URL.Path) > 1 && strings.HasSuffix(objectPath, "/") { + objectPath = objectPath[0 : len(objectPath)-1] } - glog.V(4).Infoln("post to", u) - ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) + err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil) if err != nil { + glog.V(1).Infoln("deleting", objectPath, ":", err.Error()) + httpStatus := http.StatusInternalServerError + if err == filer_pb.ErrNotFound { + httpStatus = http.StatusNoContent + } + writeJsonError(w, r, httpStatus, err) return } - if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId); err != nil { - return - } - - // send back post result - reply := FilerPostResult{ - Name: ret.Name, - Size: ret.Size, - Error: ret.Error, - Fid: fileId, - Url: urlLocation, - } - setEtag(w, ret.ETag) - writeJsonQuiet(w, r, http.StatusCreated, reply) + w.WriteHeader(http.StatusNoContent) } -// update metadata in filer store -func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, - replication string, collection string, ret operation.UploadResult, fileId string) (err error) { - - stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc() - start := time.Now() - defer func() { - stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds()) - }() +func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType string, dataCenter, rack string) *operation.StorageOption { + collection := util.Nvl(qCollection, fs.option.Collection) + replication := util.Nvl(qReplication, fs.option.DefaultReplication) - modeStr := r.URL.Query().Get("mode") - if modeStr == "" { - modeStr = "0660" + // required by buckets folder + bucketDefaultReplication, fsync := "", false + if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") { + collection = fs.filer.DetectBucket(util.FullPath(requestURI)) + bucketDefaultReplication, fsync = fs.filer.ReadBucketOption(collection) } - mode, err := strconv.ParseUint(modeStr, 8, 32) - if err != nil { - glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr) - mode = 0660 - } - - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if ret.Name != "" { - path += ret.Name - } - } - existingEntry, err := fs.filer.FindEntry(ctx, filer2.FullPath(path)) - crTime := time.Now() - if err == nil && existingEntry != nil { - crTime = existingEntry.Crtime - } - entry := &filer2.Entry{ - FullPath: filer2.FullPath(path), - Attr: filer2.Attr{ - Mtime: time.Now(), - Crtime: crTime, - Mode: os.FileMode(mode), - Uid: OS_UID, - Gid: OS_GID, - Replication: replication, - Collection: collection, - TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)), - }, - Chunks: []*filer_pb.FileChunk{{ - FileId: fileId, - Size: uint64(ret.Size), - Mtime: time.Now().UnixNano(), - ETag: ret.ETag, - }}, - } - if ext := filenamePath.Ext(path); ext != "" { - entry.Attr.Mime = mime.TypeByExtension(ext) - } - // glog.V(4).Infof("saving %s => %+v", path, entry) - if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil { - fs.filer.DeleteChunks(entry.Chunks) - glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) - writeJsonError(w, r, http.StatusInternalServerError, dbErr) - err = dbErr - return + if replication == "" { + replication = bucketDefaultReplication } - return nil -} - -// send request to volume server -func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret operation.UploadResult, err error) { - - stats.FilerRequestCounter.WithLabelValues("postUpload").Inc() - start := time.Now() - defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }() + rule := fs.filer.FilerConf.MatchStorageRule(requestURI) - request := &http.Request{ - Method: r.Method, - URL: u, - Proto: r.Proto, - ProtoMajor: r.ProtoMajor, - ProtoMinor: r.ProtoMinor, - Header: r.Header, - Body: r.Body, - Host: r.Host, - ContentLength: r.ContentLength, - } - if auth != "" { - request.Header.Set("Authorization", "BEARER "+string(auth)) - } - resp, doErr := util.Do(request) - if doErr != nil { - glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method) - writeJsonError(w, r, http.StatusInternalServerError, doErr) - err = doErr - return - } - defer func() { - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - }() - etag := resp.Header.Get("ETag") - respBody, raErr := ioutil.ReadAll(resp.Body) - if raErr != nil { - glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error()) - writeJsonError(w, r, http.StatusInternalServerError, raErr) - err = raErr - return - } - glog.V(4).Infoln("post result", string(respBody)) - unmarshalErr := json.Unmarshal(respBody, &ret) - if unmarshalErr != nil { - glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody)) - writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr) - err = unmarshalErr - return - } - if ret.Error != "" { - err = errors.New(ret.Error) - glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - // find correct final path - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if ret.Name != "" { - path += ret.Name - } else { - err = fmt.Errorf("can not to write to folder %s without a file name", path) - fs.filer.DeleteFileByFileId(fileId) - glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") - writeJsonError(w, r, http.StatusInternalServerError, err) - return + if ttlSeconds == 0 { + ttl, err := needle.ReadTTL(rule.GetTtl()) + if err != nil { + glog.Errorf("fail to parse %s ttl setting %s: %v", rule.LocationPrefix, rule.Ttl, err) } + ttlSeconds = int32(ttl.Minutes()) * 60 } - if etag != "" { - ret.ETag = etag + + return &operation.StorageOption{ + Replication: util.Nvl(replication, rule.Replication), + Collection: util.Nvl(collection, rule.Collection), + DataCenter: util.Nvl(dataCenter, fs.option.DataCenter), + Rack: util.Nvl(rack, fs.option.Rack), + TtlSeconds: ttlSeconds, + DiskType: util.Nvl(diskType, rule.DiskType), + Fsync: fsync || rule.Fsync, + VolumeGrowthCount: rule.VolumeGrowthCount, } - return } -// curl -X DELETE http://localhost:8888/path/to -// curl -X DELETE http://localhost:8888/path/to?recursive=true -// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true -// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true -func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { - - isRecursive := r.FormValue("recursive") == "true" - ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true" - skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true" +func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) *operation.StorageOption { - err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, !skipChunkDeletion) + ttl, err := needle.ReadTTL(qTtl) if err != nil { - glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error()) - httpStatus := http.StatusInternalServerError - if err == filer2.ErrNotFound { - httpStatus = http.StatusNotFound - } - writeJsonError(w, r, httpStatus, err) - return + glog.Errorf("fail to parse ttl %s: %v", qTtl, err) } - w.WriteHeader(http.StatusNoContent) + return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack) } |
