diff options
Diffstat (limited to 'weed/operation')
| -rw-r--r-- | weed/operation/assign_file_id.go | 6 | ||||
| -rw-r--r-- | weed/operation/delete_content.go | 11 | ||||
| -rw-r--r-- | weed/operation/grpc_client.go | 21 | ||||
| -rw-r--r-- | weed/operation/submit.go | 14 | ||||
| -rw-r--r-- | weed/operation/tail_volume.go | 3 | ||||
| -rw-r--r-- | weed/operation/upload_content.go | 175 |
6 files changed, 175 insertions, 55 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go index 2dfa44483..893bf516c 100644 --- a/weed/operation/assign_file_id.go +++ b/weed/operation/assign_file_id.go @@ -3,11 +3,13 @@ package operation import ( "context" "fmt" + "strings" + + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" - "strings" ) type VolumeAssignRequest struct { diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go index 6d84be76f..361c09e7e 100644 --- a/weed/operation/delete_content.go +++ b/weed/operation/delete_content.go @@ -4,12 +4,13 @@ import ( "context" "errors" "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "google.golang.org/grpc" "net/http" "strings" "sync" + + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" ) type DeleteResult struct { @@ -94,7 +95,7 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList); deleteErr != nil { err = deleteErr - } else { + } else if deleteResults != nil { resultChan <- deleteResults } @@ -107,8 +108,6 @@ func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []str ret = append(ret, result...) } - glog.V(0).Infof("deleted %d items", len(ret)) - return ret, err } diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index f6b2b69e9..dccf85da4 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -1,27 +1,26 @@ package operation import ( - "context" "fmt" + "strconv" + "strings" + + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" - "strconv" - "strings" ) func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error { - ctx := context.Background() - grpcAddress, err := toVolumeServerGrpcAddress(volumeServer) if err != nil { return err } - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) }, grpcAddress, grpcDialOption) @@ -40,14 +39,12 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err func WithMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error { - ctx := context.Background() - - masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer) + masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(masterServer) if parseErr != nil { return fmt.Errorf("failed to parse master grpc %v: %v", masterServer, parseErr) } - return util.WithCachedGrpcClient(ctx, func(grpcConnection *grpc.ClientConn) error { + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) }, masterGrpcAddress, grpcDialOption) diff --git a/weed/operation/submit.go b/weed/operation/submit.go index bdf59d966..5e4dc4374 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -1,8 +1,6 @@ package operation import ( - "bytes" - "google.golang.org/grpc" "io" "mime" "net/url" @@ -11,6 +9,8 @@ import ( "strconv" "strings" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" ) @@ -52,7 +52,7 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart } ret, err := Assign(master, grpcDialOption, ar) if err != nil { - for index, _ := range files { + for index := range files { results[index].Error = err.Error() } return results, err @@ -189,7 +189,7 @@ func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grp cm.DeleteChunks(master, grpcDialOption) } } else { - ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt) + ret, e := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt) if e != nil { return 0, e } @@ -202,8 +202,7 @@ func upload_one_chunk(filename string, reader io.Reader, master, fileUrl string, jwt security.EncodedJwt, ) (size uint32, e error) { glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") - uploadResult, uploadError := Upload(fileUrl, filename, reader, false, - "application/octet-stream", nil, jwt) + uploadResult, uploadError := Upload(fileUrl, filename, false, reader, false, "", nil, jwt) if uploadError != nil { return 0, uploadError } @@ -215,12 +214,11 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s if e != nil { return e } - bufReader := bytes.NewReader(buf) glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...") u, _ := url.Parse(fileUrl) q := u.Query() q.Set("cm", "true") u.RawQuery = q.Encode() - _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt) + _, e = UploadData(u.String(), manifest.Name, false, buf, false, "application/json", nil, jwt) return e } diff --git a/weed/operation/tail_volume.go b/weed/operation/tail_volume.go index b53f18ce1..3cd66b5da 100644 --- a/weed/operation/tail_volume.go +++ b/weed/operation/tail_volume.go @@ -5,9 +5,10 @@ import ( "fmt" "io" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" - "google.golang.org/grpc" ) func TailVolume(master string, grpcDialOption grpc.DialOption, vid needle.VolumeId, sinceNs uint64, timeoutSeconds int, fn func(n *needle.Needle) error) error { diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go index c387d0230..52f8f9e2b 100644 --- a/weed/operation/upload_content.go +++ b/weed/operation/upload_content.go @@ -3,7 +3,7 @@ package operation import ( "bytes" "compress/flate" - "compress/gzip" + "crypto/md5" "encoding/json" "errors" "fmt" @@ -22,10 +22,14 @@ import ( ) type UploadResult struct { - Name string `json:"name,omitempty"` - Size uint32 `json:"size,omitempty"` - Error string `json:"error,omitempty"` - ETag string `json:"eTag,omitempty"` + Name string `json:"name,omitempty"` + Size uint32 `json:"size,omitempty"` + Error string `json:"error,omitempty"` + ETag string `json:"eTag,omitempty"` + CipherKey []byte `json:"cipherKey,omitempty"` + Mime string `json:"mime,omitempty"` + Gzip uint32 `json:"gzip,omitempty"` + Md5 string `json:"md5,omitempty"` } var ( @@ -41,40 +45,159 @@ func init() { var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") // Upload sends a POST request to a volume server to upload the content with adjustable compression level -func UploadWithLocalCompressionLevel(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt, compressionLevel int) (*UploadResult, error) { - if compressionLevel < 1 { - compressionLevel = 1 +func UploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { + hash := md5.New() + hash.Write(data) + uploadResult, err = doUploadData(uploadUrl, filename, cipher, data, isInputGzipped, mtype, pairMap, jwt) + if uploadResult != nil { + uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil)) } - if compressionLevel > 9 { - compressionLevel = 9 - } - return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, compressionLevel, jwt) + return } // Upload sends a POST request to a volume server to upload the content with fast compression -func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { - return doUpload(uploadUrl, filename, reader, isGzipped, mtype, pairMap, flate.BestSpeed, jwt) +func Upload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { + hash := md5.New() + reader = io.TeeReader(reader, hash) + uploadResult, err = doUpload(uploadUrl, filename, cipher, reader, isInputGzipped, mtype, pairMap, flate.BestSpeed, jwt) + if uploadResult != nil { + uploadResult.Md5 = fmt.Sprintf("%x", hash.Sum(nil)) + } + return +} + +func doUploadData(uploadUrl string, filename string, cipher bool, data []byte, isInputGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { + contentIsGzipped := isInputGzipped + shouldGzipNow := false + if !isInputGzipped { + if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); mtype == "" || iAmSure && shouldBeZipped { + shouldGzipNow = true + contentIsGzipped = true + } + } + + var clearDataLen int + + // gzip if possible + // this could be double copying + clearDataLen = len(data) + if shouldGzipNow { + data, err = util.GzipData(data) + } else if isInputGzipped { + // just to get the clear data length + clearData, err := util.UnGzipData(data) + if err == nil { + clearDataLen = len(clearData) + } + } + + if cipher { + // encrypt(gzip(data)) + + // encrypt + cipherKey := util.GenCipherKey() + encryptedData, encryptionErr := util.Encrypt(data, cipherKey) + if encryptionErr != nil { + err = fmt.Errorf("encrypt input: %v", encryptionErr) + return + } + + // upload data + uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + _, err = w.Write(encryptedData) + return + }, "", false, "", nil, jwt) + if uploadResult != nil { + uploadResult.Name = filename + uploadResult.Mime = mtype + uploadResult.CipherKey = cipherKey + } + } else { + // upload data + uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + _, err = w.Write(data) + return + }, filename, contentIsGzipped, mtype, pairMap, jwt) + } + + uploadResult.Size = uint32(clearDataLen) + if contentIsGzipped { + uploadResult.Gzip = 1 + } + + return uploadResult, err } -func doUpload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, pairMap map[string]string, compression int, jwt security.EncodedJwt) (*UploadResult, error) { - contentIsGzipped := isGzipped +func doUpload(uploadUrl string, filename string, cipher bool, reader io.Reader, isInputGzipped bool, mtype string, pairMap map[string]string, compression int, jwt security.EncodedJwt) (uploadResult *UploadResult, err error) { + contentIsGzipped := isInputGzipped shouldGzipNow := false - if !isGzipped { - if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); iAmSure && shouldBeZipped { + if !isInputGzipped { + if shouldBeZipped, iAmSure := util.IsGzippableFileType(filepath.Base(filename), mtype); mtype == "" || iAmSure && shouldBeZipped { shouldGzipNow = true contentIsGzipped = true } } - return upload_content(uploadUrl, func(w io.Writer) (err error) { - if shouldGzipNow { - gzWriter, _ := gzip.NewWriterLevel(w, compression) - _, err = io.Copy(gzWriter, reader) - gzWriter.Close() - } else { - _, err = io.Copy(w, reader) + + var clearDataLen int + + // gzip if possible + // this could be double copying + data, readErr := ioutil.ReadAll(reader) + if readErr != nil { + err = fmt.Errorf("read input: %v", readErr) + return + } + clearDataLen = len(data) + if shouldGzipNow { + data, err = util.GzipData(data) + } else if isInputGzipped { + // just to get the clear data length + clearData, err := util.UnGzipData(data) + if err == nil { + clearDataLen = len(clearData) + } + } + + if cipher { + // encrypt(gzip(data)) + + // encrypt + cipherKey := util.GenCipherKey() + encryptedData, encryptionErr := util.Encrypt(data, cipherKey) + if encryptionErr != nil { + err = fmt.Errorf("encrypt input: %v", encryptionErr) + return } + + // upload data + uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + _, err = w.Write(encryptedData) + return + }, "", false, "", nil, jwt) + if uploadResult != nil { + uploadResult.Name = filename + uploadResult.Mime = mtype + uploadResult.CipherKey = cipherKey + uploadResult.Size = uint32(clearDataLen) + } + } else { + // upload data + uploadResult, err = upload_content(uploadUrl, func(w io.Writer) (err error) { + _, err = w.Write(data) + return + }, filename, contentIsGzipped, mtype, pairMap, jwt) + } + + if uploadResult == nil { return - }, filename, contentIsGzipped, mtype, pairMap, jwt) + } + + uploadResult.Size = uint32(clearDataLen) + if contentIsGzipped { + uploadResult.Gzip = 1 + } + + return uploadResult, err } func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, pairMap map[string]string, jwt security.EncodedJwt) (*UploadResult, error) { |
