diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-07-21 10:39:02 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-07-21 10:39:02 -0700 |
| commit | 8480008a9a64ed8b922c786c70bc38e1f0353478 (patch) | |
| tree | 44529e24b43f2d5691b8e3f6b3028a82da8b6d07 /weed | |
| parent | 80d80daf64370d6a3d37afa6ce06258335ac856f (diff) | |
| download | seaweedfs-8480008a9a64ed8b922c786c70bc38e1f0353478.tar.xz seaweedfs-8480008a9a64ed8b922c786c70bc38e1f0353478.zip | |
add s3 upload, and removing mono and multi part upload analyzer
removing mono and multi part upload analyzer, which were used just to determine the file name
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/operation/assign_file_id.go | 5 | ||||
| -rw-r--r-- | weed/s3api/s3api_errors.go | 8 | ||||
| -rw-r--r-- | weed/s3api/s3api_handlers.go | 12 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_handlers.go | 90 | ||||
| -rw-r--r-- | weed/s3api/s3api_server.go | 4 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 31 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_monopart.go | 18 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_multipart.go | 30 |
8 files changed, 139 insertions, 59 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index c2e1e4444..169fd664d 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -59,8 +59,9 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque values.Add("dataNode", request.DataNode) } - jsonBlob, err := util.Post("http://"+server+"/dir/assign", values) - glog.V(2).Infof("assign result from %s : %s", server, string(jsonBlob)) + postUrl := fmt.Sprintf("http://%s/dir/assign", server) + jsonBlob, err := util.Post(postUrl, values) + glog.V(2).Infof("assign %d result from %s %+v : %s", i, postUrl, values, string(jsonBlob)) if err != nil { return nil, err } diff --git a/weed/s3api/s3api_errors.go b/weed/s3api/s3api_errors.go index 8af024700..10b48c2c8 100644 --- a/weed/s3api/s3api_errors.go +++ b/weed/s3api/s3api_errors.go @@ -31,8 +31,9 @@ const ( ErrBucketNotEmpty ErrBucketAlreadyExists ErrBucketAlreadyOwnedByYou - ErrInvalidBucketName ErrNoSuchBucket + ErrInvalidBucketName + ErrInvalidDigest ErrInternalError ) @@ -64,6 +65,11 @@ var errorCodeResponse = map[ErrorCode]APIError{ Description: "The specified bucket is not valid.", HTTPStatusCode: http.StatusBadRequest, }, + ErrInvalidDigest: { + Code: "InvalidDigest", + Description: "The Content-Md5 you specified is not valid.", + HTTPStatusCode: http.StatusBadRequest, + }, ErrNoSuchBucket: { Code: "NoSuchBucket", Description: "The specified bucket does not exist", diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index 229b3a740..13dfc8d15 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -11,6 +11,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "bytes" "encoding/xml" + "encoding/base64" ) type mimeType string @@ -93,3 +94,14 @@ func writeSuccessResponseXML(w http.ResponseWriter, response []byte) { func writeSuccessResponseEmpty(w http.ResponseWriter) { writeResponse(w, http.StatusOK, nil, mimeNone) } + +func validateContentMd5(h http.Header) ([]byte, error) { + md5B64, ok := h["Content-Md5"] + if ok { + if md5B64[0] == "" { + return nil, fmt.Errorf("Content-Md5 header set to empty value") + } + return base64.StdEncoding.DecodeString(md5B64[0]) + } + return []byte{}, nil +} diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go new file mode 100644 index 000000000..079509792 --- /dev/null +++ b/weed/s3api/s3api_object_handlers.go @@ -0,0 +1,90 @@ +package s3api + +import ( + "net/http" + "github.com/chrislusf/seaweedfs/weed/glog" + "fmt" + "github.com/gorilla/mux" + "io/ioutil" + "encoding/json" +) + +var ( + client *http.Client +) + +func init() { + client = &http.Client{Transport: &http.Transport{ + MaxIdleConnsPerHost: 1024, + }} +} + +type UploadResult struct { + Name string `json:"name,omitempty"` + Size uint32 `json:"size,omitempty"` + Error string `json:"error,omitempty"` +} + +func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) { + + // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html + + vars := mux.Vars(r) + bucket := vars["bucket"] + object := vars["object"] + + _, err := validateContentMd5(r.Header) + if err != nil { + writeErrorResponse(w, ErrInvalidDigest, r.URL) + return + } + + uploadUrl := fmt.Sprintf("http://%s%s/%s/%s?collection=%s", + s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket) + proxyReq, err := http.NewRequest("PUT", uploadUrl, r.Body) + + if err != nil { + glog.Errorf("NewRequest %s: %v", uploadUrl, err) + writeErrorResponse(w, ErrInternalError, r.URL) + return + } + + proxyReq.Header.Set("Host", s3a.option.Filer) + proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) + + for header, values := range r.Header { + for _, value := range values { + proxyReq.Header.Add(header, value) + } + } + + resp, postErr := client.Do(proxyReq) + + if postErr != nil { + glog.Errorf("post to filer: %v", postErr) + writeErrorResponse(w, ErrInternalError, r.URL) + return + } + defer resp.Body.Close() + + resp_body, ra_err := ioutil.ReadAll(resp.Body) + if ra_err != nil { + glog.Errorf("upload to filer response read: %v", ra_err) + writeErrorResponse(w, ErrInternalError, r.URL) + return + } + var ret UploadResult + unmarshal_err := json.Unmarshal(resp_body, &ret) + if unmarshal_err != nil { + glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body)) + writeErrorResponse(w, ErrInternalError, r.URL) + return + } + if ret.Error != "" { + glog.Errorf("upload to filer error: %v", ret.Error) + writeErrorResponse(w, ErrInternalError, r.URL) + return + } + + writeSuccessResponseEmpty(w) +} diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 8730f0b88..d8e6de1f7 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -42,6 +42,10 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) { routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter()) for _, bucket := range routers { + + // PutObject + bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(s3a.PutObjectHandler) + // PutBucket bucket.Methods("PUT").HandlerFunc(s3a.PutBucketHandler) // DeleteBucket diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 0c278f67c..a02fe04aa 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -70,9 +70,10 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, DataCenter: "", } } + assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar, altRequest) if ae != nil { - glog.V(0).Infoln("failing to assign a file id", ae.Error()) + glog.Errorf("failing to assign a file id: %v", ae) writeJsonError(w, r, http.StatusInternalServerError, ae) err = ae return @@ -102,20 +103,24 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - var fileId, urlLocation string - var err error - + /* + var path string if strings.HasPrefix(r.Header.Get("Content-Type"), "multipart/form-data; boundary=") { - fileId, urlLocation, err = fs.multipartUploadAnalyzer(w, r, replication, collection, dataCenter) - if err != nil { - return - } + path, err = fs.multipartUploadAnalyzer(w, r, replication, collection, dataCenter) } else { - fileId, urlLocation, err = fs.monolithicUploadAnalyzer(w, r, replication, collection, dataCenter) - if err != nil || fileId == "" { - return - } + path, err = fs.monolithicUploadAnalyzer(w, r, replication, collection, dataCenter) + } + */ + + fileId, urlLocation, err := fs.queryFileInfoByPath(w, r, r.URL.Path) + if fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, r.URL.Path); err == nil && fileId == "" { + fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter) } + if err != nil || fileId == "" || urlLocation == "" { + return + } + + glog.V(0).Infof("request header %+v, urlLocation: %v", r.Header, urlLocation) u, _ := url.Parse(urlLocation) @@ -142,7 +147,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } resp, do_err := util.Do(request) if do_err != nil { - glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error()) + glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, do_err, r.Method) writeJsonError(w, r, http.StatusInternalServerError, do_err) return } diff --git a/weed/server/filer_server_handlers_write_monopart.go b/weed/server/filer_server_handlers_write_monopart.go index 777d5bc43..ccafbe5c1 100644 --- a/weed/server/filer_server_handlers_write_monopart.go +++ b/weed/server/filer_server_handlers_write_monopart.go @@ -73,15 +73,8 @@ func checkContentMD5(w http.ResponseWriter, r *http.Request) (err error) { return } -func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) { - /* - Amazon S3 ref link:[http://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html] - There is a long way to provide a completely compatibility against all Amazon S3 API, I just made - a simple data stream adapter between S3 PUT API and seaweedfs's volume storage Write API - 1. The request url format should be http://$host:$port/$bucketName/$objectName - 2. bucketName will be mapped to seaweedfs's collection name - 3. You could customize and make your enhancement. - */ +func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (path string, err error) { + lastPos := strings.LastIndex(r.URL.Path, "/") if lastPos == -1 || lastPos == 0 || lastPos == len(r.URL.Path)-1 { glog.V(0).Infof("URL Path [%s] is invalid, could not retrieve file name", r.URL.Path) @@ -99,13 +92,8 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R return } - secondPos := strings.Index(r.URL.Path[1:], "/") + 1 - collection = r.URL.Path[1:secondPos] - path := r.URL.Path + path = r.URL.Path - if fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path); err == nil && fileId == "" { - fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter) - } return } diff --git a/weed/server/filer_server_handlers_write_multipart.go b/weed/server/filer_server_handlers_write_multipart.go index 056317750..302b58e70 100644 --- a/weed/server/filer_server_handlers_write_multipart.go +++ b/weed/server/filer_server_handlers_write_multipart.go @@ -1,39 +1,13 @@ package weed_server import ( - "bytes" - "io/ioutil" "net/http" - "strings" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage" ) -func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) { +func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (path string, err error) { //Default handle way for http multipart if r.Method == "PUT" { - buf, _ := ioutil.ReadAll(r.Body) - r.Body = ioutil.NopCloser(bytes.NewBuffer(buf)) - fileName, _, _, _, _, _, _, _, pe := storage.ParseUpload(r) - if pe != nil { - glog.V(0).Infoln("failing to parse post body", pe.Error()) - writeJsonError(w, r, http.StatusInternalServerError, pe) - err = pe - return - } - //reconstruct http request body for following new request to volume server - r.Body = ioutil.NopCloser(bytes.NewBuffer(buf)) - - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if fileName != "" { - path += fileName - } - } - fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path) - } else { - fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter) + path = r.URL.Path } return } |
