aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/submit.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation/submit.go')
-rw-r--r--weed/operation/submit.go40
1 files changed, 17 insertions, 23 deletions
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index b81f64593..5e4dc4374 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -1,7 +1,6 @@
package operation
import (
- "bytes"
"io"
"mime"
"net/url"
@@ -10,6 +9,8 @@ import (
"strconv"
"strings"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
)
@@ -18,7 +19,6 @@ type FilePart struct {
Reader io.Reader
FileName string
FileSize int64
- IsGzipped bool
MimeType string
ModTime int64 //in seconds
Replication string
@@ -37,10 +37,8 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-func SubmitFiles(master string, files []FilePart,
- replication string, collection string, dataCenter string, ttl string, maxMB int,
- secret security.Secret,
-) ([]SubmitResult, error) {
+func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart,
+ replication string, collection string, dataCenter string, ttl string, maxMB int) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
@@ -52,9 +50,9 @@ func SubmitFiles(master string, files []FilePart,
DataCenter: dataCenter,
Ttl: ttl,
}
- ret, err := Assign(master, ar)
+ ret, err := Assign(master, grpcDialOption, ar)
if err != nil {
- for index, _ := range files {
+ for index := range files {
results[index].Error = err.Error()
}
return results, err
@@ -68,7 +66,7 @@ func SubmitFiles(master string, files []FilePart,
file.Replication = replication
file.Collection = collection
file.DataCenter = dataCenter
- results[index].Size, err = file.Upload(maxMB, master, secret)
+ results[index].Size, err = file.Upload(maxMB, master, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@@ -103,7 +101,6 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
ret.ModTime = fi.ModTime().UTC().Unix()
ret.FileSize = fi.Size()
ext := strings.ToLower(path.Ext(fullPathFilename))
- ret.IsGzipped = ext == ".gz"
ret.FileName = fi.Name()
if ext != "" {
ret.MimeType = mime.TypeByExtension(ext)
@@ -112,8 +109,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
-func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
- jwt := security.GenJwt(secret, fi.Fid)
+func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@@ -141,7 +137,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
Collection: fi.Collection,
Ttl: fi.Ttl,
}
- ret, err = Assign(master, ar)
+ ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
return
}
@@ -154,10 +150,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
Collection: fi.Collection,
Ttl: fi.Ttl,
}
- ret, err = Assign(master, ar)
+ ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
return
}
id = ret.Fid
@@ -172,10 +168,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
baseName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(fi.Reader, chunkSize),
master, fileUrl,
- jwt)
+ ret.Auth)
if e != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
return 0, e
}
cm.Chunks = append(cm.Chunks,
@@ -190,10 +186,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
}
} else {
- ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, nil, jwt)
+ ret, e := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)
if e != nil {
return 0, e
}
@@ -206,8 +202,7 @@ func upload_one_chunk(filename string, reader io.Reader, master,
fileUrl string, jwt security.EncodedJwt,
) (size uint32, e error) {
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
- uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
- "application/octet-stream", nil, jwt)
+ uploadResult, uploadError := Upload(fileUrl, filename, false, reader, false, "", nil, jwt)
if uploadError != nil {
return 0, uploadError
}
@@ -219,12 +214,11 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
if e != nil {
return e
}
- bufReader := bytes.NewReader(buf)
glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
u, _ := url.Parse(fileUrl)
q := u.Query()
q.Set("cm", "true")
u.RawQuery = q.Encode()
- _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt)
+ _, e = UploadData(u.String(), manifest.Name, false, buf, false, "application/json", nil, jwt)
return e
}