aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbanjiaojuhao <banjiaojuhao@qq.com>2022-03-07 15:14:24 +0800
committerbanjiaojuhao <banjiaojuhao@qq.com>2022-03-08 16:22:55 +0800
commitd61bea90385a73c9c500a9feb3dac267eb1c10d2 (patch)
tree4de1859664078aa0e80082ed119c3292e0bca8e7
parent3aeee3d748712d4c1afa5eacd4f12939f8173afa (diff)
downloadseaweedfs-d61bea90385a73c9c500a9feb3dac267eb1c10d2.tar.xz
seaweedfs-d61bea90385a73c9c500a9feb3dac267eb1c10d2.zip
[bugfix] filer: In file modification, old chunks will be mis-deleted when they are merged(Manifestized).
-rw-r--r--weed/filer/filer_deletion.go34
1 files changed, 31 insertions, 3 deletions
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index e0191d7f1..48986a82a 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -129,6 +129,12 @@ func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
}
}
+func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk) {
+ for _, chunk := range chunks {
+ f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
+ }
+}
+
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
if oldEntry == nil {
@@ -141,14 +147,36 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
var toDelete []*filer_pb.FileChunk
newChunkIds := make(map[string]bool)
- for _, newChunk := range newEntry.Chunks {
+ newDChunks, newMChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
+ newEntry.Chunks, 0, int64(newEntry.Size()))
+ if err != nil {
+ glog.Errorf("Failed to resolve new entry chunks when delete old entry chunks. new: %s, old: %s",
+ newEntry.Chunks, oldEntry.Chunks)
+ return
+ }
+ for _, newChunk := range newDChunks {
+ newChunkIds[newChunk.GetFileIdString()] = true
+ }
+ for _, newChunk := range newMChunks {
newChunkIds[newChunk.GetFileIdString()] = true
}
- for _, oldChunk := range oldEntry.Chunks {
+ oldDChunks, oldMChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
+ oldEntry.Chunks, 0, int64(oldEntry.Size()))
+ if err != nil {
+ glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
+ newEntry.Chunks, oldEntry.Chunks)
+ return
+ }
+ for _, oldChunk := range oldDChunks {
+ if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
+ toDelete = append(toDelete, oldChunk)
+ }
+ }
+ for _, oldChunk := range oldMChunks {
if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
toDelete = append(toDelete, oldChunk)
}
}
- f.DeleteChunks(toDelete)
+ f.DeleteChunksNotRecursive(toDelete)
}