aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/operation/needle_parse_test.go130
-rw-r--r--weed/operation/upload_content.go31
-rw-r--r--weed/storage/needle/needle_parse_upload.go17
3 files changed, 157 insertions, 21 deletions
diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go
new file mode 100644
index 000000000..20a610eaa
--- /dev/null
+++ b/weed/operation/needle_parse_test.go
@@ -0,0 +1,130 @@
+package operation
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "net/http"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MockClient struct {
+ needleHandling func(n *needle.Needle, originalSize int, e error)
+}
+
+func (m *MockClient) Do(req *http.Request) (*http.Response, error) {
+ n, originalSize, err := needle.CreateNeedleFromRequest(req, 1024*1024)
+ if m.needleHandling != nil {
+ m.needleHandling(n, originalSize, err)
+ }
+ return &http.Response{
+ StatusCode: http.StatusNoContent,
+ }, io.EOF
+}
+
+/*
+
+The mime type is always the value passed in.
+
+Compress or not depends on the content detection, file name extension, and compression ratio.
+
+If the content is already compressed, need to know the content size.
+
+*/
+
+func TestCreateNeedleFromRequest(t *testing.T) {
+ mc := &MockClient{}
+ tmp := HttpClient
+ HttpClient = mc
+ defer func() {
+ HttpClient = tmp
+ }()
+
+ {
+ mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ assert.Equal(t, nil, err, "upload: %v", err)
+ assert.Equal(t, "", string(n.Mime), "mime detection failed: %v", string(n.Mime))
+ assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
+ assert.Equal(t, true, util.IsGzippedContent(n.Data), "this should be gzip")
+ fmt.Printf("needle: %v, originalSize: %d\n", n, originalSize)
+ }
+ uploadResult, err, data := Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader([]byte(textContent)), false, "", nil, "")
+ if len(data) != len(textContent) {
+ t.Errorf("data actual %d expected %d", len(data), len(textContent))
+ }
+ if err != nil {
+ fmt.Printf("err: %v\n", err)
+ }
+ fmt.Printf("uploadResult: %+v\n", uploadResult)
+ }
+
+ {
+ mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ assert.Equal(t, nil, err, "upload: %v", err)
+ assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime))
+ assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
+ assert.Equal(t, true, util.IsGzippedContent(n.Data), "this should be gzip")
+ fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize)
+ }
+ gzippedData, _ := util.GzipData([]byte(textContent))
+ Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(gzippedData), true, "text/plain", nil, "")
+ }
+
+ {
+ mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ assert.Equal(t, nil, err, "upload: %v", err)
+ assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime))
+ assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
+ assert.Equal(t, true, util.IsZstdContent(n.Data), "this should be zstd")
+ fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize)
+ }
+ zstdData, _ := util.ZstdData([]byte(textContent))
+ Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(zstdData), true, "text/plain", nil, "")
+ }
+
+ {
+ mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ assert.Equal(t, nil, err, "upload: %v", err)
+ assert.Equal(t, "application/zstd", string(n.Mime), "mime detection failed: %v", string(n.Mime))
+ assert.Equal(t, false, n.IsCompressed(), "this should not be compressed")
+ assert.Equal(t, true, util.IsZstdContent(n.Data), "this should still be zstd")
+ fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize)
+ }
+ zstdData, _ := util.ZstdData([]byte(textContent))
+ Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(zstdData), false, "application/zstd", nil, "")
+ }
+
+}
+
+
+var textContent = `Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+` \ No newline at end of file
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 05b016457..658588ec3 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -45,12 +45,17 @@ func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *fi
}
}
+// HTTPClient interface for testing
+type HTTPClient interface {
+ Do(req *http.Request) (*http.Response, error)
+}
+
var (
- client *http.Client
+ HttpClient HTTPClient
)
func init() {
- client = &http.Client{Transport: &http.Transport{
+ HttpClient = &http.Client{Transport: &http.Transport{
MaxIdleConnsPerHost: 1024,
}}
}
@@ -58,8 +63,8 @@ func init() {
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
-func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
- uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputGzipped, mtype, pairMap, jwt)
+func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
if uploadResult != nil {
uploadResult.Md5 = util.Md5(data)
}
@@ -67,30 +72,30 @@ func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isI
}
// Upload sends a POST request to a volume server to upload the content with fast compression
-func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
+func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
hash := md5.New()
reader = io.TeeReader(reader, hash)
- uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputGzipped, mtype, pairMap, jwt)
+ uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt)
if uploadResult != nil {
uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil))
}
return
}
-func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
+func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
data, err = ioutil.ReadAll(reader)
if err != nil {
err = fmt.Errorf("read input: %v", err)
return
}
- uploadResult, uploadErr := doUploadData(uploadUrl, filename, cipher, data, isInputGzipped, mtype, pairMap, jwt)
+ uploadResult, uploadErr := doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
return uploadResult, uploadErr, data
}
-func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
- contentIsGzipped := isInputGzipped
+func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ contentIsGzipped := isInputCompressed
shouldGzipNow := false
- if !isInputGzipped {
+ if !isInputCompressed {
if mtype == "" {
mtype = http.DetectContentType(data)
// println("detect1 mimetype to", mtype)
@@ -119,7 +124,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
data = compressed
contentIsGzipped = true
}
- } else if isInputGzipped {
+ } else if isInputCompressed {
// just to get the clear data length
clearData, err := util.DecompressData(data)
if err == nil {
@@ -210,7 +215,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
if jwt != "" {
req.Header.Set("Authorization", "BEARER "+string(jwt))
}
- resp, post_err := client.Do(req)
+ resp, post_err := HttpClient.Do(req)
if post_err != nil {
glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error())
return nil, post_err
diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go
index a2c881f92..10bef603f 100644
--- a/weed/storage/needle/needle_parse_upload.go
+++ b/weed/storage/needle/needle_parse_upload.go
@@ -54,13 +54,6 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
pu.OriginalDataSize = len(pu.Data)
pu.UncompressedData = pu.Data
// println("received data", len(pu.Data), "isGzipped", pu.IsCompressed, "mime", pu.MimeType, "name", pu.FileName)
- if pu.MimeType == "" {
- pu.MimeType = http.DetectContentType(pu.Data)
- // println("detected mimetype to", pu.MimeType)
- if pu.MimeType == "application/octet-stream" {
- pu.MimeType = ""
- }
- }
if pu.IsGzipped {
if unzipped, e := util.DecompressData(pu.Data); e == nil {
pu.OriginalDataSize = len(unzipped)
@@ -69,7 +62,15 @@ func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
}
} else {
ext := filepath.Base(pu.FileName)
- if shouldBeCompressed, iAmSure := util.IsCompressableFileType(ext, pu.MimeType); pu.MimeType == "" && !iAmSure || shouldBeCompressed && iAmSure {
+ mimeType := pu.MimeType
+ if mimeType == "" {
+ mimeType = http.DetectContentType(pu.Data)
+ }
+ // println("detected mimetype to", pu.MimeType)
+ if mimeType == "application/octet-stream" {
+ mimeType = ""
+ }
+ if shouldBeCompressed, iAmSure := util.IsCompressableFileType(ext, mimeType); mimeType == "" && !iAmSure || shouldBeCompressed && iAmSure {
// println("ext", ext, "iAmSure", iAmSure, "shouldGzip", shouldGzip, "mimeType", pu.MimeType)
if compressedData, err := util.GzipData(pu.Data); err == nil {
if len(compressedData)*10 < len(pu.Data)*9 {