aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/submit.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation/submit.go')
-rw-r--r--weed/operation/submit.go29
1 files changed, 18 insertions, 11 deletions
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index e785b68a9..87c5e4279 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -25,6 +25,7 @@ type FilePart struct {
Collection string
DataCenter string
Ttl string
+ DiskType string
Server string //this comes from assign result
Fid string //this comes from assign result, but customizable
Fsync bool
@@ -38,7 +39,9 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
+type GetMasterFn func() string
+
+func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
@@ -49,8 +52,9 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
Collection: collection,
DataCenter: dataCenter,
Ttl: ttl,
+ DiskType: diskType,
}
- ret, err := Assign(master, grpcDialOption, ar)
+ ret, err := Assign(masterFn, grpcDialOption, ar)
if err != nil {
for index := range files {
results[index].Error = err.Error()
@@ -70,7 +74,8 @@ func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart
file.Collection = collection
file.DataCenter = dataCenter
file.Ttl = ttl
- results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption)
+ file.DiskType = diskType
+ results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@@ -113,7 +118,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
-func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
+func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@@ -143,8 +148,9 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
Replication: fi.Replication,
Collection: fi.Collection,
Ttl: fi.Ttl,
+ DiskType: fi.DiskType,
}
- ret, err = Assign(master, grpcDialOption, ar)
+ ret, err = Assign(masterFn, grpcDialOption, ar)
if err != nil {
return
}
@@ -156,11 +162,12 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
Replication: fi.Replication,
Collection: fi.Collection,
Ttl: fi.Ttl,
+ DiskType: fi.DiskType,
}
- ret, err = Assign(master, grpcDialOption, ar)
+ ret, err = Assign(masterFn, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
+ cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
return
}
id = ret.Fid
@@ -177,11 +184,11 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
count, e := upload_one_chunk(
baseName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(fi.Reader, chunkSize),
- master, fileUrl,
+ masterFn, fileUrl,
ret.Auth)
if e != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
+ cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
return 0, e
}
cm.Chunks = append(cm.Chunks,
@@ -196,7 +203,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
+ cm.DeleteChunks(masterFn, usePublicUrl, grpcDialOption)
}
} else {
ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)
@@ -208,7 +215,7 @@ func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt secur
return
}
-func upload_one_chunk(filename string, reader io.Reader, master,
+func upload_one_chunk(filename string, reader io.Reader, masterFn GetMasterFn,
fileUrl string, jwt security.EncodedJwt,
) (size uint32, e error) {
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")