aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-08-30 02:07:14 -0700
committerChris Lu <chris.lu@gmail.com>2020-08-30 02:07:14 -0700
commitf2a8574448d8c0e0233711778dc7bfe1e9154b63 (patch)
tree2191eb4d8586622da36790dd32588f78c1f832eb
parent99ecf63276b3b33351f316c4d3ea7269d591a3cb (diff)
downloadseaweedfs-f2a8574448d8c0e0233711778dc7bfe1e9154b63.tar.xz
seaweedfs-f2a8574448d8c0e0233711778dc7bfe1e9154b63.zip
filer and mount deletion resolves manifest chunks also
-rw-r--r--weed/filer2/filechunk_manifest.go37
-rw-r--r--weed/filer2/filer_deletion.go18
-rw-r--r--weed/filesys/wfs_deletion.go12
3 files changed, 48 insertions, 19 deletions
diff --git a/weed/filer2/filechunk_manifest.go b/weed/filer2/filechunk_manifest.go
index e8cf564e3..ba4625bab 100644
--- a/weed/filer2/filechunk_manifest.go
+++ b/weed/filer2/filechunk_manifest.go
@@ -28,7 +28,7 @@ func HasChunkManifest(chunks []*filer_pb.FileChunk) bool {
func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonManifestChunks []*filer_pb.FileChunk) {
for _, c := range chunks {
- if !c.IsChunkManifest {
+ if c.IsChunkManifest {
manifestChunks = append(manifestChunks, c)
} else {
nonManifestChunks = append(nonManifestChunks, c)
@@ -37,7 +37,7 @@ func SeparateManifestChunks(chunks []*filer_pb.FileChunk) (manifestChunks, nonMa
return
}
-func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manefestResolveErr error) {
+func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*filer_pb.FileChunk) (dataChunks, manifestChunks []*filer_pb.FileChunk, manifestResolveErr error) {
// TODO maybe parallel this
for _, chunk := range chunks {
if !chunk.IsChunkManifest {
@@ -45,19 +45,14 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil
continue
}
- // IsChunkManifest
- data, err := fetchChunk(lookupFileIdFn, chunk.FileId, chunk.CipherKey, chunk.IsCompressed)
+ resolvedChunks, err := ResolveOneChunkManifest(lookupFileIdFn, chunk)
if err != nil {
- return chunks, nil, fmt.Errorf("fail to read manifest %s: %v", chunk.FileId, err)
- }
- m := &filer_pb.FileChunkManifest{}
- if err := proto.Unmarshal(data, m); err != nil {
- return chunks, nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.FileId, err)
+ return chunks, nil, err
}
+
manifestChunks = append(manifestChunks, chunk)
// recursive
- filer_pb.AfterEntryDeserialization(m.Chunks)
- dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, m.Chunks)
+ dchunks, mchunks, subErr := ResolveChunkManifest(lookupFileIdFn, resolvedChunks)
if subErr != nil {
return chunks, nil, subErr
}
@@ -67,6 +62,26 @@ func ResolveChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunks []*fil
return
}
+func ResolveOneChunkManifest(lookupFileIdFn LookupFileIdFunctionType, chunk *filer_pb.FileChunk) (dataChunks []*filer_pb.FileChunk, manifestResolveErr error) {
+ if !chunk.IsChunkManifest {
+ return
+ }
+
+ // IsChunkManifest
+ data, err := fetchChunk(lookupFileIdFn, chunk.GetFileIdString(), chunk.CipherKey, chunk.IsCompressed)
+ if err != nil {
+ return nil, fmt.Errorf("fail to read manifest %s: %v", chunk.GetFileIdString(), err)
+ }
+ m := &filer_pb.FileChunkManifest{}
+ if err := proto.Unmarshal(data, m); err != nil {
+ return nil, fmt.Errorf("fail to unmarshal manifest %s: %v", chunk.GetFileIdString(), err)
+ }
+
+ // recursive
+ filer_pb.AfterEntryDeserialization(m.Chunks)
+ return m.Chunks, nil
+}
+
// TODO fetch from cache for weed mount?
func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool) ([]byte, error) {
urlString, err := lookupFileIdFn(fileId)
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
index 2ff9dac63..e3eb0e61f 100644
--- a/weed/filer2/filer_deletion.go
+++ b/weed/filer2/filer_deletion.go
@@ -70,16 +70,20 @@ func (f *Filer) loopProcessingDeletion() {
func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
- f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
+ if !chunk.IsChunkManifest {
+ f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
+ continue
+ }
+ dataChunks, manifestResolveErr := ResolveOneChunkManifest(f.MasterClient.LookupFileId, chunk)
+ if manifestResolveErr != nil {
+ glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
+ }
+ for _, dChunk := range dataChunks {
+ f.fileIdDeletionQueue.EnQueue(dChunk.GetFileIdString())
+ }
}
}
-// DeleteFileByFileId direct delete by file id.
-// Only used when the fileId is not being managed by snapshots.
-func (f *Filer) DeleteFileByFileId(fileId string) {
- f.fileIdDeletionQueue.EnQueue(fileId)
-}
-
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
if oldEntry == nil {
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
index 203ebdad1..210ae1ba4 100644
--- a/weed/filesys/wfs_deletion.go
+++ b/weed/filesys/wfs_deletion.go
@@ -18,7 +18,17 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
var fileIds []string
for _, chunk := range chunks {
- fileIds = append(fileIds, chunk.GetFileIdString())
+ if !chunk.IsChunkManifest {
+ fileIds = append(fileIds, chunk.GetFileIdString())
+ continue
+ }
+ dataChunks, manifestResolveErr := filer2.ResolveOneChunkManifest(filer2.LookupFn(wfs), chunk)
+ if manifestResolveErr != nil {
+ glog.V(0).Infof("failed to resolve manifest %s: %v", chunk.FileId, manifestResolveErr)
+ }
+ for _, dChunk := range dataChunks {
+ fileIds = append(fileIds, dChunk.GetFileIdString())
+ }
}
wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {