diff options
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/replicator.go | 34 | ||||
| -rw-r--r-- | weed/replication/sink/filer_sink.go | 38 | ||||
| -rw-r--r-- | weed/replication/source/filer_source.go | 9 |
3 files changed, 50 insertions, 31 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index c223021c2..4f5d5203e 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -4,28 +4,48 @@ import ( "github.com/chrislusf/seaweedfs/weed/replication/sink" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "strings" + "github.com/chrislusf/seaweedfs/weed/glog" ) type Replicator struct { - sink *sink.FilerSink + sink sink.ReplicationSink + source *source.FilerSource } -func NewReplicator(config util.Configuration) *Replicator { +func NewReplicator(sourceConfig, sinkConfig util.Configuration) *Replicator { sink := &sink.FilerSink{} - sink.Initialize(config) + sink.Initialize(sinkConfig) + + source := &source.FilerSource{} + source.Initialize(sourceConfig) + + if sourceConfig.GetString("grpcAddress") == sinkConfig.GetString("grpcAddress") { + fromDir := sourceConfig.GetString("directory") + toDir := sinkConfig.GetString("directory") + if strings.HasPrefix(toDir, fromDir) { + glog.Fatalf("recursive replication! source directory %s includes the sink directory %s", fromDir, toDir) + } + } return &Replicator{ - sink: sink, + sink: sink, + source: source, } } func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) error { + if !strings.HasPrefix(key, r.source.Dir) { + return nil + } + key = r.sink.GetDirectory() + key[len(r.source.Dir):] if message.OldEntry != nil && message.NewEntry == nil { - return r.sink.DeleteEntry(message.OldEntry, message.DeleteChunks) + return r.sink.DeleteEntry(key, message.OldEntry, message.DeleteChunks) } if message.OldEntry == nil && message.NewEntry != nil { - return r.sink.CreateEntry(message.NewEntry) + return r.sink.CreateEntry(key, message.NewEntry) } - return r.sink.UpdateEntry(message.OldEntry, message.NewEntry, message.DeleteChunks) + return r.sink.UpdateEntry(key, message.OldEntry, message.NewEntry, message.DeleteChunks) } diff --git a/weed/replication/sink/filer_sink.go b/weed/replication/sink/filer_sink.go index 57ebeddff..387bffb58 100644 --- a/weed/replication/sink/filer_sink.go +++ b/weed/replication/sink/filer_sink.go @@ -12,36 +12,38 @@ import ( ) type ReplicationSink interface { - DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool) error - CreateEntry(entry *filer_pb.Entry) error - UpdateEntry(oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error + DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error + CreateEntry(key string, entry *filer_pb.Entry) error + UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error + GetDirectory() string } type FilerSink struct { grpcAddress string - id string dir string } +func (fs *FilerSink) GetDirectory() string { + return fs.dir +} + func (fs *FilerSink) Initialize(configuration util.Configuration) error { return fs.initialize( configuration.GetString("grpcAddress"), - configuration.GetString("id"), configuration.GetString("directory"), ) } -func (fs *FilerSink) initialize(grpcAddress string, id string, dir string) (err error) { +func (fs *FilerSink) initialize(grpcAddress string, dir string) (err error) { fs.grpcAddress = grpcAddress - fs.id = id fs.dir = dir return nil } -func (fs *FilerSink) DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool) error { +func (fs *FilerSink) DeleteEntry(key string, entry *filer_pb.Entry, deleteIncludeChunks bool) error { return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - dir, name := filer2.FullPath(entry.Name).DirAndName() + dir, name := filer2.FullPath(key).DirAndName() request := &filer_pb.DeleteEntryRequest{ Directory: dir, @@ -53,26 +55,26 @@ func (fs *FilerSink) DeleteEntry(entry *filer_pb.Entry, deleteIncludeChunks bool glog.V(1).Infof("delete entry: %v", request) _, err := client.DeleteEntry(context.Background(), request) if err != nil { - glog.V(0).Infof("delete entry %s: %v", entry.Name, err) - return fmt.Errorf("delete entry %s: %v", entry.Name, err) + glog.V(0).Infof("delete entry %s: %v", key, err) + return fmt.Errorf("delete entry %s: %v", key, err) } return nil }) } -func (fs *FilerSink) CreateEntry(entry *filer_pb.Entry) error { +func (fs *FilerSink) CreateEntry(key string, entry *filer_pb.Entry) error { replicatedChunks, err := replicateChunks(entry.Chunks) if err != nil { - glog.V(0).Infof("replicate entry chunks %s: %v", entry.Name, err) - return fmt.Errorf("replicate entry chunks %s: %v", entry.Name, err) + glog.V(0).Infof("replicate entry chunks %s: %v", key, err) + return fmt.Errorf("replicate entry chunks %s: %v", key, err) } return fs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - dir, name := filer2.FullPath(entry.Name).DirAndName() + dir, name := filer2.FullPath(key).DirAndName() request := &filer_pb.CreateEntryRequest{ Directory: dir, @@ -86,15 +88,15 @@ func (fs *FilerSink) CreateEntry(entry *filer_pb.Entry) error { glog.V(1).Infof("create: %v", request) if _, err := client.CreateEntry(context.Background(), request); err != nil { - glog.V(0).Infof("create entry %s: %v", entry.Name, err) - return fmt.Errorf("create entry %s: %v", entry.Name, err) + glog.V(0).Infof("create entry %s: %v", key, err) + return fmt.Errorf("create entry %s: %v", key, err) } return nil }) } -func (fs *FilerSink) UpdateEntry(oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error { +func (fs *FilerSink) UpdateEntry(key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) error { return nil } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 49c623815..c3b575b6d 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -16,22 +16,19 @@ type ReplicationSource interface { type FilerSource struct { grpcAddress string - id string - dir string + Dir string } func (fs *FilerSource) Initialize(configuration util.Configuration) error { return fs.initialize( configuration.GetString("grpcAddress"), - configuration.GetString("id"), configuration.GetString("directory"), ) } -func (fs *FilerSource) initialize(grpcAddress string, id string, dir string) (err error) { +func (fs *FilerSource) initialize(grpcAddress string, dir string) (err error) { fs.grpcAddress = grpcAddress - fs.id = id - fs.dir = dir + fs.Dir = dir return nil } |
