aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys/wfs_deletion.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys/wfs_deletion.go')
-rw-r--r--weed/filesys/wfs_deletion.go89
1 files changed, 52 insertions, 37 deletions
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
index f58ef24f4..bf21b1808 100644
--- a/weed/filesys/wfs_deletion.go
+++ b/weed/filesys/wfs_deletion.go
@@ -2,39 +2,15 @@ package filesys
import (
"context"
- "time"
+ "google.golang.org/grpc"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
-func (wfs *WFS) loopProcessingDeletion() {
-
- ticker := time.NewTicker(2 * time.Second)
-
- wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- var fileIds []string
- for {
- select {
- case fids := <-wfs.fileIdsDeletionChan:
- fileIds = append(fileIds, fids...)
- if len(fileIds) >= 1024 {
- glog.V(1).Infof("deleting fileIds len=%d", len(fileIds))
- deleteFileIds(context.Background(), client, fileIds)
- fileIds = fileIds[:0]
- }
- case <-ticker.C:
- if len(fileIds) > 0 {
- glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds))
- deleteFileIds(context.Background(), client, fileIds)
- fileIds = fileIds[:0]
- }
- }
- }
- })
-
-}
-
func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
if len(chunks) == 0 {
return
@@ -42,17 +18,56 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
var fileIds []string
for _, chunk := range chunks {
- fileIds = append(fileIds, chunk.FileId)
+ fileIds = append(fileIds, chunk.GetFileIdString())
}
- var async = false
- if async {
- wfs.fileIdsDeletionChan <- fileIds
- return
- }
-
- wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- deleteFileIds(context.Background(), client, fileIds)
+ wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds)
return nil
})
}
+
+func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
+
+ var vids []string
+ for _, fileId := range fileIds {
+ vids = append(vids, filer2.VolumeId(fileId))
+ }
+
+ lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
+
+ m := make(map[string]operation.LookupResult)
+
+ glog.V(4).Infof("remove file lookup volume id locations: %v", vids)
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ VolumeIds: vids,
+ })
+ if err != nil {
+ return m, err
+ }
+
+ for _, vid := range vids {
+ lr := operation.LookupResult{
+ VolumeId: vid,
+ Locations: nil,
+ }
+ locations, found := resp.LocationsMap[vid]
+ if !found {
+ continue
+ }
+ for _, loc := range locations.Locations {
+ lr.Locations = append(lr.Locations, operation.Location{
+ Url: wfs.AdjustedUrl(loc.Url),
+ PublicUrl: loc.PublicUrl,
+ })
+ }
+ m[vid] = lr
+ }
+
+ return m, err
+ }
+
+ _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
+
+ return err
+}