diff options
| author | bingoohuang <bingoo.huang@gmail.com> | 2021-02-18 13:57:34 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-02-18 13:57:34 +0800 |
| commit | c8f56f5712c1efffc46de95a8057ed09c21da2db (patch) | |
| tree | bc3330e274901d782395b7396cb54d7cc42608b1 /weed/operation/submit.go | |
| parent | 12a78335860c4b1e220748e4adc4097050af5272 (diff) | |
| parent | 3575d41009e4367658e75e6ae780c6260b80daf9 (diff) | |
| download | seaweedfs-c8f56f5712c1efffc46de95a8057ed09c21da2db.tar.xz seaweedfs-c8f56f5712c1efffc46de95a8057ed09c21da2db.zip | |
Merge pull request #2 from chrislusf/master
Diffstat (limited to 'weed/operation/submit.go')
| -rw-r--r-- | weed/operation/submit.go | 29 |
1 files changed, 18 insertions, 11 deletions
diff --git a/weed/operation/submit.go b/weed/operation/submit.go index e785b68a9..87c5e4279 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -25,6 +25,7 @@ type FilePart struct { Collection string DataCenter string Ttl string + DiskType string Server string //this comes from assign result Fid string //this comes from assign result, but customizable Fsync bool @@ -38,7 +39,9 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } -func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) { +type GetMasterFn func() string + +func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) for index, file := range files { results[index].FileName = file.FileName @@ -49,8 +52,9 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart Collection: collection, DataCenter: dataCenter, Ttl: ttl, + DiskType: diskType, } - ret, err := Assign(master, grpcDialOption, ar) + ret, err := Assign(masterFn, grpcDialOption, ar) if err != nil { for index := range files { results[index].Error = err.Error() @@ -70,7 +74,8 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart file.Collection = collection file.DataCenter = dataCenter file.Ttl = ttl - results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption) + file.DiskType = diskType + results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption) if err != nil { results[index].Error = err.Error() } @@ -113,7 +118,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) { return ret, nil } -func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) { +func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) { fileUrl := "http://" + fi.Server + "/" + fi.Fid if fi.ModTime != 0 { fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) @@ -143,8 +148,9 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur Replication: fi.Replication, Collection: fi.Collection, Ttl: fi.Ttl, + DiskType: fi.DiskType, } - ret, err = Assign(master, grpcDialOption, ar) + ret, err = Assign(masterFn, grpcDialOption, ar) if err != nil { return } @@ -156,11 +162,12 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur Replication: fi.Replication, Collection: fi.Collection, Ttl: fi.Ttl, + DiskType: fi.DiskType, } - ret, err = Assign(master, grpcDialOption, ar) + ret, err = Assign(masterFn, grpcDialOption, ar) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master, usePublicUrl, grpcDialOption) + cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) return } id = ret.Fid @@ -177,11 +184,11 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur count, e := upload_one_chunk( baseName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), - master, fileUrl, + masterFn, fileUrl, ret.Auth) if e != nil { // delete all uploaded chunks - cm.DeleteChunks(master, usePublicUrl, grpcDialOption) + cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) return 0, e } cm.Chunks = append(cm.Chunks, @@ -196,7 +203,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur err = upload_chunked_file_manifest(fileUrl, &cm, jwt) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master, usePublicUrl, grpcDialOption) + cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption) } } else { ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt) @@ -208,7 +215,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur return } -func upload_one_chunk(filename string, reader io.Reader, master, +func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn, fileUrl string, jwt security.EncodedJwt, ) (size uint32, e error) { glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") |
