aboutsummaryrefslogtreecommitdiff
path: root/weed/server/filer_server_handlers_write.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/filer_server_handlers_write.go')
-rw-r--r--weed/server/filer_server_handlers_write.go174
1 files changed, 107 insertions, 67 deletions
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 91996dd5e..2c571684d 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
+ "fmt"
"io"
"io/ioutil"
"mime"
@@ -38,6 +39,11 @@ type FilerPostResult struct {
}
func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+
+ stats.FilerRequestCounter.WithLabelValues("assign").Inc()
+ start := time.Now()
+ defer func() { stats.FilerRequestHistogram.WithLabelValues("assign").Observe(time.Since(start).Seconds()) }()
+
ar := &operation.VolumeAssignRequest{
Count: 1,
Replication: replication,
@@ -116,67 +122,36 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
glog.V(4).Infoln("post to", u)
- // send request to volume server
- request := &http.Request{
- Method: r.Method,
- URL: u,
- Proto: r.Proto,
- ProtoMajor: r.ProtoMajor,
- ProtoMinor: r.ProtoMinor,
- Header: r.Header,
- Body: r.Body,
- Host: r.Host,
- ContentLength: r.ContentLength,
- }
- if auth != "" {
- request.Header.Set("Authorization", "BEARER "+string(auth))
- }
- resp, do_err := util.Do(request)
- if do_err != nil {
- glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, do_err, r.Method)
- writeJsonError(w, r, http.StatusInternalServerError, do_err)
- return
- }
- defer func() {
- io.Copy(ioutil.Discard, resp.Body)
- resp.Body.Close()
- }()
- etag := resp.Header.Get("ETag")
- resp_body, ra_err := ioutil.ReadAll(resp.Body)
- if ra_err != nil {
- glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, ra_err.Error())
- writeJsonError(w, r, http.StatusInternalServerError, ra_err)
- return
- }
- glog.V(4).Infoln("post result", string(resp_body))
- var ret operation.UploadResult
- unmarshal_err := json.Unmarshal(resp_body, &ret)
- if unmarshal_err != nil {
- glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(resp_body))
- writeJsonError(w, r, http.StatusInternalServerError, unmarshal_err)
+ ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
+ if err != nil {
return
}
- if ret.Error != "" {
- glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
- writeJsonError(w, r, http.StatusInternalServerError, errors.New(ret.Error))
+
+ if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId); err != nil {
return
}
- // find correct final path
- path := r.URL.Path
- if strings.HasSuffix(path, "/") {
- if ret.Name != "" {
- path += ret.Name
- } else {
- fs.filer.DeleteFileByFileId(fileId)
- glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
- writeJsonError(w, r, http.StatusInternalServerError,
- errors.New("Can not to write to folder "+path+" without a file name"))
- return
- }
+ // send back post result
+ reply := FilerPostResult{
+ Name: ret.Name,
+ Size: ret.Size,
+ Error: ret.Error,
+ Fid: fileId,
+ Url: urlLocation,
}
+ setEtag(w, ret.ETag)
+ writeJsonQuiet(w, r, http.StatusCreated, reply)
+}
+
+// update metadata in filer store
+func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter,
+ replication string, collection string, ret operation.UploadResult, fileId string) (err error) {
+
+ stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc()
+ start := time.Now()
+ defer func() { stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds()) }()
- // update metadata in filer store
+ path := r.URL.Path
existingEntry, err := fs.filer.FindEntry(ctx, filer2.FullPath(path))
crTime := time.Now()
if err == nil && existingEntry != nil {
@@ -203,30 +178,95 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
FileId: fileId,
Size: uint64(ret.Size),
Mtime: time.Now().UnixNano(),
- ETag: etag,
+ ETag: ret.ETag,
}},
}
if ext := filenamePath.Ext(path); ext != "" {
entry.Attr.Mime = mime.TypeByExtension(ext)
}
// glog.V(4).Infof("saving %s => %+v", path, entry)
- if db_err := fs.filer.CreateEntry(ctx, entry); db_err != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, entry); dbErr != nil {
fs.filer.DeleteChunks(entry.FullPath, entry.Chunks)
- glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
- writeJsonError(w, r, http.StatusInternalServerError, db_err)
+ glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
+ writeJsonError(w, r, http.StatusInternalServerError, dbErr)
+ err = dbErr
return
}
- // send back post result
- reply := FilerPostResult{
- Name: ret.Name,
- Size: ret.Size,
- Error: ret.Error,
- Fid: fileId,
- Url: urlLocation,
+ return nil
+}
+
+// send request to volume server
+func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret operation.UploadResult, err error) {
+
+ stats.FilerRequestCounter.WithLabelValues("postUpload").Inc()
+ start := time.Now()
+ defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }()
+
+ request := &http.Request{
+ Method: r.Method,
+ URL: u,
+ Proto: r.Proto,
+ ProtoMajor: r.ProtoMajor,
+ ProtoMinor: r.ProtoMinor,
+ Header: r.Header,
+ Body: r.Body,
+ Host: r.Host,
+ ContentLength: r.ContentLength,
}
- setEtag(w, etag)
- writeJsonQuiet(w, r, http.StatusCreated, reply)
+ if auth != "" {
+ request.Header.Set("Authorization", "BEARER "+string(auth))
+ }
+ resp, doErr := util.Do(request)
+ if doErr != nil {
+ glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method)
+ writeJsonError(w, r, http.StatusInternalServerError, doErr)
+ err = doErr
+ return
+ }
+ defer func() {
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+ }()
+ etag := resp.Header.Get("ETag")
+ respBody, raErr := ioutil.ReadAll(resp.Body)
+ if raErr != nil {
+ glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error())
+ writeJsonError(w, r, http.StatusInternalServerError, raErr)
+ err = raErr
+ return
+ }
+ glog.V(4).Infoln("post result", string(respBody))
+ unmarshalErr := json.Unmarshal(respBody, &ret)
+ if unmarshalErr != nil {
+ glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody))
+ writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr)
+ err = unmarshalErr
+ return
+ }
+ if ret.Error != "" {
+ err = errors.New(ret.Error)
+ glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+ // find correct final path
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ if ret.Name != "" {
+ path += ret.Name
+ } else {
+ err = fmt.Errorf("can not to write to folder %s without a file name", path)
+ fs.filer.DeleteFileByFileId(fileId)
+ glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+ }
+ if etag != "" {
+ ret.ETag = etag
+ }
+ return
}
// curl -X DELETE http://localhost:8888/path/to