diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-04-16 00:44:31 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-04-16 00:44:31 -0700 |
| commit | b3b42bc947ec44acdc69efdeebb623a0c092078a (patch) | |
| tree | 9fecee1d6f5bced1e7e1436a331e8c8973db8a78 /weed/replication | |
| parent | 967e108b9a33d4e3cdd59dacd3356c070ed312a9 (diff) | |
| download | seaweedfs-b3b42bc947ec44acdc69efdeebb623a0c092078a.tar.xz seaweedfs-b3b42bc947ec44acdc69efdeebb623a0c092078a.zip | |
replicate need to include new entry path
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/replicator.go | 8 | ||||
| -rw-r--r-- | weed/replication/sink/azuresink/azure_sink.go | 2 | ||||
| -rw-r--r-- | weed/replication/sink/b2sink/b2_sink.go | 2 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 4 | ||||
| -rw-r--r-- | weed/replication/sink/gcssink/gcs_sink.go | 2 | ||||
| -rw-r--r-- | weed/replication/sink/replication_sink.go | 2 | ||||
| -rw-r--r-- | weed/replication/sink/s3sink/s3_sink.go | 2 |
7 files changed, 14 insertions, 8 deletions
diff --git a/weed/replication/replicator.go b/weed/replication/replicator.go index 20c1d08b5..7353cdc91 100644 --- a/weed/replication/replicator.go +++ b/weed/replication/replicator.go @@ -2,6 +2,7 @@ package replication import ( "context" + "fmt" "path/filepath" "strings" @@ -51,12 +52,17 @@ func (r *Replicator) Replicate(ctx context.Context, key string, message *filer_p return nil } - foundExisting, err := r.sink.UpdateEntry(ctx, key, message.OldEntry, message.NewEntry, message.DeleteChunks) + foundExisting, err := r.sink.UpdateEntry(ctx, key, message.OldEntry, message.NewParentPath, message.NewEntry, message.DeleteChunks) if foundExisting { glog.V(4).Infof("updated %v", key) return err } + err = r.sink.DeleteEntry(ctx, key, message.OldEntry.IsDirectory, false) + if err != nil { + return fmt.Errorf("delete old entry %v: %v", key, err) + } + glog.V(4).Infof("creating missing %v", key) return r.sink.CreateEntry(ctx, key, message.NewEntry) } diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index 760fbdbb5..6381908a1 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -132,7 +132,7 @@ func (g *AzureSink) CreateEntry(ctx context.Context, key string, entry *filer_pb } -func (g *AzureSink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *AzureSink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { key = cleanKey(key) // TODO improve efficiency return false, nil diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index c80bfcc49..35c2230fa 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -124,7 +124,7 @@ func (g *B2Sink) CreateEntry(ctx context.Context, key string, entry *filer_pb.En } -func (g *B2Sink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *B2Sink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { key = cleanKey(key) diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 777c28620..ff0fe8b74 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -133,7 +133,7 @@ func (fs *FilerSink) CreateEntry(ctx context.Context, key string, entry *filer_p }) } -func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { dir, name := filer2.FullPath(key).DirAndName() @@ -194,7 +194,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry, newE return true, fs.withFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.UpdateEntryRequest{ - Directory: dir, + Directory: newParentPath, Entry: existingEntry, } diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index 6b710a12a..abd7c49b9 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -119,7 +119,7 @@ func (g *GcsSink) CreateEntry(ctx context.Context, key string, entry *filer_pb.E } -func (g *GcsSink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (g *GcsSink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { // TODO improve efficiency return false, nil } diff --git a/weed/replication/sink/replication_sink.go b/weed/replication/sink/replication_sink.go index 984aebc58..dd54f0005 100644 --- a/weed/replication/sink/replication_sink.go +++ b/weed/replication/sink/replication_sink.go @@ -12,7 +12,7 @@ type ReplicationSink interface { Initialize(configuration util.Configuration) error DeleteEntry(ctx context.Context, key string, isDirectory, deleteIncludeChunks bool) error CreateEntry(ctx context.Context, key string, entry *filer_pb.Entry) error - UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) + UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) GetSinkToDirectory() string SetSourceFiler(s *source.FilerSource) } diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index a5b52095c..d5cad3541 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -130,7 +130,7 @@ func (s3sink *S3Sink) CreateEntry(ctx context.Context, key string, entry *filer_ } -func (s3sink *S3Sink) UpdateEntry(ctx context.Context, key string, oldEntry, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { +func (s3sink *S3Sink) UpdateEntry(ctx context.Context, key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool) (foundExistingEntry bool, err error) { key = cleanKey(key) // TODO improve efficiency return false, nil |
