aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/store_replicate.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/store_replicate.go')
-rw-r--r--weed/topology/store_replicate.go182
1 files changed, 99 insertions, 83 deletions
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index d21c4d210..8c4996d45 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -1,7 +1,6 @@
package topology
import (
- "bytes"
"encoding/json"
"errors"
"fmt"
@@ -25,58 +24,60 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
//check JWT
jwt := security.GetJwt(r)
+ var remoteLocations []operation.Location
+ if r.FormValue("type") != "replicate" {
+ remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode)
+ if err != nil {
+ glog.V(0).Infoln(err)
+ return
+ }
+ }
+
size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n)
if err != nil {
err = fmt.Errorf("failed to write to local disk: %v", err)
+ glog.V(0).Infoln(err)
return
}
- needToReplicate := !s.HasVolume(volumeId)
- needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
- if !needToReplicate {
- needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
- }
- if needToReplicate { //send to other replica locations
- if r.FormValue("type") != "replicate" {
-
- if err = distributedOperation(masterNode, s, volumeId, func(location operation.Location) error {
- u := url.URL{
- Scheme: "http",
- Host: location.Url,
- Path: r.URL.Path,
- }
- q := url.Values{
- "type": {"replicate"},
- "ttl": {n.Ttl.String()},
- }
- if n.LastModified > 0 {
- q.Set("ts", strconv.FormatUint(n.LastModified, 10))
- }
- if n.IsChunkedManifest() {
- q.Set("cm", "true")
+ if len(remoteLocations) > 0 { //send to other replica locations
+ if err = distributedOperation(remoteLocations, s, func(location operation.Location) error {
+ u := url.URL{
+ Scheme: "http",
+ Host: location.Url,
+ Path: r.URL.Path,
+ }
+ q := url.Values{
+ "type": {"replicate"},
+ "ttl": {n.Ttl.String()},
+ }
+ if n.LastModified > 0 {
+ q.Set("ts", strconv.FormatUint(n.LastModified, 10))
+ }
+ if n.IsChunkedManifest() {
+ q.Set("cm", "true")
+ }
+ u.RawQuery = q.Encode()
+
+ pairMap := make(map[string]string)
+ if n.HasPairs() {
+ tmpMap := make(map[string]string)
+ err := json.Unmarshal(n.Pairs, &tmpMap)
+ if err != nil {
+ glog.V(0).Infoln("Unmarshal pairs error:", err)
}
- u.RawQuery = q.Encode()
-
- pairMap := make(map[string]string)
- if n.HasPairs() {
- tmpMap := make(map[string]string)
- err := json.Unmarshal(n.Pairs, &tmpMap)
- if err != nil {
- glog.V(0).Infoln("Unmarshal pairs error:", err)
- }
- for k, v := range tmpMap {
- pairMap[needle.PairNamePrefix+k] = v
- }
+ for k, v := range tmpMap {
+ pairMap[needle.PairNamePrefix+k] = v
}
-
- _, err := operation.Upload(u.String(),
- string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime),
- pairMap, jwt)
- return err
- }); err != nil {
- size = 0
- err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
}
+
+ // volume server do not know about encryption
+ _, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsGzipped(), string(n.Mime), pairMap, jwt)
+ return err
+ }); err != nil {
+ size = 0
+ err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
+ glog.V(0).Infoln(err)
}
}
return
@@ -84,31 +85,34 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
func ReplicatedDelete(masterNode string, store *storage.Store,
volumeId needle.VolumeId, n *needle.Needle,
- r *http.Request) (uint32, error) {
+ r *http.Request) (size uint32, err error) {
//check JWT
jwt := security.GetJwt(r)
- ret, err := store.DeleteVolumeNeedle(volumeId, n)
+ var remoteLocations []operation.Location
+ if r.FormValue("type") != "replicate" {
+ remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterNode)
+ if err != nil {
+ glog.V(0).Infoln(err)
+ return
+ }
+ }
+
+ size, err = store.DeleteVolumeNeedle(volumeId, n)
if err != nil {
glog.V(0).Infoln("delete error:", err)
- return ret, err
+ return
}
- needToReplicate := !store.HasVolume(volumeId)
- if !needToReplicate && ret > 0 {
- needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
- }
- if needToReplicate { //send to other replica locations
- if r.FormValue("type") != "replicate" {
- if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error {
- return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt))
- }); err != nil {
- ret = 0
- }
+ if len(remoteLocations) > 0 { //send to other replica locations
+ if err = distributedOperation(remoteLocations, store, func(location operation.Location) error {
+ return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", string(jwt))
+ }); err != nil {
+ size = 0
}
}
- return ret, err
+ return
}
type DistributedOperationResult map[string]error
@@ -131,32 +135,44 @@ type RemoteResult struct {
Error error
}
-func distributedOperation(masterNode string, store *storage.Store, volumeId needle.VolumeId, op func(location operation.Location) error) error {
- if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
- length := 0
- selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
- results := make(chan RemoteResult)
- for _, location := range lookupResult.Locations {
- if location.Url != selfUrl {
- length++
- go func(location operation.Location, results chan RemoteResult) {
- results <- RemoteResult{location.Url, op(location)}
- }(location, results)
+func distributedOperation(locations []operation.Location, store *storage.Store, op func(location operation.Location) error) error {
+ length := len(locations)
+ results := make(chan RemoteResult)
+ for _, location := range locations {
+ go func(location operation.Location, results chan RemoteResult) {
+ results <- RemoteResult{location.Url, op(location)}
+ }(location, results)
+ }
+ ret := DistributedOperationResult(make(map[string]error))
+ for i := 0; i < length; i++ {
+ result := <-results
+ ret[result.Host] = result.Error
+ }
+
+ return ret.Error()
+}
+
+func getWritableRemoteReplications(s *storage.Store, volumeId needle.VolumeId, masterNode string) (
+ remoteLocations []operation.Location, err error) {
+ copyCount := s.GetVolume(volumeId).ReplicaPlacement.GetCopyCount()
+ if copyCount > 1 {
+ if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
+ if len(lookupResult.Locations) < copyCount {
+ err = fmt.Errorf("replicating opetations [%d] is less than volume %d replication copy count [%d]",
+ len(lookupResult.Locations), volumeId, copyCount)
+ return
}
- }
- ret := DistributedOperationResult(make(map[string]error))
- for i := 0; i < length; i++ {
- result := <-results
- ret[result.Host] = result.Error
- }
- if volume := store.GetVolume(volumeId); volume != nil {
- if length+1 < volume.ReplicaPlacement.GetCopyCount() {
- return fmt.Errorf("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount())
+ selfUrl := s.Ip + ":" + strconv.Itoa(s.Port)
+ for _, location := range lookupResult.Locations {
+ if location.Url != selfUrl {
+ remoteLocations = append(remoteLocations, location)
+ }
}
+ } else {
+ err = fmt.Errorf("failed to lookup for %d: %v", volumeId, lookupErr)
+ return
}
- return ret.Error()
- } else {
- glog.V(0).Infoln()
- return fmt.Errorf("Failed to lookup for %d: %v", volumeId, lookupErr)
}
+
+ return
}