diff options
Diffstat (limited to 'weed/operation/submit.go')
| -rw-r--r-- | weed/operation/submit.go | 40 |
1 files changed, 17 insertions, 23 deletions
diff --git a/weed/operation/submit.go b/weed/operation/submit.go index b81f64593..5e4dc4374 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -1,7 +1,6 @@ package operation import ( - "bytes" "io" "mime" "net/url" @@ -10,6 +9,8 @@ import ( "strconv" "strings" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" ) @@ -18,7 +19,6 @@ type FilePart struct { Reader io.Reader FileName string FileSize int64 - IsGzipped bool MimeType string ModTime int64 //in seconds Replication string @@ -37,10 +37,8 @@ type SubmitResult struct { Error string `json:"error,omitempty"` } -func SubmitFiles(master string, files []FilePart, - replication string, collection string, dataCenter string, ttl string, maxMB int, - secret security.Secret, -) ([]SubmitResult, error) { +func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, + replication string, collection string, dataCenter string, ttl string, maxMB int) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) for index, file := range files { results[index].FileName = file.FileName @@ -52,9 +50,9 @@ func SubmitFiles(master string, files []FilePart, DataCenter: dataCenter, Ttl: ttl, } - ret, err := Assign(master, ar) + ret, err := Assign(master, grpcDialOption, ar) if err != nil { - for index, _ := range files { + for index := range files { results[index].Error = err.Error() } return results, err @@ -68,7 +66,7 @@ func SubmitFiles(master string, files []FilePart, file.Replication = replication file.Collection = collection file.DataCenter = dataCenter - results[index].Size, err = file.Upload(maxMB, master, secret) + results[index].Size, err = file.Upload(maxMB, master, ret.Auth, grpcDialOption) if err != nil { results[index].Error = err.Error() } @@ -103,7 +101,6 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) { ret.ModTime = fi.ModTime().UTC().Unix() ret.FileSize = fi.Size() ext := strings.ToLower(path.Ext(fullPathFilename)) - ret.IsGzipped = ext == ".gz" ret.FileName = fi.Name() if ext != "" { ret.MimeType = mime.TypeByExtension(ext) @@ -112,8 +109,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) { return ret, nil } -func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) { - jwt := security.GenJwt(secret, fi.Fid) +func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) { fileUrl := "http://" + fi.Server + "/" + fi.Fid if fi.ModTime != 0 { fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime)) @@ -141,7 +137,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret Collection: fi.Collection, Ttl: fi.Ttl, } - ret, err = Assign(master, ar) + ret, err = Assign(master, grpcDialOption, ar) if err != nil { return } @@ -154,10 +150,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret Collection: fi.Collection, Ttl: fi.Ttl, } - ret, err = Assign(master, ar) + ret, err = Assign(master, grpcDialOption, ar) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master) + cm.DeleteChunks(master, grpcDialOption) return } id = ret.Fid @@ -172,10 +168,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret baseName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fileUrl, - jwt) + ret.Auth) if e != nil { // delete all uploaded chunks - cm.DeleteChunks(master) + cm.DeleteChunks(master, grpcDialOption) return 0, e } cm.Chunks = append(cm.Chunks, @@ -190,10 +186,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret err = upload_chunked_file_manifest(fileUrl, &cm, jwt) if err != nil { // delete all uploaded chunks - cm.DeleteChunks(master) + cm.DeleteChunks(master, grpcDialOption) } } else { - ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, nil, jwt) + ret, e := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt) if e != nil { return 0, e } @@ -206,8 +202,7 @@ func upload_one_chunk(filename string, reader io.Reader, master, fileUrl string, jwt security.EncodedJwt, ) (size uint32, e error) { glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") - uploadResult, uploadError := Upload(fileUrl, filename, reader, false, - "application/octet-stream", nil, jwt) + uploadResult, uploadError := Upload(fileUrl, filename, false, reader, false, "", nil, jwt) if uploadError != nil { return 0, uploadError } @@ -219,12 +214,11 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s if e != nil { return e } - bufReader := bytes.NewReader(buf) glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...") u, _ := url.Parse(fileUrl) q := u.Query() q.Set("cm", "true") u.RawQuery = q.Encode() - _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt) + _, e = UploadData(u.String(), manifest.Name, false, buf, false, "application/json", nil, jwt) return e } |
