diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-07-19 17:59:43 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-07-19 17:59:43 -0700 |
| commit | 97d97f35287a88de392a1a422b3533339d923ae2 (patch) | |
| tree | f69b1c2aea74ffc10a9b1ef031572a0f34decfce /weed/replication | |
| parent | f90d2c93c9e34997a8e76aeefb438ec06b1cd093 (diff) | |
| download | seaweedfs-97d97f35287a88de392a1a422b3533339d923ae2.tar.xz seaweedfs-97d97f35287a88de392a1a422b3533339d923ae2.zip | |
go code can read and write chunk manifest
Diffstat (limited to 'weed/replication')
| -rw-r--r-- | weed/replication/sink/azuresink/azure_sink.go | 2 | ||||
| -rw-r--r-- | weed/replication/sink/b2sink/b2_sink.go | 2 | ||||
| -rw-r--r-- | weed/replication/sink/filersink/filer_sink.go | 26 | ||||
| -rw-r--r-- | weed/replication/sink/gcssink/gcs_sink.go | 2 | ||||
| -rw-r--r-- | weed/replication/sink/s3sink/s3_sink.go | 2 |
5 files changed, 25 insertions, 9 deletions
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go index aef97c06e..fa229de22 100644 --- a/weed/replication/sink/azuresink/azure_sink.go +++ b/weed/replication/sink/azuresink/azure_sink.go @@ -96,7 +96,7 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) // Create a URL that references a to-be-created blob in your // Azure Storage account's container. diff --git a/weed/replication/sink/b2sink/b2_sink.go b/weed/replication/sink/b2sink/b2_sink.go index 1e7d82ed4..bf8632827 100644 --- a/weed/replication/sink/b2sink/b2_sink.go +++ b/weed/replication/sink/b2sink/b2_sink.go @@ -85,7 +85,7 @@ func (g *B2Sink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) bucket, err := g.client.Bucket(context.Background(), g.bucket) if err != nil { diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 50721a8f3..6429859b4 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -167,12 +167,15 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent glog.V(0).Infof("already replicated %s", key) } else { // find out what changed - deletedChunks, newChunks := compareChunks(oldEntry, newEntry) + deletedChunks, newChunks, err := compareChunks(filer2.LookupFn(fs), oldEntry, newEntry) + if err != nil { + return true, fmt.Errorf("replicte %s compare chunks error: %v", key, err) + } // delete the chunks that are deleted from the source if deleteIncludeChunks { // remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks - existingEntry.Chunks = filer2.MinusChunks(existingEntry.Chunks, deletedChunks) + existingEntry.Chunks = filer2.DoMinusChunks(existingEntry.Chunks, deletedChunks) } // replicate the chunks that are new in the source @@ -200,8 +203,21 @@ func (fs *FilerSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParent }) } -func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) { - deletedChunks = filer2.MinusChunks(oldEntry.Chunks, newEntry.Chunks) - newChunks = filer2.MinusChunks(newEntry.Chunks, oldEntry.Chunks) +func compareChunks(lookupFileIdFn filer2.LookupFileIdFunctionType, oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk, err error) { + aData, aMeta, aErr := filer2.ResolveChunkManifest(lookupFileIdFn, oldEntry.Chunks) + if aErr != nil { + return nil, nil, aErr + } + bData, bMeta, bErr := filer2.ResolveChunkManifest(lookupFileIdFn, newEntry.Chunks) + if bErr != nil { + return nil, nil, bErr + } + + deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aData, bData)...) + deletedChunks = append(deletedChunks, filer2.DoMinusChunks(aMeta, bMeta)...) + + newChunks = append(newChunks, filer2.DoMinusChunks(bData, aData)...) + newChunks = append(newChunks, filer2.DoMinusChunks(bMeta, aMeta)...) + return } diff --git a/weed/replication/sink/gcssink/gcs_sink.go b/weed/replication/sink/gcssink/gcs_sink.go index bb5a54272..4b58160db 100644 --- a/weed/replication/sink/gcssink/gcs_sink.go +++ b/weed/replication/sink/gcssink/gcs_sink.go @@ -90,7 +90,7 @@ func (g *GcsSink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(g.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) wc := g.client.Bucket(g.bucket).Object(key).NewWriter(context.Background()) diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go index d7af105b8..625cf406c 100644 --- a/weed/replication/sink/s3sink/s3_sink.go +++ b/weed/replication/sink/s3sink/s3_sink.go @@ -108,7 +108,7 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry) error { } totalSize := filer2.TotalSize(entry.Chunks) - chunkViews := filer2.ViewFromChunks(entry.Chunks, 0, int64(totalSize)) + chunkViews := filer2.ViewFromChunks(s3sink.filerSource.LookupFileId, entry.Chunks, 0, int64(totalSize)) parts := make([]*s3.CompletedPart, len(chunkViews)) |
