aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/chunked_file.go8
-rw-r--r--weed/operation/needle_parse_test.go129
-rw-r--r--weed/operation/submit.go3
-rw-r--r--weed/operation/tail_volume.go4
-rw-r--r--weed/operation/upload_content.go147
5 files changed, 226 insertions, 65 deletions
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index baa0038c4..1bac028ff 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -53,11 +53,11 @@ func (s ChunkList) Len() int { return len(s) }
func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
-func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
- if isGzipped {
+func LoadChunkManifest(buffer []byte, isCompressed bool) (*ChunkManifest, error) {
+ if isCompressed {
var err error
- if buffer, err = util.UnGzipData(buffer); err != nil {
- return nil, err
+ if buffer, err = util.DecompressData(buffer); err != nil {
+ glog.V(0).Infof("fail to decompress chunk manifest: %v", err)
}
}
cm := ChunkManifest{}
diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go
new file mode 100644
index 000000000..177c620f4
--- /dev/null
+++ b/weed/operation/needle_parse_test.go
@@ -0,0 +1,129 @@
+package operation
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "net/http"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MockClient struct {
+ needleHandling func(n *needle.Needle, originalSize int, e error)
+}
+
+func (m *MockClient) Do(req *http.Request) (*http.Response, error) {
+ n, originalSize, _, err := needle.CreateNeedleFromRequest(req, false, 1024*1024)
+ if m.needleHandling != nil {
+ m.needleHandling(n, originalSize, err)
+ }
+ return &http.Response{
+ StatusCode: http.StatusNoContent,
+ }, io.EOF
+}
+
+/*
+
+The mime type is always the value passed in.
+
+Compress or not depends on the content detection, file name extension, and compression ratio.
+
+If the content is already compressed, need to know the content size.
+
+*/
+
+func TestCreateNeedleFromRequest(t *testing.T) {
+ mc := &MockClient{}
+ tmp := HttpClient
+ HttpClient = mc
+ defer func() {
+ HttpClient = tmp
+ }()
+
+ {
+ mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ assert.Equal(t, nil, err, "upload: %v", err)
+ assert.Equal(t, "", string(n.Mime), "mime detection failed: %v", string(n.Mime))
+ assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
+ assert.Equal(t, true, util.IsGzippedContent(n.Data), "this should be gzip")
+ fmt.Printf("needle: %v, originalSize: %d\n", n, originalSize)
+ }
+ uploadResult, err, data := Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader([]byte(textContent)), false, "", nil, "")
+ if len(data) != len(textContent) {
+ t.Errorf("data actual %d expected %d", len(data), len(textContent))
+ }
+ if err != nil {
+ fmt.Printf("err: %v\n", err)
+ }
+ fmt.Printf("uploadResult: %+v\n", uploadResult)
+ }
+
+ {
+ mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ assert.Equal(t, nil, err, "upload: %v", err)
+ assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime))
+ assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
+ assert.Equal(t, true, util.IsGzippedContent(n.Data), "this should be gzip")
+ fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize)
+ }
+ gzippedData, _ := util.GzipData([]byte(textContent))
+ Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(gzippedData), true, "text/plain", nil, "")
+ }
+
+ {
+ mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ assert.Equal(t, nil, err, "upload: %v", err)
+ assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime))
+ assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
+ assert.Equal(t, true, util.IsZstdContent(n.Data), "this should be zstd")
+ fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize)
+ }
+ zstdData, _ := util.ZstdData([]byte(textContent))
+ Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(zstdData), true, "text/plain", nil, "")
+ }
+
+ {
+ mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ assert.Equal(t, nil, err, "upload: %v", err)
+ assert.Equal(t, "application/zstd", string(n.Mime), "mime detection failed: %v", string(n.Mime))
+ assert.Equal(t, false, n.IsCompressed(), "this should not be compressed")
+ assert.Equal(t, true, util.IsZstdContent(n.Data), "this should still be zstd")
+ fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize)
+ }
+ zstdData, _ := util.ZstdData([]byte(textContent))
+ Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(zstdData), false, "application/zstd", nil, "")
+ }
+
+}
+
+var textContent = `Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+`
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index e8bec382a..25843c892 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -170,6 +170,9 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
}
}
fileUrl := "http://" + ret.Url + "/" + id
+ if usePublicUrl {
+ fileUrl = "http://" + ret.PublicUrl + "/" + id
+ }
count, e := upload_one_chunk(
baseName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(fi.Reader, chunkSize),
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
index 3cd66b5da..a15c21ae4 100644
--- a/weed/operation/tail_volume.go
+++ b/weed/operation/tail_volume.go
@@ -28,8 +28,10 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.Volume
func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
- stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{
+ stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
VolumeId: uint32(vid),
SinceNs: sinceNs,
IdleTimeoutSeconds: uint32(idleTimeoutSeconds),
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 1e2c591c5..a4148cb22 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -2,9 +2,7 @@ package operation
import (
"bytes"
- "crypto/md5"
"encoding/json"
- "errors"
"fmt"
"io"
"io/ioutil"
@@ -13,6 +11,7 @@ import (
"net/http"
"net/textproto"
"path/filepath"
+ "runtime/debug"
"strings"
"time"
@@ -20,37 +19,45 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/valyala/bytebufferpool"
)
type UploadResult struct {
- Name string `json:"name,omitempty"`
- Size uint32 `json:"size,omitempty"`
- Error string `json:"error,omitempty"`
- ETag string `json:"eTag,omitempty"`
- CipherKey []byte `json:"cipherKey,omitempty"`
- Mime string `json:"mime,omitempty"`
- Gzip uint32 `json:"gzip,omitempty"`
- Md5 string `json:"md5,omitempty"`
+ Name string `json:"name,omitempty"`
+ Size uint32 `json:"size,omitempty"`
+ Error string `json:"error,omitempty"`
+ ETag string `json:"eTag,omitempty"`
+ CipherKey []byte `json:"cipherKey,omitempty"`
+ Mime string `json:"mime,omitempty"`
+ Gzip uint32 `json:"gzip,omitempty"`
+ ContentMd5 string `json:"contentMd5,omitempty"`
}
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
+ fid, _ := filer_pb.ToFileIdObject(fileId)
return &filer_pb.FileChunk{
- FileId: fileId,
- Offset: offset,
- Size: uint64(uploadResult.Size),
- Mtime: time.Now().UnixNano(),
- ETag: uploadResult.ETag,
- CipherKey: uploadResult.CipherKey,
- IsGzipped: uploadResult.Gzip > 0,
+ FileId: fileId,
+ Offset: offset,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ CipherKey: uploadResult.CipherKey,
+ IsCompressed: uploadResult.Gzip > 0,
+ Fid: fid,
}
}
+// HTTPClient interface for testing
+type HTTPClient interface {
+ Do(req *http.Request) (*http.Response, error)
+}
+
var (
- client *http.Client
+ HttpClient HTTPClient
)
func init() {
- client = &http.Client{Transport: &http.Transport{
+ HttpClient = &http.Client{Transport: &http.Transport{
MaxIdleConnsPerHost: 1024,
}}
}
@@ -58,48 +65,61 @@ func init() {
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
-func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
- uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputGzipped, mtype, pairMap, jwt)
- if uploadResult != nil {
- uploadResult.Md5 = util.Md5(data)
- }
+func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ uploadResult, err = retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
-func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
- hash := md5.New()
- reader = io.TeeReader(reader, hash)
- uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputGzipped, mtype, pairMap, jwt)
- if uploadResult != nil {
- uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil))
- }
+func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
+ uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt)
return
}
-func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
- data, err = ioutil.ReadAll(reader)
- if err != nil {
- err = fmt.Errorf("read input: %v", err)
- return
+func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
+ bytesReader, ok := reader.(*util.BytesReader)
+ if ok {
+ data = bytesReader.Bytes
+ } else {
+ buf := bytebufferpool.Get()
+ _, err = buf.ReadFrom(reader)
+ defer bytebufferpool.Put(buf)
+ if err != nil {
+ err = fmt.Errorf("read input: %v", err)
+ return
+ }
+ data = buf.Bytes()
}
- uploadResult, uploadErr := doUploadData(uploadUrl, filename, cipher, data, isInputGzipped, mtype, pairMap, jwt)
+ uploadResult, uploadErr := retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
return uploadResult, uploadErr, data
}
-func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
- contentIsGzipped := isInputGzipped
+func retriedUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ for i := 0; i < 1; i++ {
+ uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
+ if err == nil {
+ return
+ } else {
+ glog.Warningf("uploading to %s: %v", uploadUrl, err)
+ }
+ }
+ return
+}
+
+func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ contentIsGzipped := isInputCompressed
shouldGzipNow := false
- if !isInputGzipped {
+ if !isInputCompressed {
if mtype == "" {
mtype = http.DetectContentType(data)
+ // println("detect1 mimetype to", mtype)
if mtype == "application/octet-stream" {
mtype = ""
}
}
- if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped {
+ if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeCompressed {
shouldGzipNow = true
- } else if !iAmSure && mtype == "" && len(data) > 128 {
+ } else if !iAmSure && mtype == "" && len(data) > 16*1024 {
var compressed []byte
compressed, err = util.GzipData(data[0:128])
shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90%
@@ -118,9 +138,9 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
data = compressed
contentIsGzipped = true
}
- } else if isInputGzipped {
+ } else if isInputCompressed {
// just to get the clear data length
- clearData, err := util.UnGzipData(data)
+ clearData, err := util.DecompressData(data)
if err == nil {
clearDataLen = len(clearData)
}
@@ -141,7 +161,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
_, err = w.Write(encryptedData)
return
- }, "", false, "", nil, jwt)
+ }, "", false, len(encryptedData), "", nil, jwt)
if uploadResult != nil {
uploadResult.Name = filename
uploadResult.Mime = mtype
@@ -152,7 +172,7 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
_, err = w.Write(data)
return
- }, filename, contentIsGzipped, mtype, pairMap, jwt)
+ }, filename, contentIsGzipped, 0, mtype, pairMap, jwt)
}
if uploadResult == nil {
@@ -167,9 +187,10 @@ func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, i
return uploadResult, err
}
-func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
- body_buf := bytes.NewBufferString("")
- body_writer := multipart.NewWriter(body_buf)
+func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, originalDataSize int, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
+ buf := bytebufferpool.Get()
+ defer bytebufferpool.Put(buf)
+ body_writer := multipart.NewWriter(buf)
h := make(textproto.MIMEHeader)
h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename)))
if mtype == "" {
@@ -197,10 +218,10 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
return nil, err
}
- req, postErr := http.NewRequest("POST", uploadUrl, body_buf)
+ req, postErr := http.NewRequest("POST", uploadUrl, bytes.NewReader(buf.Bytes()))
if postErr != nil {
- glog.V(0).Infoln("failing to upload to", uploadUrl, postErr.Error())
- return nil, postErr
+ glog.V(1).Infof("create upload request %s: %v", uploadUrl, postErr)
+ return nil, fmt.Errorf("create upload request %s: %v", uploadUrl, postErr)
}
req.Header.Set("Content-Type", content_type)
for k, v := range pairMap {
@@ -209,12 +230,15 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
if jwt != "" {
req.Header.Set("Authorization", "BEARER "+string(jwt))
}
- resp, post_err := client.Do(req)
+ // print("+")
+ resp, post_err := HttpClient.Do(req)
if post_err != nil {
- glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error())
- return nil, post_err
+ glog.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err)
+ debug.PrintStack()
+ return nil, fmt.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err)
}
- defer resp.Body.Close()
+ // print("-")
+ defer util.CloseResponse(resp)
var ret UploadResult
etag := getEtag(resp)
@@ -222,19 +246,22 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
ret.ETag = etag
return &ret, nil
}
+
resp_body, ra_err := ioutil.ReadAll(resp.Body)
if ra_err != nil {
- return nil, ra_err
+ return nil, fmt.Errorf("read response body %v: %v", uploadUrl, ra_err)
}
+
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
- glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body))
- return nil, unmarshal_err
+ glog.Errorf("unmarshal %s: %v", uploadUrl, string(resp_body))
+ return nil, fmt.Errorf("unmarshal %v: %v", uploadUrl, unmarshal_err)
}
if ret.Error != "" {
- return nil, errors.New(ret.Error)
+ return nil, fmt.Errorf("unmarshalled error %v: %v", uploadUrl, ret.Error)
}
ret.ETag = etag
+ ret.ContentMd5 = resp.Header.Get("Content-MD5")
return &ret, nil
}