aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2017-01-09 10:07:07 -0800
committerGitHub <noreply@github.com>2017-01-09 10:07:07 -0800
commit9fa648e570b6c164f670c2360a45e97ee047df5d (patch)
tree6423b506709c743004313a45efd2587f63df38f3
parent07a51815e9f2aeb8d196be5136323a77bb24acc7 (diff)
parent66e7013dfe1f14f6436c07a0ccf5aaa19ea599f7 (diff)
downloadseaweedfs-9fa648e570b6c164f670c2360a45e97ee047df5d.tar.xz
seaweedfs-9fa648e570b6c164f670c2360a45e97ee047df5d.zip
Merge pull request #435 from sparklxb/master
support uploading files to specific dataCenter
-rw-r--r--weed/command/filer_copy.go2
-rw-r--r--weed/command/upload.go6
-rw-r--r--weed/operation/submit.go63
3 files changed, 50 insertions, 21 deletions
diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go
index 2aa994f6f..da7fb43bb 100644
--- a/weed/command/filer_copy.go
+++ b/weed/command/filer_copy.go
@@ -126,7 +126,7 @@ func doEachCopy(fileOrDir string, host string, path string) bool {
}
results, err := operation.SubmitFiles(*copy.master, parts,
- *copy.replication, *copy.collection,
+ *copy.replication, *copy.collection, "",
*copy.ttl, *copy.maxMB, copy.secret)
if err != nil {
fmt.Printf("Failed to submit file %s: %v", fileOrDir, err)
diff --git a/weed/command/upload.go b/weed/command/upload.go
index d7a468610..72ef0af73 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -20,6 +20,7 @@ type UploadOptions struct {
include *string
replication *string
collection *string
+ dataCenter *string
ttl *string
maxMB *int
secretKey *string
@@ -33,6 +34,7 @@ func init() {
upload.include = cmdUpload.Flag.String("include", "", "pattens of files to upload, e.g., *.pdf, *.html, ab?d.txt, works together with -dir")
upload.replication = cmdUpload.Flag.String("replication", "", "replication type")
upload.collection = cmdUpload.Flag.String("collection", "", "optional collection name")
+ upload.dataCenter = cmdUpload.Flag.String("dataCenter", "", "optional data center name")
upload.ttl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
upload.maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
upload.secretKey = cmdUpload.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
@@ -80,7 +82,7 @@ func runUpload(cmd *Command, args []string) bool {
return e
}
results, e := operation.SubmitFiles(*upload.master, parts,
- *upload.replication, *upload.collection,
+ *upload.replication, *upload.collection, *upload.dataCenter,
*upload.ttl, *upload.maxMB, secret)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
@@ -99,7 +101,7 @@ func runUpload(cmd *Command, args []string) bool {
fmt.Println(e.Error())
}
results, _ := operation.SubmitFiles(*upload.master, parts,
- *upload.replication, *upload.collection,
+ *upload.replication, *upload.collection, *upload.dataCenter,
*upload.ttl, *upload.maxMB, secret)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 75d5afbde..349cddfce 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -23,6 +23,7 @@ type FilePart struct {
ModTime int64 //in seconds
Replication string
Collection string
+ DataCenter string
Ttl string
Server string //this comes from assign result
Fid string //this comes from assign result, but customizable
@@ -37,7 +38,7 @@ type SubmitResult struct {
}
func SubmitFiles(master string, files []FilePart,
- replication string, collection string, ttl string, maxMB int,
+ replication string, collection string, dataCenter string, ttl string, maxMB int,
secret security.Secret,
) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
@@ -48,6 +49,7 @@ func SubmitFiles(master string, files []FilePart,
Count: uint64(len(files)),
Replication: replication,
Collection: collection,
+ DataCenter: dataCenter,
Ttl: ttl,
}
ret, err := Assign(master, ar)
@@ -65,6 +67,7 @@ func SubmitFiles(master string, files []FilePart,
file.Server = ret.Url
file.Replication = replication
file.Collection = collection
+ file.DataCenter = dataCenter
results[index].Size, err = file.Upload(maxMB, master, secret)
if err != nil {
results[index].Error = err.Error()
@@ -129,11 +132,46 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
Chunks: make([]*ChunkInfo, 0, chunks),
}
+ var ret *AssignResult
+ var id string
+ if fi.DataCenter != "" {
+ ar := &VolumeAssignRequest{
+ Count: uint64(chunks),
+ Replication: fi.Replication,
+ Collection: fi.Collection,
+ Ttl: fi.Ttl,
+ }
+ ret, err = Assign(master, ar)
+ if err != nil {
+ return
+ }
+ }
for i := int64(0); i < chunks; i++ {
- id, count, e := upload_one_chunk(
+ if fi.DataCenter == "" {
+ ar := &VolumeAssignRequest{
+ Count: 1,
+ Replication: fi.Replication,
+ Collection: fi.Collection,
+ Ttl: fi.Ttl,
+ }
+ ret, err = Assign(master, ar)
+ if err != nil {
+ // delete all uploaded chunks
+ cm.DeleteChunks(master)
+ return
+ }
+ id = ret.Fid
+ } else {
+ id = ret.Fid
+ if i > 0 {
+ id += "_" + strconv.FormatInt(i, 10)
+ }
+ }
+ fileUrl := "http://" + ret.Url + "/" + id
+ count, e := upload_one_chunk(
baseName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(fi.Reader, chunkSize),
- master, fi.Replication, fi.Collection, fi.Ttl,
+ master, fileUrl,
jwt)
if e != nil {
// delete all uploaded chunks
@@ -165,26 +203,15 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
}
func upload_one_chunk(filename string, reader io.Reader, master,
- replication string, collection string, ttl string, jwt security.EncodedJwt,
-) (fid string, size uint32, e error) {
- ar := &VolumeAssignRequest{
- Count: 1,
- Replication: replication,
- Collection: collection,
- Ttl: ttl,
- }
- ret, err := Assign(master, ar)
- if err != nil {
- return "", 0, err
- }
- fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
+ fileUrl string, jwt security.EncodedJwt,
+) (size uint32, e error) {
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
"application/octet-stream", nil, jwt)
if uploadError != nil {
- return fid, 0, uploadError
+ return 0, uploadError
}
- return fid, uploadResult.Size, nil
+ return uploadResult.Size, nil
}
func upload_chunked_file_manifest(fileUrl string, manifest *ChunkManifest, jwt security.EncodedJwt) error {