diff options
| author | Chris Lu <chris.lu@gmail.com> | 2013-04-16 00:10:21 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2013-04-16 00:10:21 -0700 |
| commit | 915b16f97a38affeaec6ced9860e1d30a903c047 (patch) | |
| tree | ff22485887c052e26f7d380435211ea17789c7af /go/weed/volume.go | |
| parent | e4da140d0a565bda12ac4edc6786765e65343494 (diff) | |
| download | seaweedfs-915b16f97a38affeaec6ced9860e1d30a903c047.tar.xz seaweedfs-915b16f97a38affeaec6ced9860e1d30a903c047.zip | |
refactoring, same logic, but the store replication logic is moved to a
stand-alone file, for later easier improvements
Diffstat (limited to 'go/weed/volume.go')
| -rw-r--r-- | go/weed/volume.go | 81 |
1 files changed, 4 insertions, 77 deletions
diff --git a/go/weed/volume.go b/go/weed/volume.go index 4e26052ce..33121388e 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -1,8 +1,8 @@ package main import ( - "bytes" "code.google.com/p/weed-fs/go/operation" + "code.google.com/p/weed-fs/go/replication" "code.google.com/p/weed-fs/go/storage" "log" "math/rand" @@ -187,46 +187,15 @@ func PostHandler(w http.ResponseWriter, r *http.Request) { if e != nil { writeJsonQuiet(w, r, e) } else { - needle, filename, ne := storage.NewNeedle(r) + needle, ne := storage.NewNeedle(r) if ne != nil { writeJsonQuiet(w, r, ne) } else { - ret, err := store.Write(volumeId, needle) - errorStatus := "" - needToReplicate := !store.HasVolume(volumeId) - if err != nil { - errorStatus = "Failed to write to local disk (" + err.Error() + ")" - } else if ret > 0 { - needToReplicate = needToReplicate || store.GetVolume(volumeId).NeedToReplicate() - } else { - errorStatus = "Failed to write to local disk" - } - if !needToReplicate && ret > 0 { - needToReplicate = store.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "standard" { - if !distributedOperation(volumeId, func(location operation.Location) bool { - _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data)) - return err == nil - }) { - ret = 0 - errorStatus = "Failed to write to replicas for volume " + volumeId.String() - } - } - } + ret, errorStatus := replication.ReplicatedWrite(*masterNode, store, volumeId, needle, r) m := make(map[string]interface{}) if errorStatus == "" { w.WriteHeader(http.StatusCreated) } else { - if _, e = store.Delete(volumeId, needle); e != nil { - errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " + - strconv.FormatUint(uint64(volumeId), 10) + ": " + e.Error() - } else { - distributedOperation(volumeId, func(location operation.Location) bool { - return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") - }) - } w.WriteHeader(http.StatusInternalServerError) m["error"] = errorStatus } @@ -259,25 +228,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { } n.Size = 0 - ret, err := store.Delete(volumeId, n) - if err != nil { - log.Println("delete error:", err) - return - } - - needToReplicate := !store.HasVolume(volumeId) - if !needToReplicate && ret > 0 { - needToReplicate = store.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "standard" { - if !distributedOperation(volumeId, func(location operation.Location) bool { - return nil == operation.Delete("http://"+location.Url+r.URL.Path+"?type=standard") - }) { - ret = 0 - } - } - } + ret := replication.ReplicatedDelete(*masterNode, store, volumeId, n, r) if ret != 0 { w.WriteHeader(http.StatusAccepted) @@ -311,30 +262,6 @@ func parseURLPath(path string) (vid, fid, ext string) { return } -func distributedOperation(volumeId storage.VolumeId, op func(location operation.Location) bool) bool { - if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil { - length := 0 - selfUrl := (*ip + ":" + strconv.Itoa(*vport)) - results := make(chan bool) - for _, location := range lookupResult.Locations { - if location.Url != selfUrl { - length++ - go func(location operation.Location, results chan bool) { - results <- op(location) - }(location, results) - } - } - ret := true - for i := 0; i < length; i++ { - ret = ret && <-results - } - return ret - } else { - log.Println("Failed to lookup for", volumeId, lookupErr.Error()) - } - return false -} - func runVolume(cmd *Command, args []string) bool { if *vMaxCpu < 1 { *vMaxCpu = runtime.NumCPU() |
