aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
authorshibinbin <shibinbin@megvii.com>2020-06-04 17:24:18 +0800
committershibinbin <shibinbin@megvii.com>2020-06-04 17:24:18 +0800
commit40334bc28d3fa694ce59b4e65077efb845264d20 (patch)
treea085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/operation
parentd892cad15d748327c2b7c649f6398ff35d8dce0b (diff)
parentfbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff)
downloadseaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz
seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go8
-rw-r--r--weed/operation/chunked_file.go52
-rw-r--r--weed/operation/delete_content.go28
-rw-r--r--weed/operation/grpc_client.go48
-rw-r--r--weed/operation/lookup.go4
-rw-r--r--weed/operation/stats.go26
-rw-r--r--weed/operation/submit.go34
-rw-r--r--weed/operation/sync_volume.go4
-rw-r--r--weed/operation/tail_volume.go7
-rw-r--r--weed/operation/upload_content.go149
10 files changed, 233 insertions, 127 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index b67d8b708..893bf516c 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -3,11 +3,13 @@ package operation
import (
"context"
"fmt"
+ "strings"
+
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "strings"
)
type VolumeAssignRequest struct {
@@ -44,7 +46,7 @@ func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *Volum
continue
}
- lastError = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
+ lastError = WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.AssignRequest{
Count: primaryRequest.Count,
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 295204dd8..baa0038c4 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -8,11 +8,10 @@ import (
"io/ioutil"
"net/http"
"sort"
+ "sync"
"google.golang.org/grpc"
- "sync"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -41,12 +40,13 @@ type ChunkManifest struct {
// seekable chunked file reader
type ChunkedFileReader struct {
- Manifest *ChunkManifest
- Master string
- pos int64
- pr *io.PipeReader
- pw *io.PipeWriter
- mutex sync.Mutex
+ totalSize int64
+ chunkList []*ChunkInfo
+ master string
+ pos int64
+ pr *io.PipeReader
+ pw *io.PipeWriter
+ mutex sync.Mutex
}
func (s ChunkList) Len() int { return len(s) }
@@ -72,12 +72,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
return json.Marshal(cm)
}
-func (cm *ChunkManifest) DeleteChunks(master string, grpcDialOption grpc.DialOption) error {
+func (cm *ChunkManifest) DeleteChunks(master string, usePublicUrl bool, grpcDialOption grpc.DialOption) error {
var fileIds []string
for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid)
}
- results, err := DeleteFiles(master, grpcDialOption, fileIds)
+ results, err := DeleteFiles(master, usePublicUrl, grpcDialOption, fileIds)
if err != nil {
glog.V(0).Infof("delete %+v: %v", fileIds, err)
return fmt.Errorf("chunk delete: %v", err)
@@ -126,16 +126,29 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64,
return io.Copy(w, resp.Body)
}
+func NewChunkedFileReader(chunkList []*ChunkInfo, master string) *ChunkedFileReader {
+ var totalSize int64
+ for _, chunk := range chunkList {
+ totalSize += chunk.Size
+ }
+ sort.Sort(ChunkList(chunkList))
+ return &ChunkedFileReader{
+ totalSize: totalSize,
+ chunkList: chunkList,
+ master: master,
+ }
+}
+
func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
var err error
switch whence {
- case 0:
- case 1:
+ case io.SeekStart:
+ case io.SeekCurrent:
offset += cf.pos
- case 2:
- offset = cf.Manifest.Size - offset
+ case io.SeekEnd:
+ offset = cf.totalSize + offset
}
- if offset > cf.Manifest.Size {
+ if offset > cf.totalSize {
err = ErrInvalidRange
}
if cf.pos != offset {
@@ -146,10 +159,9 @@ func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
}
func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
- cm := cf.Manifest
chunkIndex := -1
chunkStartOffset := int64(0)
- for i, ci := range cm.Chunks {
+ for i, ci := range cf.chunkList {
if cf.pos >= ci.Offset && cf.pos < ci.Offset+ci.Size {
chunkIndex = i
chunkStartOffset = cf.pos - ci.Offset
@@ -159,10 +171,10 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
if chunkIndex < 0 {
return n, ErrInvalidRange
}
- for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ {
- ci := cm.Chunks[chunkIndex]
+ for ; chunkIndex < len(cf.chunkList); chunkIndex++ {
+ ci := cf.chunkList[chunkIndex]
// if we need read date from local volume server first?
- fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid)
+ fileUrl, lookupError := LookupFileId(cf.master, ci.Fid)
if lookupError != nil {
return n, lookupError
}
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 95bbde9f9..9868a411d 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -10,7 +10,6 @@ import (
"google.golang.org/grpc"
- "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)
@@ -30,10 +29,18 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
}
// DeleteFiles batch deletes a list of fileIds
-func DeleteFiles(master string, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
-
- lookupFunc := func(vids []string) (map[string]LookupResult, error) {
- return LookupVolumeIds(master, grpcDialOption, vids)
+func DeleteFiles(master string, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
+
+ lookupFunc := func(vids []string) (results map[string]LookupResult, err error) {
+ results, err = LookupVolumeIds(master, grpcDialOption, vids)
+ if err == nil && usePublicUrl {
+ for _, result := range results {
+ for _, loc := range result.Locations {
+ loc.Url = loc.PublicUrl
+ }
+ }
+ }
+ return
}
return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
@@ -94,7 +101,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
go func(server string, fidList []string) {
defer wg.Done()
- if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList); deleteErr != nil {
+ if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList, true); deleteErr != nil {
err = deleteErr
} else if deleteResults != nil {
resultChan <- deleteResults
@@ -109,18 +116,17 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
ret = append(ret, result...)
}
- glog.V(1).Infof("deleted %d items", len(ret))
-
return ret, err
}
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
-func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
+func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
- err = WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
req := &volume_server_pb.BatchDeleteRequest{
- FileIds: fileIds,
+ FileIds: fileIds,
+ SkipCookieCheck: !includeCookie,
}
resp, err := volumeServerClient.BatchDelete(context.Background(), req)
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index e7ee2d2ba..025a65b38 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -1,29 +1,29 @@
package operation
import (
- "context"
"fmt"
+ "strconv"
+ "strings"
+
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "strconv"
- "strings"
)
-func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(context.Context, volume_server_pb.VolumeServerClient) error) error {
-
- ctx := context.Background()
+func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
if err != nil {
- return err
+ return fmt.Errorf("failed to parse volume server %v: %v", volumeServer, err)
}
- return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
- return fn(ctx2, client)
+ return fn(client)
}, grpcAddress, grpcDialOption)
}
@@ -38,18 +38,30 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
}
-func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(ctx2 context.Context, masterClient master_pb.SeaweedClient) error) error {
+func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
- ctx := context.Background()
-
- masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer)
+ masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(masterServer)
if parseErr != nil {
- return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr)
+ return fmt.Errorf("failed to parse master %v: %v", masterServer, parseErr)
}
- return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
- return fn(ctx2, client)
+ return fn(client)
}, masterGrpcAddress, grpcDialOption)
}
+
+func WithFilerServerClient(filerServer string, grpcDialOption grpc.DialOption, fn func(masterClient filer_pb.SeaweedFilerClient) error) error {
+
+ filerGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(filerServer)
+ if parseErr != nil {
+ return fmt.Errorf("failed to parse filer %v: %v", filerGrpcAddress, parseErr)
+ }
+
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, filerGrpcAddress, grpcDialOption)
+
+}
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index 78769ac5a..d0773e7fd 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -99,12 +99,12 @@ func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []strin
//only query unknown_vids
- err := WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
+ err := WithMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
req := &master_pb.LookupVolumeRequest{
VolumeIds: unknown_vids,
}
- resp, grpcErr := masterClient.LookupVolume(ctx, req)
+ resp, grpcErr := masterClient.LookupVolume(context.Background(), req)
if grpcErr != nil {
return grpcErr
}
diff --git a/weed/operation/stats.go b/weed/operation/stats.go
deleted file mode 100644
index 3e6327f19..000000000
--- a/weed/operation/stats.go
+++ /dev/null
@@ -1,26 +0,0 @@
-package operation
-
-import (
- "context"
- "google.golang.org/grpc"
-
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
-)
-
-func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
-
- err = WithMasterServerClient(server, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error {
-
- grpcResponse, grpcErr := masterClient.Statistics(ctx, req)
- if grpcErr != nil {
- return grpcErr
- }
-
- resp = grpcResponse
-
- return nil
-
- })
-
- return
-}
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 62f067430..e8bec382a 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -1,8 +1,6 @@
package operation
import (
- "bytes"
- "google.golang.org/grpc"
"io"
"mime"
"net/url"
@@ -11,6 +9,8 @@ import (
"strconv"
"strings"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
)
@@ -27,6 +27,7 @@ type FilePart struct {
Ttl string
Server string //this comes from assign result
Fid string //this comes from assign result, but customizable
+ Fsync bool
}
type SubmitResult struct {
@@ -37,8 +38,7 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart,
- replication string, collection string, dataCenter string, ttl string, maxMB int) ([]SubmitResult, error) {
+func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
@@ -52,7 +52,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
}
ret, err := Assign(master, grpcDialOption, ar)
if err != nil {
- for index, _ := range files {
+ for index := range files {
results[index].Error = err.Error()
}
return results, err
@@ -63,10 +63,13 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
file.Fid = file.Fid + "_" + strconv.Itoa(index)
}
file.Server = ret.Url
+ if usePublicUrl {
+ file.Server = ret.PublicUrl
+ }
file.Replication = replication
file.Collection = collection
file.DataCenter = dataCenter
- results[index].Size, err = file.Upload(maxMB, master, ret.Auth, grpcDialOption)
+ results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@@ -109,11 +112,14 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
-func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
+func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
}
+ if fi.Fsync {
+ fileUrl += "?fsync=true"
+ }
if closer, ok := fi.Reader.(io.Closer); ok {
defer closer.Close()
}
@@ -153,7 +159,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp
ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master, grpcDialOption)
+ cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
return
}
id = ret.Fid
@@ -171,7 +177,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp
ret.Auth)
if e != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master, grpcDialOption)
+ cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
return 0, e
}
cm.Chunks = append(cm.Chunks,
@@ -186,10 +192,10 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master, grpcDialOption)
+ cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
}
} else {
- ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt)
+ ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)
if e != nil {
return 0, e
}
@@ -202,8 +208,7 @@ func upload_one_chunk(filename string, reader io.Reader, master,
fileUrl string, jwt security.EncodedJwt,
) (size uint32, e error) {
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
- uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
- "", nil, jwt)
+ uploadResult, uploadError, _ := Upload(fileUrl, filename, false, reader, false, "", nil, jwt)
if uploadError != nil {
return 0, uploadError
}
@@ -215,12 +220,11 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
if e != nil {
return e
}
- bufReader := bytes.NewReader(buf)
glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
u, _ := url.Parse(fileUrl)
q := u.Query()
q.Set("cm", "true")
u.RawQuery = q.Encode()
- _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt)
+ _, e = UploadData(u.String(), manifest.Name, false, buf, false, "application/json", nil, jwt)
return e
}
diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go
index 4b39ad544..5562f12ab 100644
--- a/weed/operation/sync_volume.go
+++ b/weed/operation/sync_volume.go
@@ -8,9 +8,9 @@ import (
func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
- WithVolumeServerClient(server, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
+ WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- resp, err = client.VolumeSyncStatus(ctx, &volume_server_pb.VolumeSyncStatusRequest{
+ resp, err = client.VolumeSyncStatus(context.Background(), &volume_server_pb.VolumeSyncStatusRequest{
VolumeId: vid,
})
return nil
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
index 1e8b0a16e..3cd66b5da 100644
--- a/weed/operation/tail_volume.go
+++ b/weed/operation/tail_volume.go
@@ -5,9 +5,10 @@ import (
"fmt"
"io"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
- "google.golang.org/grpc"
)
func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
@@ -26,9 +27,9 @@ func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.Volume
}
func TailVolumeFromSource(volumeServer string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, idleTimeoutSeconds int, fn func(n *needle.Needle) error) error {
- return WithVolumeServerClient(volumeServer, grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
+ return WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- stream, err := client.VolumeTailSender(ctx, &volume_server_pb.VolumeTailSenderRequest{
+ stream, err := client.VolumeTailSender(context.Background(), &volume_server_pb.VolumeTailSenderRequest{
VolumeId: uint32(vid),
SinceNs: sinceNs,
IdleTimeoutSeconds: uint32(idleTimeoutSeconds),
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index c387d0230..1e2c591c5 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -2,8 +2,7 @@ package operation
import (
"bytes"
- "compress/flate"
- "compress/gzip"
+ "crypto/md5"
"encoding/json"
"errors"
"fmt"
@@ -15,17 +14,35 @@ import (
"net/textproto"
"path/filepath"
"strings"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
)
type UploadResult struct {
- Name string `json:"name,omitempty"`
- Size uint32 `json:"size,omitempty"`
- Error string `json:"error,omitempty"`
- ETag string `json:"eTag,omitempty"`
+ Name string `json:"name,omitempty"`
+ Size uint32 `json:"size,omitempty"`
+ Error string `json:"error,omitempty"`
+ ETag string `json:"eTag,omitempty"`
+ CipherKey []byte `json:"cipherKey,omitempty"`
+ Mime string `json:"mime,omitempty"`
+ Gzip uint32 `json:"gzip,omitempty"`
+ Md5 string `json:"md5,omitempty"`
+}
+
+func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64) *filer_pb.FileChunk {
+ return &filer_pb.FileChunk{
+ FileId: fileId,
+ Offset: offset,
+ Size: uint64(uploadResult.Size),
+ Mtime: time.Now().UnixNano(),
+ ETag: uploadResult.ETag,
+ CipherKey: uploadResult.CipherKey,
+ IsGzipped: uploadResult.Gzip > 0,
+ }
}
var (
@@ -41,40 +58,113 @@ func init() {
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
-func UploadWithLocalCompressionLevel(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt, compressionLevel int) (*UploadResult, error) {
- if compressionLevel < 1 {
- compressionLevel = 1
- }
- if compressionLevel > 9 {
- compressionLevel = 9
+func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputGzipped, mtype, pairMap, jwt)
+ if uploadResult != nil {
+ uploadResult.Md5 = util.Md5(data)
}
- return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, compressionLevel, jwt)
+ return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
-func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
- return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, flate.BestSpeed, jwt)
+func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
+ hash := md5.New()
+ reader = io.TeeReader(reader, hash)
+ uploadResult, err, data = doUpload(uploadUrl, filename, cipher, reader, isInputGzipped, mtype, pairMap, jwt)
+ if uploadResult != nil {
+ uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil))
+ }
+ return
+}
+
+func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error, data []byte) {
+ data, err = ioutil.ReadAll(reader)
+ if err != nil {
+ err = fmt.Errorf("read input: %v", err)
+ return
+ }
+ uploadResult, uploadErr := doUploadData(uploadUrl, filename, cipher, data, isInputGzipped, mtype, pairMap, jwt)
+ return uploadResult, uploadErr, data
}
-func doUpload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, compression int, jwt security.EncodedJwt) (*UploadResult, error) {
- contentIsGzipped := isGzipped
+func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ contentIsGzipped := isInputGzipped
shouldGzipNow := false
- if !isGzipped {
+ if !isInputGzipped {
+ if mtype == "" {
+ mtype = http.DetectContentType(data)
+ if mtype == "application/octet-stream" {
+ mtype = ""
+ }
+ }
if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped {
shouldGzipNow = true
+ } else if !iAmSure && mtype == "" && len(data) > 128 {
+ var compressed []byte
+ compressed, err = util.GzipData(data[0:128])
+ shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90%
+ }
+ }
+
+ var clearDataLen int
+
+ // gzip if possible
+ // this could be double copying
+ clearDataLen = len(data)
+ if shouldGzipNow {
+ compressed, compressErr := util.GzipData(data)
+ // fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed))
+ if compressErr == nil {
+ data = compressed
contentIsGzipped = true
}
+ } else if isInputGzipped {
+ // just to get the clear data length
+ clearData, err := util.UnGzipData(data)
+ if err == nil {
+ clearDataLen = len(clearData)
+ }
}
- return upload_content(uploadUrl, func(w io.Writer) (err error) {
- if shouldGzipNow {
- gzWriter, _ := gzip.NewWriterLevel(w, compression)
- _, err = io.Copy(gzWriter, reader)
- gzWriter.Close()
- } else {
- _, err = io.Copy(w, reader)
+
+ if cipher {
+ // encrypt(gzip(data))
+
+ // encrypt
+ cipherKey := util.GenCipherKey()
+ encryptedData, encryptionErr := util.Encrypt(data, cipherKey)
+ if encryptionErr != nil {
+ err = fmt.Errorf("encrypt input: %v", encryptionErr)
+ return
+ }
+
+ // upload data
+ uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
+ _, err = w.Write(encryptedData)
+ return
+ }, "", false, "", nil, jwt)
+ if uploadResult != nil {
+ uploadResult.Name = filename
+ uploadResult.Mime = mtype
+ uploadResult.CipherKey = cipherKey
}
+ } else {
+ // upload data
+ uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
+ _, err = w.Write(data)
+ return
+ }, filename, contentIsGzipped, mtype, pairMap, jwt)
+ }
+
+ if uploadResult == nil {
return
- }, filename, contentIsGzipped, mtype, pairMap, jwt)
+ }
+
+ uploadResult.Size = uint32(clearDataLen)
+ if contentIsGzipped {
+ uploadResult.Gzip = 1
+ }
+
+ return uploadResult, err
}
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
@@ -125,12 +215,17 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
return nil, post_err
}
defer resp.Body.Close()
+
+ var ret UploadResult
etag := getEtag(resp)
+ if resp.StatusCode == http.StatusNoContent {
+ ret.ETag = etag
+ return &ret, nil
+ }
resp_body, ra_err := ioutil.ReadAll(resp.Body)
if ra_err != nil {
return nil, ra_err
}
- var ret UploadResult
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
glog.V(0).Infoln("failing to read upload response", uploadUrl, string(resp_body))