diff options
Diffstat (limited to 'weed/filesys/wfs_deletion.go')
| -rw-r--r-- | weed/filesys/wfs_deletion.go | 89 |
1 files changed, 52 insertions, 37 deletions
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index f58ef24f4..bf21b1808 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -2,39 +2,15 @@ package filesys import ( "context" - "time" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) -func (wfs *WFS) loopProcessingDeletion() { - - ticker := time.NewTicker(2 * time.Second) - - wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - var fileIds []string - for { - select { - case fids := <-wfs.fileIdsDeletionChan: - fileIds = append(fileIds, fids...) - if len(fileIds) >= 1024 { - glog.V(1).Infof("deleting fileIds len=%d", len(fileIds)) - deleteFileIds(context.Background(), client, fileIds) - fileIds = fileIds[:0] - } - case <-ticker.C: - if len(fileIds) > 0 { - glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds)) - deleteFileIds(context.Background(), client, fileIds) - fileIds = fileIds[:0] - } - } - } - }) - -} - func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { if len(chunks) == 0 { return @@ -42,17 +18,56 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { var fileIds []string for _, chunk := range chunks { - fileIds = append(fileIds, chunk.FileId) + fileIds = append(fileIds, chunk.GetFileIdString()) } - var async = false - if async { - wfs.fileIdsDeletionChan <- fileIds - return - } - - wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - deleteFileIds(context.Background(), client, fileIds) + wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds) return nil }) } + +func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error { + + var vids []string + for _, fileId := range fileIds { + vids = append(vids, filer2.VolumeId(fileId)) + } + + lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) { + + m := make(map[string]operation.LookupResult) + + glog.V(4).Infof("remove file lookup volume id locations: %v", vids) + resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{ + VolumeIds: vids, + }) + if err != nil { + return m, err + } + + for _, vid := range vids { + lr := operation.LookupResult{ + VolumeId: vid, + Locations: nil, + } + locations, found := resp.LocationsMap[vid] + if !found { + continue + } + for _, loc := range locations.Locations { + lr.Locations = append(lr.Locations, operation.Location{ + Url: wfs.AdjustedUrl(loc.Url), + PublicUrl: loc.PublicUrl, + }) + } + m[vid] = lr + } + + return m, err + } + + _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) + + return err +} |
