diff options
Diffstat (limited to 'weed/filer2/filer_deletion.go')
| -rw-r--r-- | weed/filer2/filer_deletion.go | 40 |
1 files changed, 18 insertions, 22 deletions
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go index 25e27e504..3a64f636e 100644 --- a/weed/filer2/filer_deletion.go +++ b/weed/filer2/filer_deletion.go @@ -10,8 +10,6 @@ import ( func (f *Filer) loopProcessingDeletion() { - ticker := time.NewTicker(5 * time.Second) - lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { m := make(map[string]operation.LookupResult) for _, vid := range vids { @@ -31,37 +29,35 @@ func (f *Filer) loopProcessingDeletion() { return m, nil } - var fileIds []string + var deletionCount int for { - select { - case fid := <-f.fileIdDeletionChan: - fileIds = append(fileIds, fid) - if len(fileIds) >= 4096 { - glog.V(1).Infof("deleting fileIds len=%d", len(fileIds)) - operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) - fileIds = fileIds[:0] - } - case <-ticker.C: - if len(fileIds) > 0 { - glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds)) - operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) - fileIds = fileIds[:0] + deletionCount = 0 + f.fileIdDeletionQueue.Consume(func(fileIds []string) { + deletionCount = len(fileIds) + _, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc) + if err != nil { + glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err) + } else { + glog.V(1).Infof("deleting fileIds len=%d", deletionCount) } + }) + + if deletionCount == 0 { + time.Sleep(1123 * time.Millisecond) } } } -func (f *Filer) DeleteChunks(fullpath FullPath, chunks []*filer_pb.FileChunk) { +func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) { for _, chunk := range chunks { - glog.V(3).Infof("deleting %s chunk %s", fullpath, chunk.String()) - f.fileIdDeletionChan <- chunk.GetFileIdString() + f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString()) } } // DeleteFileByFileId direct delete by file id. // Only used when the fileId is not being managed by snapshots. func (f *Filer) DeleteFileByFileId(fileId string) { - f.fileIdDeletionChan <- fileId + f.fileIdDeletionQueue.EnQueue(fileId) } func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { @@ -70,7 +66,7 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { return } if newEntry == nil { - f.DeleteChunks(oldEntry.FullPath, oldEntry.Chunks) + f.DeleteChunks(oldEntry.Chunks) } var toDelete []*filer_pb.FileChunk @@ -84,5 +80,5 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) { toDelete = append(toDelete, oldChunk) } } - f.DeleteChunks(oldEntry.FullPath, toDelete) + f.DeleteChunks(toDelete) } |
