diff options
| author | sparklxb <sparklxb@163.com> | 2017-01-09 23:34:07 +0800 |
|---|---|---|
| committer | sparklxb <sparklxb@163.com> | 2017-01-09 23:34:47 +0800 |
| commit | 66e7013dfe1f14f6436c07a0ccf5aaa19ea599f7 (patch) | |
| tree | 0e25010198857b9ef164a6c538acb5063b74a38f /weed/operation/submit.go | |
| parent | 53cf1b4900630883ef38a95324cda29f50e75b8d (diff) | |
| download | seaweedfs-66e7013dfe1f14f6436c07a0ccf5aaa19ea599f7.tar.xz seaweedfs-66e7013dfe1f14f6436c07a0ccf5aaa19ea599f7.zip | |
suport uploading files to specific dataCenter
Diffstat (limited to 'weed/operation/submit.go')
| -rw-r--r-- | weed/operation/submit.go | 63 |
1 files changed, 45 insertions, 18 deletions
diff --git a/weed/operation/submit.go b/weed/operation/submit.go index 75d5afbde..349cddfce 100644 --- a/weed/operation/submit.go +++ b/weed/operation/submit.go @@ -23,6 +23,7 @@ type FilePart struct { ModTime int64 //in seconds Replication string Collection string + DataCenter string Ttl string Server string //this comes from assign result Fid string //this comes from assign result, but customizable @@ -37,7 +38,7 @@ type SubmitResult struct { } func SubmitFiles(master string, files []FilePart, - replication string, collection string, ttl string, maxMB int, + replication string, collection string, dataCenter string, ttl string, maxMB int, secret security.Secret, ) ([]SubmitResult, error) { results := make([]SubmitResult, len(files)) @@ -48,6 +49,7 @@ func SubmitFiles(master string, files []FilePart, Count: uint64(len(files)), Replication: replication, Collection: collection, + DataCenter: dataCenter, Ttl: ttl, } ret, err := Assign(master, ar) @@ -65,6 +67,7 @@ func SubmitFiles(master string, files []FilePart, file.Server = ret.Url file.Replication = replication file.Collection = collection + file.DataCenter = dataCenter results[index].Size, err = file.Upload(maxMB, master, secret) if err != nil { results[index].Error = err.Error() @@ -129,11 +132,46 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret Chunks: make([]*ChunkInfo, 0, chunks), } + var ret *AssignResult + var id string + if fi.DataCenter != "" { + ar := &VolumeAssignRequest{ + Count: uint64(chunks), + Replication: fi.Replication, + Collection: fi.Collection, + Ttl: fi.Ttl, + } + ret, err = Assign(master, ar) + if err != nil { + return + } + } for i := int64(0); i < chunks; i++ { - id, count, e := upload_one_chunk( + if fi.DataCenter == "" { + ar := &VolumeAssignRequest{ + Count: 1, + Replication: fi.Replication, + Collection: fi.Collection, + Ttl: fi.Ttl, + } + ret, err = Assign(master, ar) + if err != nil { + // delete all uploaded chunks + cm.DeleteChunks(master) + return + } + id = ret.Fid + } else { + id = ret.Fid + if i > 0 { + id += "_" + strconv.FormatInt(i, 10) + } + } + fileUrl := "http://" + ret.Url + "/" + id + count, e := upload_one_chunk( baseName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), - master, fi.Replication, fi.Collection, fi.Ttl, + master, fileUrl, jwt) if e != nil { // delete all uploaded chunks @@ -165,26 +203,15 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret } func upload_one_chunk(filename string, reader io.Reader, master, - replication string, collection string, ttl string, jwt security.EncodedJwt, -) (fid string, size uint32, e error) { - ar := &VolumeAssignRequest{ - Count: 1, - Replication: replication, - Collection: collection, - Ttl: ttl, - } - ret, err := Assign(master, ar) - if err != nil { - return "", 0, err - } - fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid + fileUrl string, jwt security.EncodedJwt, +) (size uint32, e error) { glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...") uploadResult, uploadError := Upload(fileUrl, filename, reader, false, "application/octet-stream", nil, jwt) if uploadError != nil { - return fid, 0, uploadError + return 0, uploadError } - return fid, uploadResult.Size, nil + return uploadResult.Size, nil } func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error { |
