aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2022-03-08 00:41:04 -0800
committerGitHub <noreply@github.com>2022-03-08 00:41:04 -0800
commit23d024708d9a22aa0f7e645294c4997cd8dcdb13 (patch)
tree702ab945382f806fd370324d49d7869703873bb2
parent3aeee3d748712d4c1afa5eacd4f12939f8173afa (diff)
parentf7f2a597dd8f56683f8acd3254a37fca5518eae5 (diff)
downloadseaweedfs-23d024708d9a22aa0f7e645294c4997cd8dcdb13.tar.xz
seaweedfs-23d024708d9a22aa0f7e645294c4997cd8dcdb13.zip
Merge pull request #2730 from banjiaojuhao/filer-bugfix_mis-delete-chunks-when-append
[bugfix] filer: chunk mis-deletion
-rw-r--r--weed/filer/filechunk_manifest.go6
-rw-r--r--weed/filer/filer_deletion.go35
-rw-r--r--weed/shell/command_volume_fsck.go12
3 files changed, 41 insertions, 12 deletions
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 29860c27a..d036cfd24 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -68,12 +68,12 @@ func ResolveChunkManifest(lookupFileIdFn wdclient.LookupFileIdFunctionType, chun
manifestChunks = append(manifestChunks, chunk)
// recursive
- dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
+ dataChunks, manifestChunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks, startOffset, stopOffset)
if subErr != nil {
return chunks, nil, subErr
}
- dataChunks = append(dataChunks, dchunks...)
- manifestChunks = append(manifestChunks, mchunks...)
+ dataChunks = append(dataChunks, dataChunks...)
+ manifestChunks = append(manifestChunks, manifestChunks...)
}
return
}
diff --git a/weed/filer/filer_deletion.go b/weed/filer/filer_deletion.go
index e0191d7f1..e73f94151 100644
--- a/weed/filer/filer_deletion.go
+++ b/weed/filer/filer_deletion.go
@@ -1,6 +1,7 @@
package filer
import (
+ "math"
"strings"
"time"
@@ -129,6 +130,12 @@ func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
}
}
+func (f *Filer) DeleteChunksNotRecursive(chunks []*filer_pb.FileChunk) {
+ for _, chunk := range chunks {
+ f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
+ }
+}
+
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
if oldEntry == nil {
@@ -141,14 +148,36 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
var toDelete []*filer_pb.FileChunk
newChunkIds := make(map[string]bool)
- for _, newChunk := range newEntry.Chunks {
+ newDataChunks, newManifestChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
+ newEntry.Chunks, 0, math.MaxInt64)
+ if err != nil {
+ glog.Errorf("Failed to resolve new entry chunks when delete old entry chunks. new: %s, old: %s",
+ newEntry.Chunks, oldEntry.Chunks)
+ return
+ }
+ for _, newChunk := range newDataChunks {
+ newChunkIds[newChunk.GetFileIdString()] = true
+ }
+ for _, newChunk := range newManifestChunks {
newChunkIds[newChunk.GetFileIdString()] = true
}
- for _, oldChunk := range oldEntry.Chunks {
+ oldDataChunks, oldManifestChunks, err := ResolveChunkManifest(f.MasterClient.GetLookupFileIdFunction(),
+ oldEntry.Chunks, 0, math.MaxInt64)
+ if err != nil {
+ glog.Errorf("Failed to resolve old entry chunks when delete old entry chunks. new: %s, old: %s",
+ newEntry.Chunks, oldEntry.Chunks)
+ return
+ }
+ for _, oldChunk := range oldDataChunks {
+ if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
+ toDelete = append(toDelete, oldChunk)
+ }
+ }
+ for _, oldChunk := range oldManifestChunks {
if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
toDelete = append(toDelete, oldChunk)
}
}
- f.DeleteChunks(toDelete)
+ f.DeleteChunksNotRecursive(toDelete)
}
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 28f2d6753..1b3d7bf0d 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -153,12 +153,12 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(volumeIdToServer map[uint
if verbose && entry.Entry.IsDirectory {
fmt.Fprintf(writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
}
- dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
+ dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
if resolveErr != nil {
return nil
}
- dChunks = append(dChunks, mChunks...)
- for _, chunk := range dChunks {
+ dataChunks = append(dataChunks, manifestChunks...)
+ for _, chunk := range dataChunks {
outputChan <- &Item{
vid: chunk.Fid.VolumeId,
fileKey: chunk.Fid.FileKey,
@@ -332,15 +332,15 @@ func (c *commandVolumeFsck) collectFilerFileIds(volumeIdToServer map[uint32]VInf
fileKey uint64
}
return doTraverseBfsAndSaving(c.env, nil, "/", false, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
- dChunks, mChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
+ dataChunks, manifestChunks, resolveErr := filer.ResolveChunkManifest(filer.LookupFn(c.env), entry.Entry.Chunks, 0, math.MaxInt64)
if resolveErr != nil {
if verbose {
fmt.Fprintf(writer, "resolving manifest chunks in %s: %v\n", util.NewFullPath(entry.Dir, entry.Entry.Name), resolveErr)
}
return nil
}
- dChunks = append(dChunks, mChunks...)
- for _, chunk := range dChunks {
+ dataChunks = append(dataChunks, manifestChunks...)
+ for _, chunk := range dataChunks {
outputChan <- &Item{
vid: chunk.Fid.VolumeId,
fileKey: chunk.Fid.FileKey,