aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed-fs/src/cmd/weed/upload.go5
-rw-r--r--weed-fs/src/cmd/weed/volume.go68
-rw-r--r--weed-fs/src/pkg/operation/delete_content.go2
-rw-r--r--weed-fs/src/pkg/operation/upload_content.go8
4 files changed, 54 insertions, 29 deletions
diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go
index faed646f1..89e0c34fc 100644
--- a/weed-fs/src/cmd/weed/upload.go
+++ b/weed-fs/src/cmd/weed/upload.go
@@ -44,7 +44,7 @@ func assign(count int) (*AssignResult, error) {
values.Add("replication", *uploadReplication)
jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values)
if *IsDebug {
- fmt.Println("debug", *IsDebug, "assign result :", string(jsonBlob))
+ fmt.Println("assign result :", string(jsonBlob))
}
if err != nil {
return nil, err
@@ -83,7 +83,8 @@ type SubmitResult struct {
func submit(files []string) []SubmitResult {
ret, err := assign(len(files))
if err != nil {
- panic(err)
+ fmt.Println(err)
+ return nil
}
results := make([]SubmitResult, len(files))
for index, file := range files {
diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go
index fb3e66c09..ebe67b0b9 100644
--- a/weed-fs/src/cmd/weed/volume.go
+++ b/weed-fs/src/cmd/weed/volume.go
@@ -135,13 +135,20 @@ func PostHandler(w http.ResponseWriter, r *http.Request) {
ret := store.Write(volumeId, needle)
if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations
if r.FormValue("type") != "standard" {
- waitTime, err := strconv.Atoi(r.FormValue("wait"))
- distributedOperation(volumeId, func(location operation.Location) bool {
- operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
- return true
- }, err == nil && waitTime > 0)
+ if distributedOperation(volumeId, func(location operation.Location) bool {
+ _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
+ return err == nil
+ }) {
+ w.WriteHeader(http.StatusCreated)
+ } else {
+ ret = 0
+ w.WriteHeader(http.StatusInternalServerError)
+ }
+ } else {
+ w.WriteHeader(http.StatusCreated)
}
- w.WriteHeader(http.StatusCreated)
+ } else {
+ w.WriteHeader(http.StatusInternalServerError)
}
m := make(map[string]uint32)
m["size"] = ret
@@ -179,13 +186,19 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) {
if ret > 0 || !store.HasVolume(volumeId) { //send to other replica locations
if r.FormValue("type") != "standard" {
- waitTime, err := strconv.Atoi(r.FormValue("wait"))
- distributedOperation(volumeId, func(location operation.Location) bool {
- operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
- return true
- }, err == nil && waitTime > 0)
+ if distributedOperation(volumeId, func(location operation.Location) bool {
+ return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard")
+ }) {
+ w.WriteHeader(http.StatusCreated)
+ } else {
+ ret = 0
+ w.WriteHeader(http.StatusInternalServerError)
+ }
+ } else {
+ w.WriteHeader(http.StatusCreated)
}
- w.WriteHeader(http.StatusCreated)
+ } else {
+ w.WriteHeader(http.StatusInternalServerError)
}
m := make(map[string]uint32)
@@ -215,23 +228,32 @@ func parseURLPath(path string) (vid, fid, ext string) {
type distributedFunction func(location operation.Location) bool
-func distributedOperation(volumeId storage.VolumeId, op distributedFunction, wait bool) {
+func distributedOperation(volumeId storage.VolumeId, op distributedFunction) bool {
if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil {
- sendFunc := func(background bool) {
- for _, location := range lookupResult.Locations {
- if location.Url != (*ip + ":" + strconv.Itoa(*vport)) {
- if background {
- go op(location)
- } else {
- op(location)
- }
- }
+ length := 0
+ sem := make(chan int, len(lookupResult.Locations))
+ selfUrl := (*ip + ":" + strconv.Itoa(*vport))
+ results := make(chan bool)
+ for _, location := range lookupResult.Locations {
+ if location.Url != selfUrl {
+ sem <- 1
+ length++
+ go func(op distributedFunction, location operation.Location, sem chan int, results chan bool) {
+ ret := op(location)
+ <-sem
+ results <- ret
+ }(op, location, sem, results)
}
}
- sendFunc(wait)
+ ret := true
+ for i := 0; i < length; i++ {
+ ret = ret && <-results
+ }
+ return ret
} else {
log.Println("Failed to lookup for", volumeId, lookupErr.Error())
}
+ return false
}
func runVolume(cmd *Command, args []string) bool {
diff --git a/weed-fs/src/pkg/operation/delete_content.go b/weed-fs/src/pkg/operation/delete_content.go
index 66a61f55c..aeab9c3ac 100644
--- a/weed-fs/src/pkg/operation/delete_content.go
+++ b/weed-fs/src/pkg/operation/delete_content.go
@@ -2,11 +2,13 @@ package operation
import (
"net/http"
+ "log"
)
func Delete(url string) error {
req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
+ log.Println("failing to delete", url)
return err
}
_, err = http.DefaultClient.Do(req)
diff --git a/weed-fs/src/pkg/operation/upload_content.go b/weed-fs/src/pkg/operation/upload_content.go
index ce2e5af68..652cbe71b 100644
--- a/weed-fs/src/pkg/operation/upload_content.go
+++ b/weed-fs/src/pkg/operation/upload_content.go
@@ -6,6 +6,7 @@ import (
_ "fmt"
"io"
"io/ioutil"
+ "log"
"mime/multipart"
"net/http"
)
@@ -23,7 +24,7 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult,
body_writer.Close()
resp, err := http.Post(uploadUrl, content_type, body_buf)
if err != nil {
- println("uploading to", uploadUrl)
+ log.Println("failing to upload to", uploadUrl)
return nil, err
}
defer resp.Body.Close()
@@ -34,9 +35,8 @@ func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult,
var ret UploadResult
err = json.Unmarshal(resp_body, &ret)
if err != nil {
- println("upload response to", uploadUrl, resp_body)
- panic(err.Error())
+ log.Println("failing to read upload resonse", uploadUrl, resp_body)
+ return nil, err
}
- //fmt.Println("Uploaded " + strconv.Itoa(ret.Size) + " Bytes to " + uploadUrl)
return &ret, nil
}