diff options
| author | Chris Lu <chris.lu@gmail.com> | 2018-09-17 01:37:24 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2018-09-17 01:37:24 -0700 |
| commit | 779641e9d4d9f843267d0c44f9fc5f2c201462d3 (patch) | |
| tree | 0090dee4f3f40db8f0470725ff74982119e88216 /weed/replication/replicator.go | |
| parent | b7a375f5aab78ef9a04c456ad0c46d9a5652eb88 (diff) | |
| download | seaweedfs-779641e9d4d9f843267d0c44f9fc5f2c201462d3.tar.xz seaweedfs-779641e9d4d9f843267d0c44f9fc5f2c201462d3.zip | |
adjust replicated entry name
Diffstat (limited to 'weed/replication/replicator.go')
| -rw-r--r-- | weed/replication/replicator.go | 34 |
1 files changed, 27 insertions, 7 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index c223021c2..4f5d5203e 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -4,28 +4,48 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "strings" + "github.com/chrislusf/seaweedfs/weed/glog" ) type Replicator struct { - sink *sink.FilerSink + sink sink.ReplicationSink + source *source.FilerSource } -func NewReplicator(config util.Configuration) *Replicator { +func NewReplicator(sourceConfig, sinkConfig util.Configuration) *Replicator { sink := &sink.FilerSink{} - sink.Initialize(config) + sink.Initialize(sinkConfig) + + source := &source.FilerSource{} + source.Initialize(sourceConfig) + + if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") { + fromDir := sourceConfig.GetString("directory") + toDir := sinkConfig.GetString("directory") + if strings.HasPrefix(toDir, fromDir) { + glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir) + } + } return &Replicator{ - sink: sink, + sink: sink, + source: source, } } func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) error { + if !strings.HasPrefix(key, r.source.Dir) { + return nil + } + key = r.sink.GetDirectory() + key[len(r.source.Dir):] if message.OldEntry != nil && message.NewEntry == nil { - return r.sink.DeleteEntry(message.OldEntry, message.DeleteChunks) + return r.sink.DeleteEntry(key, message.OldEntry, message.DeleteChunks) } if message.OldEntry == nil && message.NewEntry != nil { - return r.sink.CreateEntry(message.NewEntry) + return r.sink.CreateEntry(key, message.NewEntry) } - return r.sink.UpdateEntry(message.OldEntry, message.NewEntry, message.DeleteChunks) + return r.sink.UpdateEntry(key, message.OldEntry, message.NewEntry, message.DeleteChunks) } |
