diff options
Diffstat (limited to 'weed/replication/replicator.go')
| -rw-r--r-- | weed/replication/replicator.go | 29 |
1 files changed, 11 insertions, 18 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 834da6217..d5a57ecac 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -6,8 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/replication/sink" - "github.com/chrislusf/seaweedfs/weed/replication/sink/filersink" - "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/glog" ) @@ -17,18 +16,15 @@ type Replicator struct { source *source.FilerSource } -func NewReplicator(sourceConfig, sinkConfig util.Configuration) *Replicator { - - sink := &filersink.FilerSink{} - sink.Initialize(sinkConfig) +func NewReplicator(sourceConfig util.Configuration, dataSink sink.ReplicationSink) *Replicator { source := &source.FilerSource{} source.Initialize(sourceConfig) - sink.SetSourceFiler(source) + dataSink.SetSourceFiler(source) return &Replicator{ - sink: sink, + sink: dataSink, source: source, } } @@ -39,23 +35,20 @@ func (r *Replicator) Replicate(key string, message *filer_pb.EventNotification) } key = filepath.Join(r.sink.GetSinkToDirectory(), key[len(r.source.Dir):]) if message.OldEntry != nil && message.NewEntry == nil { - return r.sink.DeleteEntry(key, message.OldEntry, message.DeleteChunks) + return r.sink.DeleteEntry(key, message.OldEntry.IsDirectory, message.DeleteChunks) } if message.OldEntry == nil && message.NewEntry != nil { return r.sink.CreateEntry(key, message.NewEntry) } - if existingEntry, err := r.sink.LookupEntry(key); err == nil { - if message.OldEntry == nil && message.NewEntry == nil { - glog.V(0).Infof("message %+v existingEntry: %+v", message, existingEntry) - return r.sink.DeleteEntry(key, existingEntry, true) - } - return r.sink.UpdateEntry(key, message.OldEntry, message.NewEntry, existingEntry, message.DeleteChunks) - } - - glog.V(0).Infof("key:%s, message %+v", key, message) if message.OldEntry == nil && message.NewEntry == nil { + glog.V(0).Infof("weird message %+v", message) return nil } + foundExisting, err := r.sink.UpdateEntry(key, message.OldEntry, message.NewEntry, message.DeleteChunks) + if foundExisting { + return err + } + return r.sink.CreateEntry(key, message.NewEntry) } |
