aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_object_handlers.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_object_handlers.go')
-rw-r--r--weed/s3api/s3api_object_handlers.go98
1 files changed, 98 insertions, 0 deletions
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 44e93d297..3ec77d511 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -4,6 +4,8 @@ import (
"crypto/md5"
"encoding/json"
"fmt"
+ "github.com/minio/minio/cmd"
+ "github.com/minio/minio/pkg/s3select"
"io"
"io/ioutil"
"net/http"
@@ -12,6 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/gorilla/mux"
+ xhttp "github.com/minio/minio/cmd/http"
)
var (
@@ -224,3 +227,98 @@ func getObject(vars map[string]string) string {
}
return object
}
+
+func (s3a *S3ApiServer) SelectObjectContent(w http.ResponseWriter, r *http.Request) {
+ vars := mux.Vars(r)
+ bucket := vars["bucket"]
+ object := getObject(vars)
+
+ s3Select, err := s3select.NewS3Select(r.Body)
+ if err != nil {
+ if serr, ok := err.(s3select.SelectError); ok {
+ encodedErrorResponse := encodeResponse(cmd.APIErrorResponse{
+ Code: serr.ErrorCode(),
+ Message: serr.ErrorMessage(),
+ BucketName: bucket,
+ Key: object,
+ Resource: r.URL.Path,
+ RequestID: w.Header().Get(xhttp.AmzRequestID),
+ HostID: "",
+ })
+ writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, "application/xml")
+ } else {
+ writeErrorResponse(w, ErrInternalError, r.URL)
+ }
+ return
+ }
+
+ if strings.HasSuffix(r.URL.Path, "/") {
+ writeErrorResponse(w, ErrNotImplemented, r.URL)
+ return
+ }
+
+ destUrl := fmt.Sprintf("http://%s%s/%s%s",
+ s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
+ passThroughResponseSelectObjectContent := func(proxyResonse *http.Response,
+ w http.ResponseWriter) {
+ getObject := func(offset, length int64) (io.ReadCloser, error) {
+ return proxyResonse.Body, nil
+ }
+ if err = s3Select.Open(getObject); err != nil {
+ if serr, ok := err.(s3select.SelectError); ok {
+ encodedErrorResponse := encodeResponse(cmd.APIErrorResponse{
+ Code: serr.ErrorCode(),
+ Message: serr.ErrorMessage(),
+ BucketName: bucket,
+ Key: object,
+ Resource: r.URL.Path,
+ RequestID: w.Header().Get(xhttp.AmzRequestID),
+ HostID: "",
+ })
+ writeResponse(w, serr.HTTPStatusCode(), encodedErrorResponse, mimeXML)
+ } else {
+ writeResponse(w, http.StatusInternalServerError, encodeResponse("not s3select.SelectError"), mimeXML)
+ }
+ return
+ }
+
+ s3Select.Evaluate(w)
+ s3Select.Close()
+ }
+ s3a.proxyToFilerS3Select(w, r, destUrl, passThroughResponseSelectObjectContent)
+}
+
+func (s3a *S3ApiServer) proxyToFilerS3Select(w http.ResponseWriter, r *http.Request, destUrl string,
+ responseFn func(proxyResonse *http.Response, w http.ResponseWriter)) {
+
+ glog.V(2).Infof("s3 proxying %s to %s", "GET", destUrl)
+
+ proxyReq, err := http.NewRequest("GET", destUrl, nil)
+
+ if err != nil {
+ glog.Errorf("NewRequest %s: %v", destUrl, err)
+ writeErrorResponse(w, ErrInternalError, r.URL)
+ return
+ }
+
+ proxyReq.Header.Set("Host", s3a.option.Filer)
+ proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
+ proxyReq.Header.Set("Etag-MD5", "True")
+
+ for header, values := range r.Header {
+ for _, value := range values {
+ proxyReq.Header.Add(header, value)
+ }
+ }
+
+ resp, postErr := client.Do(proxyReq)
+
+ if postErr != nil {
+ glog.Errorf("post to filer: %v", postErr)
+ writeErrorResponse(w, ErrInternalError, r.URL)
+ return
+ }
+ defer resp.Body.Close()
+
+ responseFn(resp, w)
+}