aboutsummaryrefslogtreecommitdiff
path: root/weed/replication/replicator.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-09-17 01:37:24 -0700
committerChris Lu <chris.lu@gmail.com>2018-09-17 01:37:24 -0700
commit779641e9d4d9f843267d0c44f9fc5f2c201462d3 (patch)
tree0090dee4f3f40db8f0470725ff74982119e88216 /weed/replication/replicator.go
parentb7a375f5aab78ef9a04c456ad0c46d9a5652eb88 (diff)
downloadseaweedfs-779641e9d4d9f843267d0c44f9fc5f2c201462d3.tar.xz
seaweedfs-779641e9d4d9f843267d0c44f9fc5f2c201462d3.zip
adjust replicated entry name
Diffstat (limited to 'weed/replication/replicator.go')
-rw-r--r--weed/replication/replicator.go34
1 files changed, 27 insertions, 7 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go
index c223021c2..4f5d5203e 100644
--- a/weed/replication/replicator.go
+++ b/weed/replication/replicator.go
@@ -4,28 +4,48 @@ import (
"github.com/chrislusf/seaweedfs/weed/replication/sink"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/replication/source"
+ "strings"
+ "github.com/chrislusf/seaweedfs/weed/glog"
)
type Replicator struct {
- sink *sink.FilerSink
+ sink sink.ReplicationSink
+ source *source.FilerSource
}
-func NewReplicator(config util.Configuration) *Replicator {
+func NewReplicator(sourceConfig, sinkConfig util.Configuration) *Replicator {
sink := &sink.FilerSink{}
- sink.Initialize(config)
+ sink.Initialize(sinkConfig)
+
+ source := &source.FilerSource{}
+ source.Initialize(sourceConfig)
+
+ if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") {
+ fromDir := sourceConfig.GetString("directory")
+ toDir := sinkConfig.GetString("directory")
+ if strings.HasPrefix(toDir, fromDir) {
+ glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir)
+ }
+ }
return &Replicator{
- sink: sink,
+ sink: sink,
+ source: source,
}
}
func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) error {
+ if !strings.HasPrefix(key, r.source.Dir) {
+ return nil
+ }
+ key = r.sink.GetDirectory() + key[len(r.source.Dir):]
if message.OldEntry != nil && message.NewEntry == nil {
- return r.sink.DeleteEntry(message.OldEntry, message.DeleteChunks)
+ return r.sink.DeleteEntry(key, message.OldEntry, message.DeleteChunks)
}
if message.OldEntry == nil && message.NewEntry != nil {
- return r.sink.CreateEntry(message.NewEntry)
+ return r.sink.CreateEntry(key, message.NewEntry)
}
- return r.sink.UpdateEntry(message.OldEntry, message.NewEntry, message.DeleteChunks)
+ return r.sink.UpdateEntry(key, message.OldEntry, message.NewEntry, message.DeleteChunks)
}