aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/submit.go
diff options
context:
space:
mode:
authoryourchanges <yourchanges@gmail.com>2020-07-10 09:44:32 +0800
committerGitHub <noreply@github.com>2020-07-10 09:44:32 +0800
commite67096656b0fcdc313c7d8983b6ce36a54d794a3 (patch)
tree4d6cfd722cf6e19b5aa8253e477ddc596ea5e193 /weed/operation/submit.go
parent2b3cef7780a5e91d2072a33411926f9b30c88ee2 (diff)
parent1b680c06c1de27e6a3899c089ec354a9eb08ea44 (diff)
downloadseaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.tar.xz
seaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.zip
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'weed/operation/submit.go')
-rw-r--r--weed/operation/submit.go44
1 files changed, 23 insertions, 21 deletions
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 7a1a3085e..e8bec382a 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -1,7 +1,6 @@
package operation
import (
- "bytes"
"io"
"mime"
"net/url"
@@ -10,6 +9,8 @@ import (
"strconv"
"strings"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
)
@@ -26,6 +27,7 @@ type FilePart struct {
Ttl string
Server string //this comes from assign result
Fid string //this comes from assign result, but customizable
+ Fsync bool
}
type SubmitResult struct {
@@ -36,10 +38,7 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-func SubmitFiles(master string, files []FilePart,
- replication string, collection string, dataCenter string, ttl string, maxMB int,
- secret security.Secret,
-) ([]SubmitResult, error) {
+func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
@@ -51,9 +50,9 @@ func SubmitFiles(master string, files []FilePart,
DataCenter: dataCenter,
Ttl: ttl,
}
- ret, err := Assign(master, ar)
+ ret, err := Assign(master, grpcDialOption, ar)
if err != nil {
- for index, _ := range files {
+ for index := range files {
results[index].Error = err.Error()
}
return results, err
@@ -64,10 +63,13 @@ func SubmitFiles(master string, files []FilePart,
file.Fid = file.Fid + "_" + strconv.Itoa(index)
}
file.Server = ret.Url
+ if usePublicUrl {
+ file.Server = ret.PublicUrl
+ }
file.Replication = replication
file.Collection = collection
file.DataCenter = dataCenter
- results[index].Size, err = file.Upload(maxMB, master, secret)
+ results[index].Size, err = file.Upload(maxMB, master, usePublicUrl, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@@ -110,12 +112,14 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
-func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
- jwt := security.GenJwt(secret, fi.Fid)
+func (fi FilePart) Upload(maxMB int, master string, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
}
+ if fi.Fsync {
+ fileUrl += "?fsync=true"
+ }
if closer, ok := fi.Reader.(io.Closer); ok {
defer closer.Close()
}
@@ -139,7 +143,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
Collection: fi.Collection,
Ttl: fi.Ttl,
}
- ret, err = Assign(master, ar)
+ ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
return
}
@@ -152,10 +156,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
Collection: fi.Collection,
Ttl: fi.Ttl,
}
- ret, err = Assign(master, ar)
+ ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
return
}
id = ret.Fid
@@ -170,10 +174,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
baseName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(fi.Reader, chunkSize),
master, fileUrl,
- jwt)
+ ret.Auth)
if e != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
return 0, e
}
cm.Chunks = append(cm.Chunks,
@@ -188,10 +192,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, usePublicUrl, grpcDialOption)
}
} else {
- ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt)
+ ret, e, _ := Upload(fileUrl, baseName, false, fi.Reader, false, fi.MimeType, nil, jwt)
if e != nil {
return 0, e
}
@@ -204,8 +208,7 @@ func upload_one_chunk(filename string, reader io.Reader, master,
fileUrl string, jwt security.EncodedJwt,
) (size uint32, e error) {
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
- uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
- "application/octet-stream", nil, jwt)
+ uploadResult, uploadError, _ := Upload(fileUrl, filename, false, reader, false, "", nil, jwt)
if uploadError != nil {
return 0, uploadError
}
@@ -217,12 +220,11 @@ func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt s
if e != nil {
return e
}
- bufReader := bytes.NewReader(buf)
glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
u, _ := url.Parse(fileUrl)
q := u.Query()
q.Set("cm", "true")
u.RawQuery = q.Encode()
- _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", nil, jwt)
+ _, e = UploadData(u.String(), manifest.Name, false, buf, false, "application/json", nil, jwt)
return e
}