aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/operation/assign_file_id.go5
-rw-r--r--weed/s3api/s3api_errors.go8
-rw-r--r--weed/s3api/s3api_handlers.go12
-rw-r--r--weed/s3api/s3api_object_handlers.go90
-rw-r--r--weed/s3api/s3api_server.go4
-rw-r--r--weed/server/filer_server_handlers_write.go31
-rw-r--r--weed/server/filer_server_handlers_write_monopart.go18
-rw-r--r--weed/server/filer_server_handlers_write_multipart.go30
8 files changed, 139 insertions, 59 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index c2e1e4444..169fd664d 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -59,8 +59,9 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque
values.Add("dataNode", request.DataNode)
}
- jsonBlob, err := util.Post("http://"+server+"/dir/assign", values)
- glog.V(2).Infof("assign result from %s : %s", server, string(jsonBlob))
+ postUrl := fmt.Sprintf("http://%s/dir/assign", server)
+ jsonBlob, err := util.Post(postUrl, values)
+ glog.V(2).Infof("assign %d result from %s %+v : %s", i, postUrl, values, string(jsonBlob))
if err != nil {
return nil, err
}
diff --git a/weed/s3api/s3api_errors.go b/weed/s3api/s3api_errors.go
index 8af024700..10b48c2c8 100644
--- a/weed/s3api/s3api_errors.go
+++ b/weed/s3api/s3api_errors.go
@@ -31,8 +31,9 @@ const (
ErrBucketNotEmpty
ErrBucketAlreadyExists
ErrBucketAlreadyOwnedByYou
- ErrInvalidBucketName
ErrNoSuchBucket
+ ErrInvalidBucketName
+ ErrInvalidDigest
ErrInternalError
)
@@ -64,6 +65,11 @@ var errorCodeResponse = map[ErrorCode]APIError{
Description: "The specified bucket is not valid.",
HTTPStatusCode: http.StatusBadRequest,
},
+ ErrInvalidDigest: {
+ Code: "InvalidDigest",
+ Description: "The Content-Md5 you specified is not valid.",
+ HTTPStatusCode: http.StatusBadRequest,
+ },
ErrNoSuchBucket: {
Code: "NoSuchBucket",
Description: "The specified bucket does not exist",
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go
index 229b3a740..13dfc8d15 100644
--- a/weed/s3api/s3api_handlers.go
+++ b/weed/s3api/s3api_handlers.go
@@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"bytes"
"encoding/xml"
+ "encoding/base64"
)
type mimeType string
@@ -93,3 +94,14 @@ func writeSuccessResponseXML(w http.ResponseWriter, response []byte) {
func writeSuccessResponseEmpty(w http.ResponseWriter) {
writeResponse(w, http.StatusOK, nil, mimeNone)
}
+
+func validateContentMd5(h http.Header) ([]byte, error) {
+ md5B64, ok := h["Content-Md5"]
+ if ok {
+ if md5B64[0] == "" {
+ return nil, fmt.Errorf("Content-Md5 header set to empty value")
+ }
+ return base64.StdEncoding.DecodeString(md5B64[0])
+ }
+ return []byte{}, nil
+}
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
new file mode 100644
index 000000000..079509792
--- /dev/null
+++ b/weed/s3api/s3api_object_handlers.go
@@ -0,0 +1,90 @@
+package s3api
+
+import (
+ "net/http"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "fmt"
+ "github.com/gorilla/mux"
+ "io/ioutil"
+ "encoding/json"
+)
+
+var (
+ client *http.Client
+)
+
+func init() {
+ client = &http.Client{Transport: &http.Transport{
+ MaxIdleConnsPerHost: 1024,
+ }}
+}
+
+type UploadResult struct {
+ Name string `json:"name,omitempty"`
+ Size uint32 `json:"size,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
+
+ // http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
+
+ vars := mux.Vars(r)
+ bucket := vars["bucket"]
+ object := vars["object"]
+
+ _, err := validateContentMd5(r.Header)
+ if err != nil {
+ writeErrorResponse(w, ErrInvalidDigest, r.URL)
+ return
+ }
+
+ uploadUrl := fmt.Sprintf("http://%s%s/%s/%s?collection=%s",
+ s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket)
+ proxyReq, err := http.NewRequest("PUT", uploadUrl, r.Body)
+
+ if err != nil {
+ glog.Errorf("NewRequest %s: %v", uploadUrl, err)
+ writeErrorResponse(w, ErrInternalError, r.URL)
+ return
+ }
+
+ proxyReq.Header.Set("Host", s3a.option.Filer)
+ proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
+
+ for header, values := range r.Header {
+ for _, value := range values {
+ proxyReq.Header.Add(header, value)
+ }
+ }
+
+ resp, postErr := client.Do(proxyReq)
+
+ if postErr != nil {
+ glog.Errorf("post to filer: %v", postErr)
+ writeErrorResponse(w, ErrInternalError, r.URL)
+ return
+ }
+ defer resp.Body.Close()
+
+ resp_body, ra_err := ioutil.ReadAll(resp.Body)
+ if ra_err != nil {
+ glog.Errorf("upload to filer response read: %v", ra_err)
+ writeErrorResponse(w, ErrInternalError, r.URL)
+ return
+ }
+ var ret UploadResult
+ unmarshal_err := json.Unmarshal(resp_body, &ret)
+ if unmarshal_err != nil {
+ glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
+ writeErrorResponse(w, ErrInternalError, r.URL)
+ return
+ }
+ if ret.Error != "" {
+ glog.Errorf("upload to filer error: %v", ret.Error)
+ writeErrorResponse(w, ErrInternalError, r.URL)
+ return
+ }
+
+ writeSuccessResponseEmpty(w)
+}
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index 8730f0b88..d8e6de1f7 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -42,6 +42,10 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
routers = append(routers, apiRouter.PathPrefix("/{bucket}").Subrouter())
for _, bucket := range routers {
+
+ // PutObject
+ bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(s3a.PutObjectHandler)
+
// PutBucket
bucket.Methods("PUT").HandlerFunc(s3a.PutBucketHandler)
// DeleteBucket
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 0c278f67c..a02fe04aa 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -70,9 +70,10 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
DataCenter: "",
}
}
+
assignResult, ae := operation.Assign(fs.filer.GetMaster(), ar, altRequest)
if ae != nil {
- glog.V(0).Infoln("failing to assign a file id", ae.Error())
+ glog.Errorf("failing to assign a file id: %v", ae)
writeJsonError(w, r, http.StatusInternalServerError, ae)
err = ae
return
@@ -102,20 +103,24 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- var fileId, urlLocation string
- var err error
-
+ /*
+ var path string
if strings.HasPrefix(r.Header.Get("Content-Type"), "multipart/form-data; boundary=") {
- fileId, urlLocation, err = fs.multipartUploadAnalyzer(w, r, replication, collection, dataCenter)
- if err != nil {
- return
- }
+ path, err = fs.multipartUploadAnalyzer(w, r, replication, collection, dataCenter)
} else {
- fileId, urlLocation, err = fs.monolithicUploadAnalyzer(w, r, replication, collection, dataCenter)
- if err != nil || fileId == "" {
- return
- }
+ path, err = fs.monolithicUploadAnalyzer(w, r, replication, collection, dataCenter)
+ }
+ */
+
+ fileId, urlLocation, err := fs.queryFileInfoByPath(w, r, r.URL.Path)
+ if fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, r.URL.Path); err == nil && fileId == "" {
+ fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
}
+ if err != nil || fileId == "" || urlLocation == "" {
+ return
+ }
+
+ glog.V(0).Infof("request header %+v, urlLocation: %v", r.Header, urlLocation)
u, _ := url.Parse(urlLocation)
@@ -142,7 +147,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
resp, do_err := util.Do(request)
if do_err != nil {
- glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error())
+ glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, do_err, r.Method)
writeJsonError(w, r, http.StatusInternalServerError, do_err)
return
}
diff --git a/weed/server/filer_server_handlers_write_monopart.go b/weed/server/filer_server_handlers_write_monopart.go
index 777d5bc43..ccafbe5c1 100644
--- a/weed/server/filer_server_handlers_write_monopart.go
+++ b/weed/server/filer_server_handlers_write_monopart.go
@@ -73,15 +73,8 @@ func checkContentMD5(w http.ResponseWriter, r *http.Request) (err error) {
return
}
-func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) {
- /*
- Amazon S3 ref link:[http://docs.aws.amazon.com/AmazonS3/latest/API/Welcome.html]
- There is a long way to provide a completely compatibility against all Amazon S3 API, I just made
- a simple data stream adapter between S3 PUT API and seaweedfs's volume storage Write API
- 1. The request url format should be http://$host:$port/$bucketName/$objectName
- 2. bucketName will be mapped to seaweedfs's collection name
- 3. You could customize and make your enhancement.
- */
+func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (path string, err error) {
+
lastPos := strings.LastIndex(r.URL.Path, "/")
if lastPos == -1 || lastPos == 0 || lastPos == len(r.URL.Path)-1 {
glog.V(0).Infof("URL Path [%s] is invalid, could not retrieve file name", r.URL.Path)
@@ -99,13 +92,8 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R
return
}
- secondPos := strings.Index(r.URL.Path[1:], "/") + 1
- collection = r.URL.Path[1:secondPos]
- path := r.URL.Path
+ path = r.URL.Path
- if fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path); err == nil && fileId == "" {
- fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
- }
return
}
diff --git a/weed/server/filer_server_handlers_write_multipart.go b/weed/server/filer_server_handlers_write_multipart.go
index 056317750..302b58e70 100644
--- a/weed/server/filer_server_handlers_write_multipart.go
+++ b/weed/server/filer_server_handlers_write_multipart.go
@@ -1,39 +1,13 @@
package weed_server
import (
- "bytes"
- "io/ioutil"
"net/http"
- "strings"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage"
)
-func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, err error) {
+func (fs *FilerServer) multipartUploadAnalyzer(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (path string, err error) {
//Default handle way for http multipart
if r.Method == "PUT" {
- buf, _ := ioutil.ReadAll(r.Body)
- r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
- fileName, _, _, _, _, _, _, _, pe := storage.ParseUpload(r)
- if pe != nil {
- glog.V(0).Infoln("failing to parse post body", pe.Error())
- writeJsonError(w, r, http.StatusInternalServerError, pe)
- err = pe
- return
- }
- //reconstruct http request body for following new request to volume server
- r.Body = ioutil.NopCloser(bytes.NewBuffer(buf))
-
- path := r.URL.Path
- if strings.HasSuffix(path, "/") {
- if fileName != "" {
- path += fileName
- }
- }
- fileId, urlLocation, err = fs.queryFileInfoByPath(w, r, path)
- } else {
- fileId, urlLocation, err = fs.assignNewFileInfo(w, r, replication, collection, dataCenter)
+ path = r.URL.Path
}
return
}