diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2020-07-21 14:08:18 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-07-21 14:08:18 +0800 |
| commit | 6ea4ce722704171fdbddba61f423202a620b6ecf (patch) | |
| tree | b4e3e1deb1133cc7d34757c0981e30dabe196a03 /weed/replication | |
| parent | 5850bb733936399babbe2d77f4b27cac312e2798 (diff) | |
| parent | 885c624bceb61688c806b91350e70d75088c6eea (diff) | |
| download | seaweedfs-6ea4ce722704171fdbddba61f423202a620b6ecf.tar.xz seaweedfs-6ea4ce722704171fdbddba61f423202a620b6ecf.zip | |
Merge pull request #3 from chrislusf/master
sync
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/sink/azuresink/azure_sink.go | 2 | ||||
| -rw-r--r-- | weed/replication/sink/b2sink/b2_sink.go | 2 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 26 | ||||
| -rw-r--r-- | weed/replication/sink/gcssink/gcs_sink.go | 2 | ||||
| -rw-r--r-- | weed/replication/sink/s3sink/s3_sink.go | 2 |
5 files changed, 25 insertions, 9 deletions
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index aef97c06e..fa229de22 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -96,7 +96,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) // Create a URL that references a to-be-created blob in your // Azure Storage account's container. diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index 1e7d82ed4..bf8632827 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -85,7 +85,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) bucket, err := g.client.Bucket(context.Background(), g.bucket) if err != nil { diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 50721a8f3..6429859b4 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -167,12 +167,15 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent glog.V(0).Infof("already replicated %s", key) } else { // find out what changed - deletedChunks, newChunks := compareChunks(oldEntry, newEntry) + deletedChunks, newChunks, err := compareChunks(filer2.LookupFn(fs), oldEntry, newEntry) + if err != nil { + return true, fmt.Errorf("replicte %s compare chunks error: %v", key, err) + } // delete the chunks that are deleted from the source if deleteIncludeChunks { // remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks - existingEntry.Chunks = filer2.MinusChunks(existingEntry.Chunks, deletedChunks) + existingEntry.Chunks = filer2.DoMinusChunks(existingEntry.Chunks, deletedChunks) } // replicate the chunks that are new in the source @@ -200,8 +203,21 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent }) } -func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) { - deletedChunks = filer2.MinusChunks(oldEntry.Chunks, newEntry.Chunks) - newChunks = filer2.MinusChunks(newEntry.Chunks, oldEntry.Chunks) +func compareChunks(lookupFileIdFn filer2.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) { + aData, aMeta, aErr := filer2.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks) + if aErr != nil { + return nil, nil, aErr + } + bData, bMeta, bErr := filer2.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks) + if bErr != nil { + return nil, nil, bErr + } + + deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aData, bData)...) + deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aMeta, bMeta)...) + + newChunks = append(newChunks, filer2.DoMinusChunks(bData, aData)...) + newChunks = append(newChunks, filer2.DoMinusChunks(bMeta, aMeta)...) + return } diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index bb5a54272..4b58160db 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -90,7 +90,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background()) diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index d7af105b8..625cf406c 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -108,7 +108,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) parts := make([]*s3.CompletedPart, len(chunkViews)) |
