diff options
| author | HongyanShen <763987993@qq.com> | 2020-03-11 12:55:24 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-03-11 12:55:24 +0800 |
| commit | 03529fc0c29072f6f26e11ffbd7229cf92dc71ce (patch) | |
| tree | ed8833386a712c850dcef0815509774681a6ab56 /weed/server/filer_server_handlers_write.go | |
| parent | 0fca1ae776783b37481549df40f477b7d9248d3c (diff) | |
| parent | 60f5f05c78a2918d5219c925cea5847759281a2c (diff) | |
| download | seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.tar.xz seaweedfs-03529fc0c29072f6f26e11ffbd7229cf92dc71ce.zip | |
Merge pull request #1 from chrislusf/master
sync
Diffstat (limited to 'weed/server/filer_server_handlers_write.go')
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 146 |
1 files changed, 104 insertions, 42 deletions
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 5d95a5d7e..5cd174b17 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -2,6 +2,7 @@ package weed_server import ( "context" + "crypto/md5" "encoding/json" "errors" "fmt" @@ -22,6 +23,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -32,13 +34,13 @@ var ( type FilerPostResult struct { Name string `json:"name,omitempty"` - Size uint32 `json:"size,omitempty"` + Size int64 `json:"size,omitempty"` Error string `json:"error,omitempty"` Fid string `json:"fid,omitempty"` Url string `json:"url,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, auth security.EncodedJwt, err error) { +func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string) (fileId, urlLocation string, auth security.EncodedJwt, err error) { stats.FilerRequestCounter.WithLabelValues("assign").Inc() start := time.Now() @@ -48,7 +50,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, Count: 1, Replication: replication, Collection: collection, - Ttl: r.URL.Query().Get("ttl"), + Ttl: ttlString, DataCenter: dataCenter, } var altRequest *operation.VolumeAssignRequest @@ -57,7 +59,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, Count: 1, Replication: replication, Collection: collection, - Ttl: r.URL.Query().Get("ttl"), + Ttl: ttlString, DataCenter: "", } } @@ -80,57 +82,59 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { ctx := context.Background() query := r.URL.Query() - replication := query.Get("replication") - if replication == "" { - replication = fs.option.DefaultReplication - } - collection := query.Get("collection") - if collection == "" { - collection = fs.option.Collection - } + collection, replication := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication")) dataCenter := query.Get("dataCenter") if dataCenter == "" { dataCenter = fs.option.DataCenter } + ttlString := r.URL.Query().Get("ttl") + + // read ttl in seconds + ttl, err := needle.ReadTTL(ttlString) + ttlSeconds := int32(0) + if err == nil { + ttlSeconds = int32(ttl.Minutes()) * 60 + } + + if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString); autoChunked { + return + } + + if fs.option.Cipher { + reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + } else if reply != nil { + writeJsonQuiet(w, r, http.StatusCreated, reply) + } - if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter); autoChunked { return } - fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString) if err != nil || fileId == "" || urlLocation == "" { glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) + writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)) return } glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation) u, _ := url.Parse(urlLocation) - - // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off - // because they need to provide FIDs instead of file paths... - cm, _ := strconv.ParseBool(query.Get("cm")) - if cm { - q := u.Query() - q.Set("cm", "true") - u.RawQuery = q.Encode() - } - glog.V(4).Infoln("post to", u) - ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) if err != nil { return } - if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId); err != nil { + if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId, ttlSeconds); err != nil { return } // send back post result reply := FilerPostResult{ Name: ret.Name, - Size: ret.Size, + Size: int64(ret.Size), Error: ret.Error, Fid: fileId, Url: urlLocation, @@ -141,7 +145,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { // update metadata in filer store func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, - replication string, collection string, ret operation.UploadResult, fileId string) (err error) { + replication string, collection string, ret *operation.UploadResult, fileId string, ttlSeconds int32) (err error) { stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc() start := time.Now() @@ -149,6 +153,16 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds()) }() + modeStr := r.URL.Query().Get("mode") + if modeStr == "" { + modeStr = "0660" + } + mode, err := strconv.ParseUint(modeStr, 8, 32) + if err != nil { + glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr) + mode = 0660 + } + path := r.URL.Path if strings.HasSuffix(path, "/") { if ret.Name != "" { @@ -165,12 +179,13 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w Attr: filer2.Attr{ Mtime: time.Now(), Crtime: crTime, - Mode: 0660, + Mode: os.FileMode(mode), Uid: OS_UID, Gid: OS_GID, Replication: replication, Collection: collection, - TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)), + TtlSec: ttlSeconds, + Mime: ret.Mime, }, Chunks: []*filer_pb.FileChunk{{ FileId: fileId, @@ -179,12 +194,14 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w ETag: ret.ETag, }}, } - if ext := filenamePath.Ext(path); ext != "" { - entry.Attr.Mime = mime.TypeByExtension(ext) + if entry.Attr.Mime == "" { + if ext := filenamePath.Ext(path); ext != "" { + entry.Attr.Mime = mime.TypeByExtension(ext) + } } // glog.V(4).Infof("saving %s => %+v", path, entry) - if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil { - fs.filer.DeleteChunks(entry.FullPath, entry.Chunks) + if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil { + fs.filer.DeleteChunks(entry.Chunks) glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) writeJsonError(w, r, http.StatusInternalServerError, dbErr) err = dbErr @@ -195,12 +212,16 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w } // send request to volume server -func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret operation.UploadResult, err error) { +func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, err error) { stats.FilerRequestCounter.WithLabelValues("postUpload").Inc() start := time.Now() defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }() + ret = &operation.UploadResult{} + hash := md5.New() + var body = ioutil.NopCloser(io.TeeReader(r.Body, hash)) + request := &http.Request{ Method: r.Method, URL: u, @@ -208,10 +229,11 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se ProtoMajor: r.ProtoMajor, ProtoMinor: r.ProtoMinor, Header: r.Header, - Body: r.Body, + Body: body, Host: r.Host, ContentLength: r.ContentLength, } + if auth != "" { request.Header.Set("Authorization", "BEARER "+string(auth)) } @@ -226,7 +248,7 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() }() - etag := resp.Header.Get("ETag") + respBody, raErr := ioutil.ReadAll(resp.Body) if raErr != nil { glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error()) @@ -234,6 +256,7 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se err = raErr return } + glog.V(4).Infoln("post result", string(respBody)) unmarshalErr := json.Unmarshal(respBody, &ret) if unmarshalErr != nil { @@ -261,26 +284,65 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se return } } - if etag != "" { - ret.ETag = etag - } + // use filer calculated md5 ETag, instead of the volume server crc ETag + ret.ETag = fmt.Sprintf("%x", hash.Sum(nil)) return } // curl -X DELETE http://localhost:8888/path/to // curl -X DELETE http://localhost:8888/path/to?recursive=true // curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true +// curl -X DELETE http://localhost:8888/path/to?recursive=true&skipChunkDeletion=true func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { isRecursive := r.FormValue("recursive") == "true" + if !isRecursive && fs.option.recursiveDelete { + if r.FormValue("recursive") != "false" { + isRecursive = true + } + } ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true" + skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true" - err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, true) + err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, !skipChunkDeletion) if err != nil { glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error()) - writeJsonError(w, r, http.StatusInternalServerError, err) + httpStatus := http.StatusInternalServerError + if err == filer_pb.ErrNotFound { + httpStatus = http.StatusNotFound + } + writeJsonError(w, r, httpStatus, err) return } w.WriteHeader(http.StatusNoContent) } + +func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string) { + // default + collection = fs.option.Collection + replication = fs.option.DefaultReplication + + // get default collection settings + if qCollection != "" { + collection = qCollection + } + if qReplication != "" { + replication = qReplication + } + + // required by buckets folder + if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") { + bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:] + t := strings.Index(bucketAndObjectKey, "/") + if t < 0 { + collection = bucketAndObjectKey + } + if t > 0 { + collection = bucketAndObjectKey[:t] + } + replication = fs.filer.ReadBucketOption(collection) + } + + return +} |
