aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
authorbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
committerbingoohuang <bingoo.huang@gmail.com>2021-04-26 17:19:35 +0800
commitd861cbd81b75b6684c971ac00e33685e6575b833 (patch)
tree301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/operation
parent70da715d8d917527291b35fb069fac077d17b868 (diff)
parent4ee58922eff61a5a4ca29c0b4829b097a498549e (diff)
downloadseaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz
seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go71
-rw-r--r--weed/operation/buffer_pool.go24
-rw-r--r--weed/operation/chunked_file.go62
-rw-r--r--weed/operation/delete_content.go31
-rw-r--r--weed/operation/grpc_client.go40
-rw-r--r--weed/operation/lookup.go15
-rw-r--r--weed/operation/needle_parse_test.go131
-rw-r--r--weed/operation/stats.go26
-rw-r--r--weed/operation/submit.go57
-rw-r--r--weed/operation/tail_volume.go11
-rw-r--r--weed/operation/upload_content.go223
11 files changed, 523 insertions, 168 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index 2dfa44483..ffd3e4938 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -3,11 +3,14 @@ package operation
import (
"context"
"fmt"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "strings"
)
type VolumeAssignRequest struct {
@@ -15,6 +18,7 @@ type VolumeAssignRequest struct {
Replication string
Collection string
Ttl string
+ DiskType string
DataCenter string
Rack string
DataNode string
@@ -30,7 +34,7 @@ type AssignResult struct {
Auth security.EncodedJwt `json:"auth,omitempty"`
}
-func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
+func Assign(masterFn GetMasterFn, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
@@ -44,17 +48,18 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum
continue
}
- lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ lastError = WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
- Count: primaryRequest.Count,
- Replication: primaryRequest.Replication,
- Collection: primaryRequest.Collection,
- Ttl: primaryRequest.Ttl,
- DataCenter: primaryRequest.DataCenter,
- Rack: primaryRequest.Rack,
- DataNode: primaryRequest.DataNode,
- WritableVolumeCount: primaryRequest.WritableVolumeCount,
+ Count: request.Count,
+ Replication: request.Replication,
+ Collection: request.Collection,
+ Ttl: request.Ttl,
+ DiskType: request.DiskType,
+ DataCenter: request.DataCenter,
+ Rack: request.Rack,
+ DataNode: request.DataNode,
+ WritableVolumeCount: request.WritableVolumeCount,
}
resp, grpcErr := masterClient.Assign(context.Background(), req)
if grpcErr != nil {
@@ -81,6 +86,7 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum
continue
}
+ break
}
return ret, lastError
@@ -99,3 +105,44 @@ func LookupJwt(master string, fileId string) security.EncodedJwt {
return security.EncodedJwt(tokenStr)
}
+
+type StorageOption struct {
+ Replication string
+ DiskType string
+ Collection string
+ DataCenter string
+ Rack string
+ TtlSeconds int32
+ Fsync bool
+ VolumeGrowthCount uint32
+}
+
+func (so *StorageOption) TtlString() string {
+ return needle.SecondsToTTL(so.TtlSeconds)
+}
+
+func (so *StorageOption) ToAssignRequests(count int) (ar *VolumeAssignRequest, altRequest *VolumeAssignRequest) {
+ ar = &VolumeAssignRequest{
+ Count: uint64(count),
+ Replication: so.Replication,
+ Collection: so.Collection,
+ Ttl: so.TtlString(),
+ DiskType: so.DiskType,
+ DataCenter: so.DataCenter,
+ Rack: so.Rack,
+ WritableVolumeCount: so.VolumeGrowthCount,
+ }
+ if so.DataCenter != "" || so.Rack != "" {
+ altRequest = &VolumeAssignRequest{
+ Count: uint64(count),
+ Replication: so.Replication,
+ Collection: so.Collection,
+ Ttl: so.TtlString(),
+ DiskType: so.DiskType,
+ DataCenter: "",
+ Rack: "",
+ WritableVolumeCount: so.VolumeGrowthCount,
+ }
+ }
+ return
+}
diff --git a/weed/operation/buffer_pool.go b/weed/operation/buffer_pool.go
new file mode 100644
index 000000000..9cbe4787f
--- /dev/null
+++ b/weed/operation/buffer_pool.go
@@ -0,0 +1,24 @@
+package operation
+
+import (
+ "github.com/valyala/bytebufferpool"
+ "sync/atomic"
+)
+
+var bufferCounter int64
+
+func GetBuffer() *bytebufferpool.ByteBuffer {
+ defer func() {
+ atomic.AddInt64(&bufferCounter, 1)
+ // println("+", bufferCounter)
+ }()
+ return bytebufferpool.Get()
+}
+
+func PutBuffer(buf *bytebufferpool.ByteBuffer) {
+ defer func() {
+ atomic.AddInt64(&bufferCounter, -1)
+ // println("-", bufferCounter)
+ }()
+ bytebufferpool.Put(buf)
+}
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 295204dd8..8506e0518 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -8,11 +8,10 @@ import (
"io/ioutil"
"net/http"
"sort"
+ "sync"
"google.golang.org/grpc"
- "sync"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -41,23 +40,24 @@ type ChunkManifest struct {
// seekable chunked file reader
type ChunkedFileReader struct {
- Manifest *ChunkManifest
- Master string
- pos int64
- pr *io.PipeReader
- pw *io.PipeWriter
- mutex sync.Mutex
+ totalSize int64
+ chunkList []*ChunkInfo
+ master string
+ pos int64
+ pr *io.PipeReader
+ pw *io.PipeWriter
+ mutex sync.Mutex
}
func (s ChunkList) Len() int { return len(s) }
func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
func (s ChunkList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
-func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
- if isGzipped {
+func LoadChunkManifest(buffer []byte, isCompressed bool) (*ChunkManifest, error) {
+ if isCompressed {
var err error
- if buffer, err = util.UnGzipData(buffer); err != nil {
- return nil, err
+ if buffer, err = util.DecompressData(buffer); err != nil {
+ glog.V(0).Infof("fail to decompress chunk manifest: %v", err)
}
}
cm := ChunkManifest{}
@@ -72,12 +72,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
return json.Marshal(cm)
}
-func (cm *ChunkManifest) DeleteChunks(master string, grpcDialOption grpc.DialOption) error {
+func (cm *ChunkManifest) DeleteChunks(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
var fileIds []string
for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid)
}
- results, err := DeleteFiles(master, grpcDialOption, fileIds)
+ results, err := DeleteFiles(masterFn, usePublicUrl, grpcDialOption, fileIds)
if err != nil {
glog.V(0).Infof("delete %+v: %v", fileIds, err)
return fmt.Errorf("chunk delete: %v", err)
@@ -126,16 +126,29 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64,
return io.Copy(w, resp.Body)
}
+func NewChunkedFileReader(chunkList []*ChunkInfo, master string) *ChunkedFileReader {
+ var totalSize int64
+ for _, chunk := range chunkList {
+ totalSize += chunk.Size
+ }
+ sort.Sort(ChunkList(chunkList))
+ return &ChunkedFileReader{
+ totalSize: totalSize,
+ chunkList: chunkList,
+ master: master,
+ }
+}
+
func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
var err error
switch whence {
- case 0:
- case 1:
+ case io.SeekStart:
+ case io.SeekCurrent:
offset += cf.pos
- case 2:
- offset = cf.Manifest.Size - offset
+ case io.SeekEnd:
+ offset = cf.totalSize + offset
}
- if offset > cf.Manifest.Size {
+ if offset > cf.totalSize {
err = ErrInvalidRange
}
if cf.pos != offset {
@@ -146,10 +159,9 @@ func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
}
func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
- cm := cf.Manifest
chunkIndex := -1
chunkStartOffset := int64(0)
- for i, ci := range cm.Chunks {
+ for i, ci := range cf.chunkList {
if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size {
chunkIndex = i
chunkStartOffset = cf.pos - ci.Offset
@@ -159,10 +171,12 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
if chunkIndex < 0 {
return n, ErrInvalidRange
}
- for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ {
- ci := cm.Chunks[chunkIndex]
+ for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
+ ci := cf.chunkList[chunkIndex]
// if we need read date from local volume server first?
- fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid)
+ fileUrl, lookupError := LookupFileId(func() string {
+ return cf.master
+ }, ci.Fid)
if lookupError != nil {
return n, lookupError
}
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 358399324..8f87882b1 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -4,12 +4,12 @@ import (
"context"
"errors"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"google.golang.org/grpc"
"net/http"
"strings"
"sync"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)
type DeleteResult struct {
@@ -28,10 +28,18 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
}
// DeleteFiles batch deletes a list of fileIds
-func DeleteFiles(master string, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
-
- lookupFunc := func(vids []string) (map[string]LookupResult, error) {
- return LookupVolumeIds(master, grpcDialOption, vids)
+func DeleteFiles(masterFn GetMasterFn, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
+
+ lookupFunc := func(vids []string) (results map[string]LookupResult, err error) {
+ results, err = LookupVolumeIds(masterFn, grpcDialOption, vids)
+ if err == nil && usePublicUrl {
+ for _, result := range results {
+ for _, loc := range result.Locations {
+ loc.Url = loc.PublicUrl
+ }
+ }
+ }
+ return
}
return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
@@ -92,9 +100,9 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
go func(server string, fidList []string) {
defer wg.Done()
- if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList); deleteErr != nil {
+ if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList, true); deleteErr != nil {
err = deleteErr
- } else {
+ } else if deleteResults != nil {
resultChan <- deleteResults
}
@@ -107,18 +115,17 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
ret = append(ret, result...)
}
- glog.V(1).Infof("deleted %d items", len(ret))
-
return ret, err
}
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
-func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
+func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
req := &volume_server_pb.BatchDeleteRequest{
- FileIds: fileIds,
+ FileIds: fileIds,
+ SkipCookieCheck: !includeCookie,
}
resp, err := volumeServerClient.BatchDelete(context.Background(), req)
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index f6b2b69e9..025a65b38 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -1,27 +1,27 @@
package operation
import (
- "context"
"fmt"
+ "strconv"
+ "strings"
+
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "strconv"
- "strings"
)
func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
- ctx := context.Background()
-
grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
if err != nil {
- return err
+ return fmt.Errorf("failed to parse volume server %v: %v", volumeServer, err)
}
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
}, grpcAddress, grpcDialOption)
@@ -40,16 +40,28 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
- ctx := context.Background()
-
- masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer)
+ masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(masterServer)
if parseErr != nil {
- return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr)
+ return fmt.Errorf("failed to parse master %v: %v", masterServer, parseErr)
}
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}, masterGrpcAddress, grpcDialOption)
}
+
+func WithFilerServerClient(filerServer string, grpcDialOption grpc.DialOption, fn func(masterClient filer_pb.SeaweedFilerClient) error) error {
+
+ filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(filerServer)
+ if parseErr != nil {
+ return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr)
+ }
+
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, filerGrpcAddress, grpcDialOption)
+
+}
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index d0773e7fd..0372e47b0 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -33,10 +33,10 @@ var (
vc VidCache // caching of volume locations, re-check if after 10 minutes
)
-func Lookup(server string, vid string) (ret *LookupResult, err error) {
+func Lookup(masterFn GetMasterFn, vid string) (ret *LookupResult, err error) {
locations, cache_err := vc.Get(vid)
if cache_err != nil {
- if ret, err = do_lookup(server, vid); err == nil {
+ if ret, err = do_lookup(masterFn, vid); err == nil {
vc.Set(vid, ret.Locations, 10*time.Minute)
}
} else {
@@ -45,9 +45,10 @@ func Lookup(server string, vid string) (ret *LookupResult, err error) {
return
}
-func do_lookup(server string, vid string) (*LookupResult, error) {
+func do_lookup(masterFn GetMasterFn, vid string) (*LookupResult, error) {
values := make(url.Values)
values.Add("volumeId", vid)
+ server := masterFn()
jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
if err != nil {
return nil, err
@@ -63,12 +64,12 @@ func do_lookup(server string, vid string) (*LookupResult, error) {
return &ret, nil
}
-func LookupFileId(server string, fileId string) (fullUrl string, err error) {
+func LookupFileId(masterFn GetMasterFn, fileId string) (fullUrl string, err error) {
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
return "", errors.New("Invalid fileId " + fileId)
}
- lookup, lookupError := Lookup(server, parts[0])
+ lookup, lookupError := Lookup(masterFn, parts[0])
if lookupError != nil {
return "", lookupError
}
@@ -79,7 +80,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) {
}
// LookupVolumeIds find volume locations by cache and actual lookup
-func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
+func LookupVolumeIds(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
ret := make(map[string]LookupResult)
var unknown_vids []string
@@ -99,7 +100,7 @@ func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []strin
//only query unknown_vids
- err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ err := WithMasterServerClient(masterFn(), grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupVolumeRequest{
VolumeIds: unknown_vids,
diff --git a/weed/operation/needle_parse_test.go b/weed/operation/needle_parse_test.go
new file mode 100644
index 000000000..202374e1b
--- /dev/null
+++ b/weed/operation/needle_parse_test.go
@@ -0,0 +1,131 @@
+package operation
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "net/http"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type MockClient struct {
+ needleHandling func(n *needle.Needle, originalSize int, e error)
+}
+
+func (m *MockClient) Do(req *http.Request) (*http.Response, error) {
+ n, originalSize, _, err := needle.CreateNeedleFromRequest(req, false, 1024*1024)
+ if m.needleHandling != nil {
+ m.needleHandling(n, originalSize, err)
+ }
+ return &http.Response{
+ StatusCode: http.StatusNoContent,
+ }, io.EOF
+}
+
+/*
+
+The mime type is always the value passed in.
+
+Compress or not depends on the content detection, file name extension, and compression ratio.
+
+If the content is already compressed, need to know the content size.
+
+*/
+
+func TestCreateNeedleFromRequest(t *testing.T) {
+ mc := &MockClient{}
+ tmp := HttpClient
+ HttpClient = mc
+ defer func() {
+ HttpClient = tmp
+ }()
+
+ {
+ mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ assert.Equal(t, nil, err, "upload: %v", err)
+ assert.Equal(t, "", string(n.Mime), "mime detection failed: %v", string(n.Mime))
+ assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
+ assert.Equal(t, true, util.IsGzippedContent(n.Data), "this should be gzip")
+ fmt.Printf("needle: %v, originalSize: %d\n", n, originalSize)
+ }
+ uploadResult, err, data := Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader([]byte(textContent)), false, "", nil, "")
+ if len(data) != len(textContent) {
+ t.Errorf("data actual %d expected %d", len(data), len(textContent))
+ }
+ if err != nil {
+ fmt.Printf("err: %v\n", err)
+ }
+ fmt.Printf("uploadResult: %+v\n", uploadResult)
+ }
+
+ {
+ mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ assert.Equal(t, nil, err, "upload: %v", err)
+ assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime))
+ assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
+ assert.Equal(t, true, util.IsGzippedContent(n.Data), "this should be gzip")
+ fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize)
+ }
+ gzippedData, _ := util.GzipData([]byte(textContent))
+ Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(gzippedData), true, "text/plain", nil, "")
+ }
+
+ /*
+ {
+ mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ assert.Equal(t, nil, err, "upload: %v", err)
+ assert.Equal(t, "text/plain", string(n.Mime), "mime detection failed: %v", string(n.Mime))
+ assert.Equal(t, true, n.IsCompressed(), "this should be compressed")
+ assert.Equal(t, true, util.IsZstdContent(n.Data), "this should be zstd")
+ fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize)
+ }
+ zstdData, _ := util.ZstdData([]byte(textContent))
+ Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(zstdData), true, "text/plain", nil, "")
+ }
+
+ {
+ mc.needleHandling = func(n *needle.Needle, originalSize int, err error) {
+ assert.Equal(t, nil, err, "upload: %v", err)
+ assert.Equal(t, "application/zstd", string(n.Mime), "mime detection failed: %v", string(n.Mime))
+ assert.Equal(t, false, n.IsCompressed(), "this should not be compressed")
+ assert.Equal(t, true, util.IsZstdContent(n.Data), "this should still be zstd")
+ fmt.Printf("needle: %v, dataSize:%d originalSize:%d\n", n, len(n.Data), originalSize)
+ }
+ zstdData, _ := util.ZstdData([]byte(textContent))
+ Upload("http://localhost:8080/389,0f084d17353afda0", "t.txt", false, bytes.NewReader(zstdData), false, "application/zstd", nil, "")
+ }
+ */
+
+}
+
+var textContent = `Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+`
diff --git a/weed/operation/stats.go b/weed/operation/stats.go
deleted file mode 100644
index b69a33750..000000000
--- a/weed/operation/stats.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package operation
-
-import (
- "context"
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
-)
-
-func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
-
- err = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
-
- grpcResponse, grpcErr := masterClient.Statistics(context.Background(), req)
- if grpcErr != nil {
- return grpcErr
- }
-
- resp = grpcResponse
-
- return nil
-
- })
-
- return
-}
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 62f067430..87c5e4279 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -1,8 +1,6 @@
package operation
import (
- "bytes"
- "google.golang.org/grpc"
"io"
"mime"
"net/url"
@@ -11,6 +9,8 @@ import (
"strconv"
"strings"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
)
@@ -25,20 +25,23 @@ type FilePart struct {
Collection string
DataCenter string
Ttl string
+ DiskType string
Server string //this comes from assign result
Fid string //this comes from assign result, but customizable
+ Fsync bool
}
type SubmitResult struct {
FileName string `json:"fileName,omitempty"`
- FileUrl string `json:"fileUrl,omitempty"`
+ FileUrl string `json:"url,omitempty"`
Fid string `json:"fid,omitempty"`
Size uint32 `json:"size,omitempty"`
Error string `json:"error,omitempty"`
}
-func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart,
- replication string, collection string, dataCenter string, ttl string, maxMB int) ([]SubmitResult, error) {
+type GetMasterFn func() string
+
+func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
@@ -49,10 +52,11 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
Collection: collection,
DataCenter: dataCenter,
Ttl: ttl,
+ DiskType: diskType,
}
- ret, err := Assign(master, grpcDialOption, ar)
+ ret, err := Assign(masterFn, grpcDialOption, ar)
if err != nil {
- for index, _ := range files {
+ for index := range files {
results[index].Error = err.Error()
}
return results, err
@@ -63,10 +67,15 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
file.Fid = file.Fid + "_" + strconv.Itoa(index)
}
file.Server = ret.Url
+ if usePublicUrl {
+ file.Server = ret.PublicUrl
+ }
file.Replication = replication
file.Collection = collection
file.DataCenter = dataCenter
- results[index].Size, err = file.Upload(maxMB, master, ret.Auth, grpcDialOption)
+ file.Ttl = ttl
+ file.DiskType = diskType
+ results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@@ -109,11 +118,14 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
-func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
+func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
}
+ if fi.Fsync {
+ fileUrl += "?fsync=true"
+ }
if closer, ok := fi.Reader.(io.Closer); ok {
defer closer.Close()
}
@@ -136,8 +148,9 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp
Replication: fi.Replication,
Collection: fi.Collection,
Ttl: fi.Ttl,
+ DiskType: fi.DiskType,
}
- ret, err = Assign(master, grpcDialOption, ar)
+ ret, err = Assign(masterFn, grpcDialOption, ar)
if err != nil {
return
}
@@ -149,11 +162,12 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp
Replication: fi.Replication,
Collection: fi.Collection,
Ttl: fi.Ttl,
+ DiskType: fi.DiskType,
}
- ret, err = Assign(master, grpcDialOption, ar)
+ ret, err = Assign(masterFn, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master, grpcDialOption)
+ cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
return
}
id = ret.Fid
@@ -164,14 +178,17 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp
}
}
fileUrl := "http://" + ret.Url + "/" + id
+ if usePublicUrl {
+ fileUrl = "http://" + ret.PublicUrl + "/" + id
+ }
count, e := upload_one_chunk(
baseName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(fi.Reader, chunkSize),
- master, fileUrl,
+ masterFn, fileUrl,
ret.Auth)
if e != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master, grpcDialOption)
+ cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
return 0, e
}
cm.Chunks = append(cm.Chunks,
@@ -186,10 +203,10 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master, grpcDialOption)
+ cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
}
} else {
- ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt)
+ ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)
if e != nil {
return 0, e
}
@@ -198,12 +215,11 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp
return
}
-func upload_one_chunk(filename string, reader io.Reader, master,
+func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn,
fileUrl string, jwt security.EncodedJwt,
) (size uint32, e error) {
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
- uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
- "", nil, jwt)
+ uploadResult, uploadError, _ := Upload(fileUrl, filename, false, reader, false, "", nil, jwt)
if uploadError != nil {
return 0, uploadError
}
@@ -215,12 +231,11 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
if e != nil {
return e
}
- bufReader := bytes.NewReader(buf)
glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
u, _ := url.Parse(fileUrl)
q := u.Query()
q.Set("cm", "true")
u.RawQuery = q.Encode()
- _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt)
+ _, e = UploadData(u.String(), manifest.Name, false, buf, false, "application/json", nil, jwt)
return e
}
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
index b53f18ce1..045948274 100644
--- a/weed/operation/tail_volume.go
+++ b/weed/operation/tail_volume.go
@@ -5,14 +5,15 @@ import (
"fmt"
"io"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
- "google.golang.org/grpc"
)
-func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
+func TailVolume(masterFn GetMasterFn, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
// find volume location, replication, ttl info
- lookup, err := Lookup(master, vid.String())
+ lookup, err := Lookup(masterFn, vid.String())
if err != nil {
return fmt.Errorf("look up volume %d: %v", vid, err)
}
@@ -27,8 +28,10 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.Volume
func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
- stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{
+ stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
VolumeId: uint32(vid),
SinceNs: sinceNs,
IdleTimeoutSeconds: uint32(idleTimeoutSeconds),
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index c387d0230..944186eeb 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -2,10 +2,7 @@ package operation
import (
"bytes"
- "compress/flate"
- "compress/gzip"
"encoding/json"
- "errors"
"fmt"
"io"
"io/ioutil"
@@ -15,73 +12,188 @@ import (
"net/textproto"
"path/filepath"
"strings"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
type UploadResult struct {
- Name string `json:"name,omitempty"`
- Size uint32 `json:"size,omitempty"`
- Error string `json:"error,omitempty"`
- ETag string `json:"eTag,omitempty"`
+ Name string `json:"name,omitempty"`
+ Size uint32 `json:"size,omitempty"`
+ Error string `json:"error,omitempty"`
+ ETag string `json:"eTag,omitempty"`
+ CipherKey []byte `json:"cipherKey,omitempty"`
+ Mime string `json:"mime,omitempty"`
+ Gzip uint32 `json:"gzip,omitempty"`
+ ContentMd5 string `json:"contentMd5,omitempty"`
+ RetryCount int `json:"-"`
+}
+
+func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
+ fid, _ := filer_pb.ToFileIdObject(fileId)
+ return &filer_pb.FileChunk{
+ FileId: fileId,
+ Offset: offset,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ CipherKey: uploadResult.CipherKey,
+ IsCompressed: uploadResult.Gzip > 0,
+ Fid: fid,
+ }
+}
+
+// HTTPClient interface for testing
+type HTTPClient interface {
+ Do(req *http.Request) (*http.Response, error)
}
var (
- client *http.Client
+ HttpClient HTTPClient
)
func init() {
- client = &http.Client{Transport: &http.Transport{
+ HttpClient = &http.Client{Transport: &http.Transport{
+ MaxIdleConns: 1024,
MaxIdleConnsPerHost: 1024,
}}
}
-var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
+var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
-func UploadWithLocalCompressionLevel(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt, compressionLevel int) (*UploadResult, error) {
- if compressionLevel < 1 {
- compressionLevel = 1
- }
- if compressionLevel > 9 {
- compressionLevel = 9
- }
- return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, compressionLevel, jwt)
+func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ uploadResult, err = retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
+ return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
-func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
- return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, flate.BestSpeed, jwt)
+func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
+ uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputCompressed, mtype, pairMap, jwt)
+ return
+}
+
+func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
+ bytesReader, ok := reader.(*util.BytesReader)
+ if ok {
+ data = bytesReader.Bytes
+ } else {
+ data, err = ioutil.ReadAll(reader)
+ if err != nil {
+ err = fmt.Errorf("read input: %v", err)
+ return
+ }
+ }
+ uploadResult, uploadErr := retriedUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
+ return uploadResult, uploadErr, data
}
-func doUpload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, compression int, jwt security.EncodedJwt) (*UploadResult, error) {
- contentIsGzipped := isGzipped
+func retriedUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ for i := 0; i < 3; i++ {
+ uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputCompressed, mtype, pairMap, jwt)
+ if err == nil {
+ uploadResult.RetryCount = i
+ return
+ } else {
+ glog.Warningf("uploading to %s: %v", uploadUrl, err)
+ }
+ time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
+ }
+ return
+}
+
+func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputCompressed bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ contentIsGzipped := isInputCompressed
shouldGzipNow := false
- if !isGzipped {
- if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped {
+ if !isInputCompressed {
+ if mtype == "" {
+ mtype = http.DetectContentType(data)
+ // println("detect1 mimetype to", mtype)
+ if mtype == "application/octet-stream" {
+ mtype = ""
+ }
+ }
+ if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeCompressed {
shouldGzipNow = true
+ } else if !iAmSure && mtype == "" && len(data) > 16*1024 {
+ var compressed []byte
+ compressed, err = util.GzipData(data[0:128])
+ shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90%
+ }
+ }
+
+ var clearDataLen int
+
+ // gzip if possible
+ // this could be double copying
+ clearDataLen = len(data)
+ clearData := data
+ if shouldGzipNow && !cipher {
+ compressed, compressErr := util.GzipData(data)
+ // fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed))
+ if compressErr == nil {
+ data = compressed
contentIsGzipped = true
}
+ } else if isInputCompressed {
+ // just to get the clear data length
+ clearData, err = util.DecompressData(data)
+ if err == nil {
+ clearDataLen = len(clearData)
+ }
}
- return upload_content(uploadUrl, func(w io.Writer) (err error) {
- if shouldGzipNow {
- gzWriter, _ := gzip.NewWriterLevel(w, compression)
- _, err = io.Copy(gzWriter, reader)
- gzWriter.Close()
- } else {
- _, err = io.Copy(w, reader)
+
+ if cipher {
+ // encrypt(gzip(data))
+
+ // encrypt
+ cipherKey := util.GenCipherKey()
+ encryptedData, encryptionErr := util.Encrypt(clearData, cipherKey)
+ if encryptionErr != nil {
+ err = fmt.Errorf("encrypt input: %v", encryptionErr)
+ return
}
- return
- }, filename, contentIsGzipped, mtype, pairMap, jwt)
+
+ // upload data
+ uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
+ _, err = w.Write(encryptedData)
+ return
+ }, "", false, len(encryptedData), "", nil, jwt)
+ if uploadResult == nil {
+ return
+ }
+ uploadResult.Name = filename
+ uploadResult.Mime = mtype
+ uploadResult.CipherKey = cipherKey
+ uploadResult.Size = uint32(clearDataLen)
+ } else {
+ // upload data
+ uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
+ _, err = w.Write(data)
+ return
+ }, filename, contentIsGzipped, len(data), mtype, pairMap, jwt)
+ if uploadResult == nil {
+ return
+ }
+ uploadResult.Size = uint32(clearDataLen)
+ if contentIsGzipped {
+ uploadResult.Gzip = 1
+ }
+ }
+
+ return uploadResult, err
}
-func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
- body_buf := bytes.NewBufferString("")
- body_writer := multipart.NewWriter(body_buf)
+func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, originalDataSize int, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
+ buf := GetBuffer()
+ defer PutBuffer(buf)
+ body_writer := multipart.NewWriter(buf)
h := make(textproto.MIMEHeader)
h.Set("Content-Disposition", fmt.Sprintf(`form-data; name="file"; filename="%s"`, fileNameEscaper.Replace(filename)))
+ h.Set("Idempotency-Key", uploadUrl)
if mtype == "" {
mtype = mime.TypeByExtension(strings.ToLower(filepath.Ext(filename)))
}
@@ -107,10 +219,10 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
return nil, err
}
- req, postErr := http.NewRequest("POST", uploadUrl, body_buf)
+ req, postErr := http.NewRequest("POST", uploadUrl, bytes.NewReader(buf.Bytes()))
if postErr != nil {
- glog.V(0).Infoln("failing to upload to", uploadUrl, postErr.Error())
- return nil, postErr
+ glog.V(1).Infof("create upload request %s: %v", uploadUrl, postErr)
+ return nil, fmt.Errorf("create upload request %s: %v", uploadUrl, postErr)
}
req.Header.Set("Content-Type", content_type)
for k, v := range pairMap {
@@ -119,27 +231,42 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
if jwt != "" {
req.Header.Set("Authorization", "BEARER "+string(jwt))
}
- resp, post_err := client.Do(req)
+ // print("+")
+ resp, post_err := HttpClient.Do(req)
if post_err != nil {
- glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error())
- return nil, post_err
+ if strings.Contains(post_err.Error(), "connection reset by peer") ||
+ strings.Contains(post_err.Error(), "use of closed network connection") {
+ resp, post_err = HttpClient.Do(req)
+ }
}
- defer resp.Body.Close()
+ if post_err != nil {
+ return nil, fmt.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err)
+ }
+ // print("-")
+ defer util.CloseResponse(resp)
+
+ var ret UploadResult
etag := getEtag(resp)
+ if resp.StatusCode == http.StatusNoContent {
+ ret.ETag = etag
+ return &ret, nil
+ }
+
resp_body, ra_err := ioutil.ReadAll(resp.Body)
if ra_err != nil {
- return nil, ra_err
+ return nil, fmt.Errorf("read response body %v: %v", uploadUrl, ra_err)
}
- var ret UploadResult
+
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
- glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body))
- return nil, unmarshal_err
+ glog.Errorf("unmarshal %s: %v", uploadUrl, string(resp_body))
+ return nil, fmt.Errorf("unmarshal %v: %v", uploadUrl, unmarshal_err)
}
if ret.Error != "" {
- return nil, errors.New(ret.Error)
+ return nil, fmt.Errorf("unmarshalled error %v: %v", uploadUrl, ret.Error)
}
ret.ETag = etag
+ ret.ContentMd5 = resp.Header.Get("Content-MD5")
return &ret, nil
}