aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go6
-rw-r--r--weed/operation/delete_content.go11
-rw-r--r--weed/operation/grpc_client.go21
-rw-r--r--weed/operation/submit.go14
-rw-r--r--weed/operation/tail_volume.go3
-rw-r--r--weed/operation/upload_content.go175
6 files changed, 175 insertions, 55 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index 2dfa44483..893bf516c 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -3,11 +3,13 @@ package operation
import (
"context"
"fmt"
+ "strings"
+
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "strings"
)
type VolumeAssignRequest struct {
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 6d84be76f..361c09e7e 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -4,12 +4,13 @@ import (
"context"
"errors"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "google.golang.org/grpc"
"net/http"
"strings"
"sync"
+
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)
type DeleteResult struct {
@@ -94,7 +95,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList); deleteErr != nil {
err = deleteErr
- } else {
+ } else if deleteResults != nil {
resultChan <- deleteResults
}
@@ -107,8 +108,6 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str
ret = append(ret, result...)
}
- glog.V(0).Infof("deleted %d items", len(ret))
-
return ret, err
}
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index f6b2b69e9..dccf85da4 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -1,27 +1,26 @@
package operation
import (
- "context"
"fmt"
+ "strconv"
+ "strings"
+
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
- "google.golang.org/grpc"
- "strconv"
- "strings"
)
func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
- ctx := context.Background()
-
grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
if err != nil {
return err
}
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
}, grpcAddress, grpcDialOption)
@@ -40,14 +39,12 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
- ctx := context.Background()
-
- masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer)
+ masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(masterServer)
if parseErr != nil {
return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr)
}
- return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
}, masterGrpcAddress, grpcDialOption)
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index bdf59d966..5e4dc4374 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -1,8 +1,6 @@
package operation
import (
- "bytes"
- "google.golang.org/grpc"
"io"
"mime"
"net/url"
@@ -11,6 +9,8 @@ import (
"strconv"
"strings"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
)
@@ -52,7 +52,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
}
ret, err := Assign(master, grpcDialOption, ar)
if err != nil {
- for index, _ := range files {
+ for index := range files {
results[index].Error = err.Error()
}
return results, err
@@ -189,7 +189,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp
cm.DeleteChunks(master, grpcDialOption)
}
} else {
- ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt)
+ ret, e := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)
if e != nil {
return 0, e
}
@@ -202,8 +202,7 @@ func upload_one_chunk(filename string, reader io.Reader, master,
fileUrl string, jwt security.EncodedJwt,
) (size uint32, e error) {
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
- uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
- "application/octet-stream", nil, jwt)
+ uploadResult, uploadError := Upload(fileUrl, filename, false, reader, false, "", nil, jwt)
if uploadError != nil {
return 0, uploadError
}
@@ -215,12 +214,11 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
if e != nil {
return e
}
- bufReader := bytes.NewReader(buf)
glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
u, _ := url.Parse(fileUrl)
q := u.Query()
q.Set("cm", "true")
u.RawQuery = q.Encode()
- _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt)
+ _, e = UploadData(u.String(), manifest.Name, false, buf, false, "application/json", nil, jwt)
return e
}
diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go
index b53f18ce1..3cd66b5da 100644
--- a/weed/operation/tail_volume.go
+++ b/weed/operation/tail_volume.go
@@ -5,9 +5,10 @@ import (
"fmt"
"io"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
- "google.golang.org/grpc"
)
func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error {
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index c387d0230..52f8f9e2b 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -3,7 +3,7 @@ package operation
import (
"bytes"
"compress/flate"
- "compress/gzip"
+ "crypto/md5"
"encoding/json"
"errors"
"fmt"
@@ -22,10 +22,14 @@ import (
)
type UploadResult struct {
- Name string `json:"name,omitempty"`
- Size uint32 `json:"size,omitempty"`
- Error string `json:"error,omitempty"`
- ETag string `json:"eTag,omitempty"`
+ Name string `json:"name,omitempty"`
+ Size uint32 `json:"size,omitempty"`
+ Error string `json:"error,omitempty"`
+ ETag string `json:"eTag,omitempty"`
+ CipherKey []byte `json:"cipherKey,omitempty"`
+ Mime string `json:"mime,omitempty"`
+ Gzip uint32 `json:"gzip,omitempty"`
+ Md5 string `json:"md5,omitempty"`
}
var (
@@ -41,40 +45,159 @@ func init() {
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
-func UploadWithLocalCompressionLevel(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt, compressionLevel int) (*UploadResult, error) {
- if compressionLevel < 1 {
- compressionLevel = 1
+func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ hash := md5.New()
+ hash.Write(data)
+ uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputGzipped, mtype, pairMap, jwt)
+ if uploadResult != nil {
+ uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil))
}
- if compressionLevel > 9 {
- compressionLevel = 9
- }
- return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, compressionLevel, jwt)
+ return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
-func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {
- return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, flate.BestSpeed, jwt)
+func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ hash := md5.New()
+ reader = io.TeeReader(reader, hash)
+ uploadResult, err = doUpload(uploadUrl, filename, cipher, reader, isInputGzipped, mtype, pairMap, flate.BestSpeed, jwt)
+ if uploadResult != nil {
+ uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil))
+ }
+ return
+}
+
+func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ contentIsGzipped := isInputGzipped
+ shouldGzipNow := false
+ if !isInputGzipped {
+ if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); mtype == "" || iAmSure && shouldBeZipped {
+ shouldGzipNow = true
+ contentIsGzipped = true
+ }
+ }
+
+ var clearDataLen int
+
+ // gzip if possible
+ // this could be double copying
+ clearDataLen = len(data)
+ if shouldGzipNow {
+ data, err = util.GzipData(data)
+ } else if isInputGzipped {
+ // just to get the clear data length
+ clearData, err := util.UnGzipData(data)
+ if err == nil {
+ clearDataLen = len(clearData)
+ }
+ }
+
+ if cipher {
+ // encrypt(gzip(data))
+
+ // encrypt
+ cipherKey := util.GenCipherKey()
+ encryptedData, encryptionErr := util.Encrypt(data, cipherKey)
+ if encryptionErr != nil {
+ err = fmt.Errorf("encrypt input: %v", encryptionErr)
+ return
+ }
+
+ // upload data
+ uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
+ _, err = w.Write(encryptedData)
+ return
+ }, "", false, "", nil, jwt)
+ if uploadResult != nil {
+ uploadResult.Name = filename
+ uploadResult.Mime = mtype
+ uploadResult.CipherKey = cipherKey
+ }
+ } else {
+ // upload data
+ uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
+ _, err = w.Write(data)
+ return
+ }, filename, contentIsGzipped, mtype, pairMap, jwt)
+ }
+
+ uploadResult.Size = uint32(clearDataLen)
+ if contentIsGzipped {
+ uploadResult.Gzip = 1
+ }
+
+ return uploadResult, err
}
-func doUpload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, compression int, jwt security.EncodedJwt) (*UploadResult, error) {
- contentIsGzipped := isGzipped
+func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, compression int, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) {
+ contentIsGzipped := isInputGzipped
shouldGzipNow := false
- if !isGzipped {
- if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped {
+ if !isInputGzipped {
+ if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); mtype == "" || iAmSure && shouldBeZipped {
shouldGzipNow = true
contentIsGzipped = true
}
}
- return upload_content(uploadUrl, func(w io.Writer) (err error) {
- if shouldGzipNow {
- gzWriter, _ := gzip.NewWriterLevel(w, compression)
- _, err = io.Copy(gzWriter, reader)
- gzWriter.Close()
- } else {
- _, err = io.Copy(w, reader)
+
+ var clearDataLen int
+
+ // gzip if possible
+ // this could be double copying
+ data, readErr := ioutil.ReadAll(reader)
+ if readErr != nil {
+ err = fmt.Errorf("read input: %v", readErr)
+ return
+ }
+ clearDataLen = len(data)
+ if shouldGzipNow {
+ data, err = util.GzipData(data)
+ } else if isInputGzipped {
+ // just to get the clear data length
+ clearData, err := util.UnGzipData(data)
+ if err == nil {
+ clearDataLen = len(clearData)
+ }
+ }
+
+ if cipher {
+ // encrypt(gzip(data))
+
+ // encrypt
+ cipherKey := util.GenCipherKey()
+ encryptedData, encryptionErr := util.Encrypt(data, cipherKey)
+ if encryptionErr != nil {
+ err = fmt.Errorf("encrypt input: %v", encryptionErr)
+ return
}
+
+ // upload data
+ uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
+ _, err = w.Write(encryptedData)
+ return
+ }, "", false, "", nil, jwt)
+ if uploadResult != nil {
+ uploadResult.Name = filename
+ uploadResult.Mime = mtype
+ uploadResult.CipherKey = cipherKey
+ uploadResult.Size = uint32(clearDataLen)
+ }
+ } else {
+ // upload data
+ uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) {
+ _, err = w.Write(data)
+ return
+ }, filename, contentIsGzipped, mtype, pairMap, jwt)
+ }
+
+ if uploadResult == nil {
return
- }, filename, contentIsGzipped, mtype, pairMap, jwt)
+ }
+
+ uploadResult.Size = uint32(clearDataLen)
+ if contentIsGzipped {
+ uploadResult.Gzip = 1
+ }
+
+ return uploadResult, err
}
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) {