diff options
Diffstat (limited to 'weed/topology/store_replicate.go')
| -rw-r--r-- | weed/topology/store_replicate.go | 109 |
1 files changed, 105 insertions, 4 deletions
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go index b195b48ed..78a68d7c8 100644 --- a/weed/topology/store_replicate.go +++ b/weed/topology/store_replicate.go @@ -10,6 +10,8 @@ import ( "strconv" "strings" + "github.com/valyala/fasthttp" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/security" @@ -18,12 +20,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func ReplicatedWrite(masterNode string, s *storage.Store, +func OldReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (size uint32, isUnchanged bool, err error) { //check JWT - jwt := security.GetJwt(r) + jwt := security.OldGetJwt(r) var remoteLocations []operation.Location if r.FormValue("type") != "replicate" { @@ -85,12 +87,79 @@ func ReplicatedWrite(masterNode string, s *storage.Store, return } -func ReplicatedDelete(masterNode string, store *storage.Store, +func ReplicatedWrite(masterNode string, s *storage.Store, + volumeId needle.VolumeId, n *needle.Needle, + ctx *fasthttp.RequestCtx) (size uint32, isUnchanged bool, err error) { + + //check JWT + jwt := security.GetJwt(ctx) + + var remoteLocations []operation.Location + if string(ctx.FormValue("type")) != "replicate" { + remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode) + if err != nil { + glog.V(0).Infoln(err) + return + } + } + + size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n) + if err != nil { + err = fmt.Errorf("failed to write to local disk: %v", err) + glog.V(0).Infoln(err) + return + } + + if len(remoteLocations) > 0 { //send to other replica locations + if err = distributedOperation(remoteLocations, s, func(location operation.Location) error { + u := url.URL{ + Scheme: "http", + Host: location.Url, + Path: string(ctx.Path()), + } + q := url.Values{ + "type": {"replicate"}, + "ttl": {n.Ttl.String()}, + } + if n.LastModified > 0 { + q.Set("ts", strconv.FormatUint(n.LastModified, 10)) + } + if n.IsChunkedManifest() { + q.Set("cm", "true") + } + u.RawQuery = q.Encode() + + pairMap := make(map[string]string) + if n.HasPairs() { + tmpMap := make(map[string]string) + err := json.Unmarshal(n.Pairs, &tmpMap) + if err != nil { + glog.V(0).Infoln("Unmarshal pairs error:", err) + } + for k, v := range tmpMap { + pairMap[needle.PairNamePrefix+k] = v + } + } + + _, err := operation.Upload(u.String(), + string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime), + pairMap, jwt) + return err + }); err != nil { + size = 0 + err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err) + glog.V(0).Infoln(err) + } + } + return +} + +func OldReplicatedDelete(masterNode string, store *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (size uint32, err error) { //check JWT - jwt := security.GetJwt(r) + jwt := security.OldGetJwt(r) var remoteLocations []operation.Location if r.FormValue("type") != "replicate" { @@ -117,6 +186,38 @@ func ReplicatedDelete(masterNode string, store *storage.Store, return } +func ReplicatedDelete(masterNode string, store *storage.Store, + volumeId needle.VolumeId, n *needle.Needle, + ctx *fasthttp.RequestCtx) (size uint32, err error) { + + //check JWT + jwt := security.GetJwt(ctx) + + var remoteLocations []operation.Location + if string(ctx.FormValue("type")) != "replicate" { + remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterNode) + if err != nil { + glog.V(0).Infoln(err) + return + } + } + + size, err = store.DeleteVolumeNeedle(volumeId, n) + if err != nil { + glog.V(0).Infoln("delete error:", err) + return + } + + if len(remoteLocations) > 0 { //send to other replica locations + if err = distributedOperation(remoteLocations, store, func(location operation.Location) error { + return util.Delete("http://"+location.Url+string(ctx.Path())+"?type=replicate", string(jwt)) + }); err != nil { + size = 0 + } + } + return +} + type DistributedOperationResult map[string]error func (dr DistributedOperationResult) Error() error { |
