aboutsummaryrefslogtreecommitdiff
path: root/weed/operation/submit.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2016-06-02 18:09:14 -0700
committerChris Lu <chris.lu@gmail.com>2016-06-02 18:09:14 -0700
commit5ce6bbf07672bf3f3c8d26cd2ce0e3e853a47c44 (patch)
tree2e4dd2ad0a618ab2b7cdebcdb9c503526c31e2e8 /weed/operation/submit.go
parentcaeffa3998adc060fa66c4cd77af971ff2d26c57 (diff)
downloadseaweedfs-5ce6bbf07672bf3f3c8d26cd2ce0e3e853a47c44.tar.xz
seaweedfs-5ce6bbf07672bf3f3c8d26cd2ce0e3e853a47c44.zip
directory structure change to work with glide
glide has its own requirements. My previous workaround caused me some code checkin errors. Need to fix this.
Diffstat (limited to 'weed/operation/submit.go')
-rw-r--r--weed/operation/submit.go194
1 files changed, 194 insertions, 0 deletions
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
new file mode 100644
index 000000000..19bbd7a70
--- /dev/null
+++ b/weed/operation/submit.go
@@ -0,0 +1,194 @@
+package operation
+
+import (
+ "bytes"
+ "io"
+ "mime"
+ "net/url"
+ "os"
+ "path"
+ "strconv"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/security"
+)
+
+type FilePart struct {
+ Reader io.Reader
+ FileName string
+ FileSize int64
+ IsGzipped bool
+ MimeType string
+ ModTime int64 //in seconds
+ Replication string
+ Collection string
+ Ttl string
+ Server string //this comes from assign result
+ Fid string //this comes from assign result, but customizable
+}
+
+type SubmitResult struct {
+ FileName string `json:"fileName,omitempty"`
+ FileUrl string `json:"fileUrl,omitempty"`
+ Fid string `json:"fid,omitempty"`
+ Size uint32 `json:"size,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+func SubmitFiles(master string, files []FilePart,
+ replication string, collection string, ttl string, maxMB int,
+ secret security.Secret,
+) ([]SubmitResult, error) {
+ results := make([]SubmitResult, len(files))
+ for index, file := range files {
+ results[index].FileName = file.FileName
+ }
+ ret, err := Assign(master, uint64(len(files)), replication, collection, ttl)
+ if err != nil {
+ for index, _ := range files {
+ results[index].Error = err.Error()
+ }
+ return results, err
+ }
+ for index, file := range files {
+ file.Fid = ret.Fid
+ if index > 0 {
+ file.Fid = file.Fid + "_" + strconv.Itoa(index)
+ }
+ file.Server = ret.Url
+ file.Replication = replication
+ file.Collection = collection
+ results[index].Size, err = file.Upload(maxMB, master, secret)
+ if err != nil {
+ results[index].Error = err.Error()
+ }
+ results[index].Fid = file.Fid
+ results[index].FileUrl = ret.PublicUrl + "/" + file.Fid
+ }
+ return results, nil
+}
+
+func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
+ ret = make([]FilePart, len(fullPathFilenames))
+ for index, file := range fullPathFilenames {
+ if ret[index], err = newFilePart(file); err != nil {
+ return
+ }
+ }
+ return
+}
+func newFilePart(fullPathFilename string) (ret FilePart, err error) {
+ fh, openErr := os.Open(fullPathFilename)
+ if openErr != nil {
+ glog.V(0).Info("Failed to open file: ", fullPathFilename)
+ return ret, openErr
+ }
+ ret.Reader = fh
+
+ if fi, fiErr := fh.Stat(); fiErr != nil {
+ glog.V(0).Info("Failed to stat file:", fullPathFilename)
+ return ret, fiErr
+ } else {
+ ret.ModTime = fi.ModTime().UTC().Unix()
+ ret.FileSize = fi.Size()
+ }
+ ext := strings.ToLower(path.Ext(fullPathFilename))
+ ret.IsGzipped = ext == ".gz"
+ if ret.IsGzipped {
+ ret.FileName = fullPathFilename[0 : len(fullPathFilename)-3]
+ }
+ ret.FileName = fullPathFilename
+ if ext != "" {
+ ret.MimeType = mime.TypeByExtension(ext)
+ }
+
+ return ret, nil
+}
+
+func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
+ jwt := security.GenJwt(secret, fi.Fid)
+ fileUrl := "http://" + fi.Server + "/" + fi.Fid
+ if fi.ModTime != 0 {
+ fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
+ }
+ if closer, ok := fi.Reader.(io.Closer); ok {
+ defer closer.Close()
+ }
+ baseName := path.Base(fi.FileName)
+ if maxMB > 0 && fi.FileSize > int64(maxMB*1024*1024) {
+ chunkSize := int64(maxMB * 1024 * 1024)
+ chunks := fi.FileSize/chunkSize + 1
+ cm := ChunkManifest{
+ Name: baseName,
+ Size: fi.FileSize,
+ Mime: fi.MimeType,
+ Chunks: make([]*ChunkInfo, 0, chunks),
+ }
+
+ for i := int64(0); i < chunks; i++ {
+ id, count, e := upload_one_chunk(
+ baseName+"-"+strconv.FormatInt(i+1, 10),
+ io.LimitReader(fi.Reader, chunkSize),
+ master, fi.Replication, fi.Collection, fi.Ttl,
+ jwt)
+ if e != nil {
+ // delete all uploaded chunks
+ cm.DeleteChunks(master)
+ return 0, e
+ }
+ cm.Chunks = append(cm.Chunks,
+ &ChunkInfo{
+ Offset: i * chunkSize,
+ Size: int64(count),
+ Fid: id,
+ },
+ )
+ retSize += count
+ }
+ err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
+ if err != nil {
+ // delete all uploaded chunks
+ cm.DeleteChunks(master)
+ }
+ } else {
+ ret, e := Upload(fileUrl, baseName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)
+ if e != nil {
+ return 0, e
+ }
+ return ret.Size, e
+ }
+ return
+}
+
+func upload_one_chunk(filename string, reader io.Reader, master,
+ replication string, collection string, ttl string, jwt security.EncodedJwt,
+) (fid string, size uint32, e error) {
+ ret, err := Assign(master, 1, replication, collection, ttl)
+ if err != nil {
+ return "", 0, err
+ }
+ fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
+ glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
+ uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
+ "application/octet-stream", jwt)
+ if uploadError != nil {
+ return fid, 0, uploadError
+ }
+ return fid, uploadResult.Size, nil
+}
+
+func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {
+ buf, e := manifest.Marshal()
+ if e != nil {
+ return e
+ }
+ bufReader := bytes.NewReader(buf)
+ glog.V(4).Info("Uploading chunks manifest ", manifest.Name, " to ", fileUrl, "...")
+ u, _ := url.Parse(fileUrl)
+ q := u.Query()
+ q.Set("cm", "true")
+ u.RawQuery = q.Encode()
+ _, e = Upload(u.String(), manifest.Name, bufReader, false, "application/json", jwt)
+ return e
+}