aboutsummaryrefslogtreecommitdiff
path: root/weed/replication
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-06-22 20:04:56 -0700
committerChris Lu <chris.lu@gmail.com>2019-06-22 20:04:56 -0700
commit3fa1f150d9644056344b8ca822254de8fcbd203b (patch)
treedca95c0f5e17491697d476b1a0ef62fd8fbb2535 /weed/replication
parent1babec00e70fc194fa40951162db907cf0e363cd (diff)
downloadseaweedfs-3fa1f150d9644056344b8ca822254de8fcbd203b.tar.xz
seaweedfs-3fa1f150d9644056344b8ca822254de8fcbd203b.zip
refactoring
Diffstat (limited to 'weed/replication')
-rw-r--r--weed/replication/sink/filersink/fetch_write.go8
-rw-r--r--weed/replication/sink/filersink/filer_sink.go22
2 files changed, 7 insertions, 23 deletions
diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go
index d24770e3d..97e9671a3 100644
--- a/weed/replication/sink/filersink/fetch_write.go
+++ b/weed/replication/sink/filersink/fetch_write.go
@@ -39,7 +39,7 @@ func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_p
fileId, err := fs.fetchAndWrite(ctx, sourceChunk)
if err != nil {
- return nil, fmt.Errorf("copy %s: %v", sourceChunk.FileId, err)
+ return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err)
}
return &filer_pb.FileChunk{
@@ -48,15 +48,15 @@ func (fs *FilerSink) replicateOneChunk(ctx context.Context, sourceChunk *filer_p
Size: sourceChunk.Size,
Mtime: sourceChunk.Mtime,
ETag: sourceChunk.ETag,
- SourceFileId: sourceChunk.FileId,
+ SourceFileId: sourceChunk.GetFileIdString(),
}, nil
}
func (fs *FilerSink) fetchAndWrite(ctx context.Context, sourceChunk *filer_pb.FileChunk) (fileId string, err error) {
- filename, header, readCloser, err := fs.filerSource.ReadPart(ctx, sourceChunk.FileId)
+ filename, header, readCloser, err := fs.filerSource.ReadPart(ctx, sourceChunk.GetFileIdString())
if err != nil {
- return "", fmt.Errorf("read part %s: %v", sourceChunk.FileId, err)
+ return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err)
}
defer readCloser.Close()
diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go
index ff0fe8b74..f99c7fdf6 100644
--- a/weed/replication/sink/filersink/filer_sink.go
+++ b/weed/replication/sink/filersink/filer_sink.go
@@ -179,7 +179,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file
// delete the chunks that are deleted from the source
if deleteIncludeChunks {
// remove the deleted chunks. Actual data deletion happens in filer UpdateEntry FindUnusedFileChunks
- existingEntry.Chunks = minusChunks(existingEntry.Chunks, deletedChunks)
+ existingEntry.Chunks = filer2.MinusChunks(existingEntry.Chunks, deletedChunks)
}
// replicate the chunks that are new in the source
@@ -207,23 +207,7 @@ func (fs *FilerSink) UpdateEntry(ctx context.Context, key string, oldEntry *file
}
func compareChunks(oldEntry, newEntry *filer_pb.Entry) (deletedChunks, newChunks []*filer_pb.FileChunk) {
- deletedChunks = minusChunks(oldEntry.Chunks, newEntry.Chunks)
- newChunks = minusChunks(newEntry.Chunks, oldEntry.Chunks)
- return
-}
-
-func minusChunks(as, bs []*filer_pb.FileChunk) (delta []*filer_pb.FileChunk) {
- for _, a := range as {
- found := false
- for _, b := range bs {
- if a.FileId == b.FileId {
- found = true
- break
- }
- }
- if !found {
- delta = append(delta, a)
- }
- }
+ deletedChunks = filer2.MinusChunks(oldEntry.Chunks, newEntry.Chunks)
+ newChunks = filer2.MinusChunks(newEntry.Chunks, oldEntry.Chunks)
return
}