diff options
Diffstat (limited to 'weed/s3api/s3api_object_handlers.go')
| -rw-r--r-- | weed/s3api/s3api_object_handlers.go | 98 |
1 files changed, 98 insertions, 0 deletions
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 44e93d297..3ec77d511 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -4,6 +4,8 @@ import ( "crypto/md5" "encoding/json" "fmt" + "github.com/minio/minio/cmd" + "github.com/minio/minio/pkg/s3select" "io" "io/ioutil" "net/http" @@ -12,6 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/server" "github.com/gorilla/mux" + xhttp "github.com/minio/minio/cmd/http" ) var ( @@ -224,3 +227,98 @@ func getObject(vars map[string]string) string { } return object } + +func (s3a *S3ApiServer) SelectObjectContent(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + bucket := vars["bucket"] + object := getObject(vars) + + s3Select, err := s3select.NewS3Select(r.Body) + if err != nil { + if serr, ok := err.(s3select.SelectError); ok { + encodedErrorResponse := encodeResponse(cmd.APIErrorResponse{ + Code: serr.ErrorCode(), + Message: serr.ErrorMessage(), + BucketName: bucket, + Key: object, + Resource: r.URL.Path, + RequestID: w.Header().Get(xhttp.AmzRequestID), + HostID: "", + }) + writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, "application/xml") + } else { + writeErrorResponse(w, ErrInternalError, r.URL) + } + return + } + + if strings.HasSuffix(r.URL.Path, "/") { + writeErrorResponse(w, ErrNotImplemented, r.URL) + return + } + + destUrl := fmt.Sprintf("http://%s%s/%s%s", + s3a.option.Filer, s3a.option.BucketsPath, bucket, object) + passThroughResponseSelectObjectContent := func(proxyResonse *http.Response, + w http.ResponseWriter) { + getObject := func(offset, length int64) (io.ReadCloser, error) { + return proxyResonse.Body, nil + } + if err = s3Select.Open(getObject); err != nil { + if serr, ok := err.(s3select.SelectError); ok { + encodedErrorResponse := encodeResponse(cmd.APIErrorResponse{ + Code: serr.ErrorCode(), + Message: serr.ErrorMessage(), + BucketName: bucket, + Key: object, + Resource: r.URL.Path, + RequestID: w.Header().Get(xhttp.AmzRequestID), + HostID: "", + }) + writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML) + } else { + writeResponse(w, http.StatusInternalServerError, encodeResponse("not s3select.SelectError"), mimeXML) + } + return + } + + s3Select.Evaluate(w) + s3Select.Close() + } + s3a.proxyToFilerS3Select(w, r, destUrl, passThroughResponseSelectObjectContent) +} + +func (s3a *S3ApiServer) proxyToFilerS3Select(w http.ResponseWriter, r *http.Request, destUrl string, + responseFn func(proxyResonse *http.Response, w http.ResponseWriter)) { + + glog.V(2).Infof("s3 proxying %s to %s", "GET", destUrl) + + proxyReq, err := http.NewRequest("GET", destUrl, nil) + + if err != nil { + glog.Errorf("NewRequest %s: %v", destUrl, err) + writeErrorResponse(w, ErrInternalError, r.URL) + return + } + + proxyReq.Header.Set("Host", s3a.option.Filer) + proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) + proxyReq.Header.Set("Etag-MD5", "True") + + for header, values := range r.Header { + for _, value := range values { + proxyReq.Header.Add(header, value) + } + } + + resp, postErr := client.Do(proxyReq) + + if postErr != nil { + glog.Errorf("post to filer: %v", postErr) + writeErrorResponse(w, ErrInternalError, r.URL) + return + } + defer resp.Body.Close() + + responseFn(resp, w) +} |
