aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/store_replicate.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-02-10 20:23:04 -0800
committerChris Lu <chris.lu@gmail.com>2020-02-10 20:23:04 -0800
commitac3fc922566c42fb2d1d5603e7b0c167b868fce7 (patch)
tree3451368af70559711e435ba3dbb92c3752ff919c /weed/topology/store_replicate.go
parent29945fad51320deb7c72f57d1c7a84bcc51429da (diff)
downloadseaweedfs-origin/fasthttp.tar.xz
seaweedfs-origin/fasthttp.zip
partially doneorigin/fasthttp
Diffstat (limited to 'weed/topology/store_replicate.go')
-rw-r--r--weed/topology/store_replicate.go109
1 files changed, 105 insertions, 4 deletions
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index b195b48ed..78a68d7c8 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -10,6 +10,8 @@ import (
"strconv"
"strings"
+ "github.com/valyala/fasthttp"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
@@ -18,12 +20,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func ReplicatedWrite(masterNode string, s *storage.Store,
+func OldReplicatedWrite(masterNode string, s *storage.Store,
volumeId needle.VolumeId, n *needle.Needle,
r *http.Request) (size uint32, isUnchanged bool, err error) {
//check JWT
- jwt := security.GetJwt(r)
+ jwt := security.OldGetJwt(r)
var remoteLocations []operation.Location
if r.FormValue("type") != "replicate" {
@@ -85,12 +87,79 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
return
}
-func ReplicatedDelete(masterNode string, store *storage.Store,
+func ReplicatedWrite(masterNode string, s *storage.Store,
+ volumeId needle.VolumeId, n *needle.Needle,
+ ctx *fasthttp.RequestCtx) (size uint32, isUnchanged bool, err error) {
+
+ //check JWT
+ jwt := security.GetJwt(ctx)
+
+ var remoteLocations []operation.Location
+ if string(ctx.FormValue("type")) != "replicate" {
+ remoteLocations, err = getWritableRemoteReplications(s, volumeId, masterNode)
+ if err != nil {
+ glog.V(0).Infoln(err)
+ return
+ }
+ }
+
+ size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n)
+ if err != nil {
+ err = fmt.Errorf("failed to write to local disk: %v", err)
+ glog.V(0).Infoln(err)
+ return
+ }
+
+ if len(remoteLocations) > 0 { //send to other replica locations
+ if err = distributedOperation(remoteLocations, s, func(location operation.Location) error {
+ u := url.URL{
+ Scheme: "http",
+ Host: location.Url,
+ Path: string(ctx.Path()),
+ }
+ q := url.Values{
+ "type": {"replicate"},
+ "ttl": {n.Ttl.String()},
+ }
+ if n.LastModified > 0 {
+ q.Set("ts", strconv.FormatUint(n.LastModified, 10))
+ }
+ if n.IsChunkedManifest() {
+ q.Set("cm", "true")
+ }
+ u.RawQuery = q.Encode()
+
+ pairMap := make(map[string]string)
+ if n.HasPairs() {
+ tmpMap := make(map[string]string)
+ err := json.Unmarshal(n.Pairs, &tmpMap)
+ if err != nil {
+ glog.V(0).Infoln("Unmarshal pairs error:", err)
+ }
+ for k, v := range tmpMap {
+ pairMap[needle.PairNamePrefix+k] = v
+ }
+ }
+
+ _, err := operation.Upload(u.String(),
+ string(n.Name), bytes.NewReader(n.Data), n.IsGzipped(), string(n.Mime),
+ pairMap, jwt)
+ return err
+ }); err != nil {
+ size = 0
+ err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
+ glog.V(0).Infoln(err)
+ }
+ }
+ return
+}
+
+func OldReplicatedDelete(masterNode string, store *storage.Store,
volumeId needle.VolumeId, n *needle.Needle,
r *http.Request) (size uint32, err error) {
//check JWT
- jwt := security.GetJwt(r)
+ jwt := security.OldGetJwt(r)
var remoteLocations []operation.Location
if r.FormValue("type") != "replicate" {
@@ -117,6 +186,38 @@ func ReplicatedDelete(masterNode string, store *storage.Store,
return
}
+func ReplicatedDelete(masterNode string, store *storage.Store,
+ volumeId needle.VolumeId, n *needle.Needle,
+ ctx *fasthttp.RequestCtx) (size uint32, err error) {
+
+ //check JWT
+ jwt := security.GetJwt(ctx)
+
+ var remoteLocations []operation.Location
+ if string(ctx.FormValue("type")) != "replicate" {
+ remoteLocations, err = getWritableRemoteReplications(store, volumeId, masterNode)
+ if err != nil {
+ glog.V(0).Infoln(err)
+ return
+ }
+ }
+
+ size, err = store.DeleteVolumeNeedle(volumeId, n)
+ if err != nil {
+ glog.V(0).Infoln("delete error:", err)
+ return
+ }
+
+ if len(remoteLocations) > 0 { //send to other replica locations
+ if err = distributedOperation(remoteLocations, store, func(location operation.Location) error {
+ return util.Delete("http://"+location.Url+string(ctx.Path())+"?type=replicate", string(jwt))
+ }); err != nil {
+ size = 0
+ }
+ }
+ return
+}
+
type DistributedOperationResult map[string]error
func (dr DistributedOperationResult) Error() error {