diff options
| author | Chris Lu <chris.lu@gmail.com> | 2014-04-13 01:29:52 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2014-04-13 01:29:52 -0700 |
| commit | f7f582ec8698dc43f1a2289dbd06fe0cade7468f (patch) | |
| tree | 1b788ffd9b33ef6807e6aaea3bc24b08cbf10fa8 /go/replication/store_replicate.go | |
| parent | 008aee0dc1932f75c86e52893044d9cd953ef405 (diff) | |
| download | seaweedfs-f7f582ec8698dc43f1a2289dbd06fe0cade7468f.tar.xz seaweedfs-f7f582ec8698dc43f1a2289dbd06fe0cade7468f.zip | |
1. refactoring, merge "replication" logic into "topology" package
2. when growing volumes, additional preferred "rack" and "dataNode"
paraemters are also provided. Previously only "dataCenter" paraemter is
provided.
Diffstat (limited to 'go/replication/store_replicate.go')
| -rw-r--r-- | go/replication/store_replicate.go | 96 |
1 files changed, 0 insertions, 96 deletions
diff --git a/go/replication/store_replicate.go b/go/replication/store_replicate.go deleted file mode 100644 index 249e7e3e6..000000000 --- a/go/replication/store_replicate.go +++ /dev/null @@ -1,96 +0,0 @@ -package replication - -import ( - "bytes" - "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/util" - "net/http" - "strconv" -) - -func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) { - ret, err := s.Write(volumeId, needle) - needToReplicate := !s.HasVolume(volumeId) - if err != nil { - errorStatus = "Failed to write to local disk (" + err.Error() + ")" - } else if ret > 0 { - needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate() - } else { - errorStatus = "Failed to write to local disk" - } - if !needToReplicate && ret > 0 { - needToReplicate = s.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "replicate" { - if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { - _, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime)) - return err == nil - }) { - ret = 0 - errorStatus = "Failed to write to replicas for volume " + volumeId.String() - } - } - } - if errorStatus != "" { - if _, err = s.Delete(volumeId, needle); err != nil { - errorStatus += "\nCannot delete " + strconv.FormatUint(needle.Id, 10) + " from " + - volumeId.String() + ": " + err.Error() - } else { - distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool { - return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate") - }) - } - } - size = ret - return -} - -func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId, n *storage.Needle, r *http.Request) (ret uint32) { - ret, err := store.Delete(volumeId, n) - if err != nil { - glog.V(0).Infoln("delete error:", err) - return - } - - needToReplicate := !store.HasVolume(volumeId) - if !needToReplicate && ret > 0 { - needToReplicate = store.GetVolume(volumeId).NeedToReplicate() - } - if needToReplicate { //send to other replica locations - if r.FormValue("type") != "replicate" { - if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool { - return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate") - }) { - ret = 0 - } - } - } - return -} - -func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool { - if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { - length := 0 - selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) - results := make(chan bool) - for _, location := range lookupResult.Locations { - if location.Url != selfUrl { - length++ - go func(location operation.Location, results chan bool) { - results <- op(location) - }(location, results) - } - } - ret := true - for i := 0; i < length; i++ { - ret = ret && <-results - } - return ret - } else { - glog.V(0).Infoln("Failed to lookup for", volumeId, lookupErr.Error()) - } - return false -} |
