diff options
| author | banjiaojuhao <banjiaojuhao@qq.com> | 2022-03-07 15:14:24 +0800 |
|---|---|---|
| committer | banjiaojuhao <banjiaojuhao@qq.com> | 2022-03-08 16:22:55 +0800 |
| commit | d61bea90385a73c9c500a9feb3dac267eb1c10d2 (patch) | |
| tree | 4de1859664078aa0e80082ed119c3292e0bca8e7 | |
| parent | 3aeee3d748712d4c1afa5eacd4f12939f8173afa (diff) | |
| download | seaweedfs-d61bea90385a73c9c500a9feb3dac267eb1c10d2.tar.xz seaweedfs-d61bea90385a73c9c500a9feb3dac267eb1c10d2.zip | |
[bugfix] filer: In file modification, old chunks will be mis-deleted when they are merged(Manifestized).
| -rw-r--r-- | weed/filer/filer_deletion.go | 34 |
1 files changed, 31 insertions, 3 deletions
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go index e0191d7f1..48986a82a 100644 --- a/weed/filer/filer_deletion.go +++ b/weed/filer/filer_deletion.go @@ -129,6 +129,12 @@ func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { } } +func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk) { + for _, chunk := range chunks { + f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) + } +} + func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { if oldEntry == nil { @@ -141,14 +147,36 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { var toDelete []*filer_pb.FileChunk newChunkIds := make(map[string]bool) - for _, newChunk := range newEntry.Chunks { + newDChunks, newMChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(), + newEntry.Chunks, 0, int64(newEntry.Size())) + if err != nil { + glog.Errorf("Failed to resolve new entry chunks when delete old entry chunks. new: %s, old: %s", + newEntry.Chunks, oldEntry.Chunks) + return + } + for _, newChunk := range newDChunks { + newChunkIds[newChunk.GetFileIdString()] = true + } + for _, newChunk := range newMChunks { newChunkIds[newChunk.GetFileIdString()] = true } - for _, oldChunk := range oldEntry.Chunks { + oldDChunks, oldMChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(), + oldEntry.Chunks, 0, int64(oldEntry.Size())) + if err != nil { + glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s", + newEntry.Chunks, oldEntry.Chunks) + return + } + for _, oldChunk := range oldDChunks { + if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found { + toDelete = append(toDelete, oldChunk) + } + } + for _, oldChunk := range oldMChunks { if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found { toDelete = append(toDelete, oldChunk) } } - f.DeleteChunks(toDelete) + f.DeleteChunksNotRecursive(toDelete) } |
