aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2/filer_deletion.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2/filer_deletion.go')
-rw-r--r--weed/filer2/filer_deletion.go76
1 files changed, 46 insertions, 30 deletions
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
index 8fe8ae04f..a6b229771 100644
--- a/weed/filer2/filer_deletion.go
+++ b/weed/filer2/filer_deletion.go
@@ -6,16 +6,14 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
)
-func (f *Filer) loopProcessingDeletion() {
-
- ticker := time.NewTicker(5 * time.Second)
-
- lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
+func LookupByMasterClientFn(masterClient *wdclient.MasterClient) func(vids []string) (map[string]operation.LookupResult, error) {
+ return func(vids []string) (map[string]operation.LookupResult, error) {
m := make(map[string]operation.LookupResult)
for _, vid := range vids {
- locs := f.MasterClient.GetVidLocations(vid)
+ locs, _ := masterClient.GetVidLocations(vid)
var locations []operation.Location
for _, loc := range locs {
locations = append(locations, operation.Location{
@@ -30,35 +28,56 @@ func (f *Filer) loopProcessingDeletion() {
}
return m, nil
}
+}
+
+func (f *Filer) loopProcessingDeletion() {
+
+ lookupFunc := LookupByMasterClientFn(f.MasterClient)
- var fileIds []string
+ DeletionBatchSize := 100000 // roughly 20 bytes cost per file id.
+
+ var deletionCount int
for {
- select {
- case fid := <-f.fileIdDeletionChan:
- fileIds = append(fileIds, fid)
- if len(fileIds) >= 4096 {
- glog.V(1).Infof("deleting fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
- fileIds = fileIds[:0]
- }
- case <-ticker.C:
- if len(fileIds) > 0 {
- glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
- fileIds = fileIds[:0]
+ deletionCount = 0
+ f.fileIdDeletionQueue.Consume(func(fileIds []string) {
+ for len(fileIds) > 0 {
+ var toDeleteFileIds []string
+ if len(fileIds) > DeletionBatchSize {
+ toDeleteFileIds = fileIds[:DeletionBatchSize]
+ fileIds = fileIds[DeletionBatchSize:]
+ } else {
+ toDeleteFileIds = fileIds
+ fileIds = fileIds[:0]
+ }
+ deletionCount = len(toDeleteFileIds)
+ deleteResults, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, toDeleteFileIds, lookupFunc)
+ if err != nil {
+ glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
+ } else {
+ glog.V(1).Infof("deleting fileIds len=%d", deletionCount)
+ }
+ if len(deleteResults) != deletionCount {
+ glog.V(0).Infof("delete %d fileIds actual %d", deletionCount, len(deleteResults))
+ }
}
+ })
+
+ if deletionCount == 0 {
+ time.Sleep(1123 * time.Millisecond)
}
}
}
func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
- f.fileIdDeletionChan <- chunk.FileId
+ f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
}
}
+// DeleteFileByFileId direct delete by file id.
+// Only used when the fileId is not being managed by snapshots.
func (f *Filer) DeleteFileByFileId(fileId string) {
- f.fileIdDeletionChan <- fileId
+ f.fileIdDeletionQueue.EnQueue(fileId)
}
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
@@ -71,16 +90,13 @@ func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
}
var toDelete []*filer_pb.FileChunk
+ newChunkIds := make(map[string]bool)
+ for _, newChunk := range newEntry.Chunks {
+ newChunkIds[newChunk.GetFileIdString()] = true
+ }
for _, oldChunk := range oldEntry.Chunks {
- found := false
- for _, newChunk := range newEntry.Chunks {
- if oldChunk.FileId == newChunk.FileId {
- found = true
- break
- }
- }
- if !found {
+ if _, found := newChunkIds[oldChunk.GetFileIdString()]; !found {
toDelete = append(toDelete, oldChunk)
}
}