diff options
Diffstat (limited to 'weed/server/filer_server_handlers_write.go')
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 174 |
1 files changed, 107 insertions, 67 deletions
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 91996dd5e..2c571684d 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" "io/ioutil" "mime" @@ -38,6 +39,11 @@ type FilerPostResult struct { } func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, auth security.EncodedJwt, err error) { + + stats.FilerRequestCounter.WithLabelValues("assign").Inc() + start := time.Now() + defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }() + ar := &operation.VolumeAssignRequest{ Count: 1, Replication: replication, @@ -116,67 +122,36 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } glog.V(4).Infoln("post to", u) - // send request to volume server - request := &http.Request{ - Method: r.Method, - URL: u, - Proto: r.Proto, - ProtoMajor: r.ProtoMajor, - ProtoMinor: r.ProtoMinor, - Header: r.Header, - Body: r.Body, - Host: r.Host, - ContentLength: r.ContentLength, - } - if auth != "" { - request.Header.Set("Authorization", "BEARER "+string(auth)) - } - resp, do_err := util.Do(request) - if do_err != nil { - glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, do_err, r.Method) - writeJsonError(w, r, http.StatusInternalServerError, do_err) - return - } - defer func() { - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - }() - etag := resp.Header.Get("ETag") - resp_body, ra_err := ioutil.ReadAll(resp.Body) - if ra_err != nil { - glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error()) - writeJsonError(w, r, http.StatusInternalServerError, ra_err) - return - } - glog.V(4).Infoln("post result", string(resp_body)) - var ret operation.UploadResult - unmarshal_err := json.Unmarshal(resp_body, &ret) - if unmarshal_err != nil { - glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body)) - writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err) + ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) + if err != nil { return } - if ret.Error != "" { - glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) - writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error)) + + if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId); err != nil { return } - // find correct final path - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if ret.Name != "" { - path += ret.Name - } else { - fs.filer.DeleteFileByFileId(fileId) - glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") - writeJsonError(w, r, http.StatusInternalServerError, - errors.New("Can not to write to folder "+path+" without a file name")) - return - } + // send back post result + reply := FilerPostResult{ + Name: ret.Name, + Size: ret.Size, + Error: ret.Error, + Fid: fileId, + Url: urlLocation, } + setEtag(w, ret.ETag) + writeJsonQuiet(w, r, http.StatusCreated, reply) +} + +// update metadata in filer store +func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, + replication string, collection string, ret operation.UploadResult, fileId string) (err error) { + + stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc() + start := time.Now() + defer func() { stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds()) }() - // update metadata in filer store + path := r.URL.Path existingEntry, err := fs.filer.FindEntry(ctx, filer2.FullPath(path)) crTime := time.Now() if err == nil && existingEntry != nil { @@ -203,30 +178,95 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { FileId: fileId, Size: uint64(ret.Size), Mtime: time.Now().UnixNano(), - ETag: etag, + ETag: ret.ETag, }}, } if ext := filenamePath.Ext(path); ext != "" { entry.Attr.Mime = mime.TypeByExtension(ext) } // glog.V(4).Infof("saving %s => %+v", path, entry) - if db_err := fs.filer.CreateEntry(ctx, entry); db_err != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil { fs.filer.DeleteChunks(entry.FullPath, entry.Chunks) - glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) - writeJsonError(w, r, http.StatusInternalServerError, db_err) + glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) + writeJsonError(w, r, http.StatusInternalServerError, dbErr) + err = dbErr return } - // send back post result - reply := FilerPostResult{ - Name: ret.Name, - Size: ret.Size, - Error: ret.Error, - Fid: fileId, - Url: urlLocation, + return nil +} + +// send request to volume server +func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret operation.UploadResult, err error) { + + stats.FilerRequestCounter.WithLabelValues("postUpload").Inc() + start := time.Now() + defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }() + + request := &http.Request{ + Method: r.Method, + URL: u, + Proto: r.Proto, + ProtoMajor: r.ProtoMajor, + ProtoMinor: r.ProtoMinor, + Header: r.Header, + Body: r.Body, + Host: r.Host, + ContentLength: r.ContentLength, } - setEtag(w, etag) - writeJsonQuiet(w, r, http.StatusCreated, reply) + if auth != "" { + request.Header.Set("Authorization", "BEARER "+string(auth)) + } + resp, doErr := util.Do(request) + if doErr != nil { + glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method) + writeJsonError(w, r, http.StatusInternalServerError, doErr) + err = doErr + return + } + defer func() { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + }() + etag := resp.Header.Get("ETag") + respBody, raErr := ioutil.ReadAll(resp.Body) + if raErr != nil { + glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error()) + writeJsonError(w, r, http.StatusInternalServerError, raErr) + err = raErr + return + } + glog.V(4).Infoln("post result", string(respBody)) + unmarshalErr := json.Unmarshal(respBody, &ret) + if unmarshalErr != nil { + glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody)) + writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr) + err = unmarshalErr + return + } + if ret.Error != "" { + err = errors.New(ret.Error) + glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + // find correct final path + path := r.URL.Path + if strings.HasSuffix(path, "/") { + if ret.Name != "" { + path += ret.Name + } else { + err = fmt.Errorf("can not to write to folder %s without a file name", path) + fs.filer.DeleteFileByFileId(fileId) + glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + } + if etag != "" { + ret.ETag = etag + } + return } // curl -X DELETE http://localhost:8888/path/to |
