aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-11-20 20:56:28 -0800
committerChris Lu <chris.lu@gmail.com>2018-11-20 20:56:28 -0800
commitb282e34dc2933c71929b1d9297c44b1598344a1f (patch)
tree3d498e47aef09d0370280bdbffc9c83136919187
parent5065d4ab2df795436b1782c46836b2253a99034b (diff)
downloadseaweedfs-b282e34dc2933c71929b1d9297c44b1598344a1f.tar.xz
seaweedfs-b282e34dc2933c71929b1d9297c44b1598344a1f.zip
async file chunk deletion
-rw-r--r--weed/filer2/filer.go65
-rw-r--r--weed/filer2/filer_deletion.go88
-rw-r--r--weed/operation/delete_content.go12
-rw-r--r--weed/wdclient/vid_map.go9
4 files changed, 120 insertions, 54 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 0c0a3e1fa..659054d86 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -10,24 +10,27 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"github.com/karlseguin/ccache"
- "github.com/chrislusf/seaweedfs/weed/storage"
)
type Filer struct {
- store FilerStore
- directoryCache *ccache.Cache
- MasterClient *wdclient.MasterClient
+ store FilerStore
+ directoryCache *ccache.Cache
+ MasterClient *wdclient.MasterClient
+ fileIdDeletionChan chan string
}
func NewFiler(masters []string) *Filer {
- return &Filer{
- directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
- MasterClient: wdclient.NewMasterClient(context.Background(), "filer", masters),
+ f := &Filer{
+ directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
+ MasterClient: wdclient.NewMasterClient(context.Background(), "filer", masters),
+ fileIdDeletionChan: make(chan string, 4096),
}
+
+ go f.loopProcessingDeletion()
+
+ return f
}
func (f *Filer) SetStore(store FilerStore) {
@@ -229,47 +232,3 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute)
}
-
-func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
- var fileIds []string
- for _, chunk := range chunks {
- fileIds = append(fileIds, chunk.FileId)
- }
- operation.DeleteFiles(f.GetMaster(), fileIds)
-}
-
-func (f *Filer) DeleteFileByFileId(fileId string) {
- volumeServer, err := f.MasterClient.LookupVolumeServer(fileId)
- if err != nil {
- glog.V(0).Infof("can not find file %s: %v", fileId, err)
- }
- if _, err := operation.DeleteFilesAtOneVolumeServer(volumeServer, []string{fileId}); err != nil && err != storage.NotFound {
- glog.V(0).Infof("deleting file %s: %v", fileId, err)
- }
-}
-
-func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
-
- if oldEntry == nil {
- return
- }
- if newEntry == nil {
- f.DeleteChunks(oldEntry.Chunks)
- }
-
- var toDelete []*filer_pb.FileChunk
-
- for _, oldChunk := range oldEntry.Chunks {
- found := false
- for _, newChunk := range newEntry.Chunks {
- if oldChunk.FileId == newChunk.FileId {
- found = true
- break
- }
- }
- if !found {
- toDelete = append(toDelete, oldChunk)
- }
- }
- f.DeleteChunks(toDelete)
-}
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
new file mode 100644
index 000000000..4561ecf54
--- /dev/null
+++ b/weed/filer2/filer_deletion.go
@@ -0,0 +1,88 @@
+package filer2
+
+import (
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func (f *Filer) loopProcessingDeletion() {
+
+ ticker := time.NewTicker(5 * time.Second)
+
+ lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
+ m := make(map[string]operation.LookupResult)
+ for _, vid := range vids {
+ locs := f.MasterClient.GetVidLocations(vid)
+ var locations []operation.Location
+ for _, loc := range locs {
+ locations = append(locations, operation.Location{
+ Url: loc.Url,
+ PublicUrl: loc.PublicUrl,
+ })
+ }
+ m[vid] = operation.LookupResult{
+ VolumeId: vid,
+ Locations: locations,
+ }
+ }
+ return m, nil
+ }
+
+ var fileIds []string
+ for {
+ select {
+ case fid := <-f.fileIdDeletionChan:
+ fileIds = append(fileIds, fid)
+ if len(fileIds) >= 4096 {
+ glog.V(1).Infof("deleting fileIds len=%d", len(fileIds))
+ operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
+ fileIds = fileIds[:0]
+ }
+ case <-ticker.C:
+ if len(fileIds) > 0 {
+ glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds))
+ operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
+ fileIds = fileIds[:0]
+ }
+ }
+ }
+}
+
+func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
+ for _, chunk := range chunks {
+ f.fileIdDeletionChan <- chunk.FileId
+ }
+}
+
+func (f *Filer) DeleteFileByFileId(fileId string) {
+ f.fileIdDeletionChan <- fileId
+}
+
+func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
+
+ if oldEntry == nil {
+ return
+ }
+ if newEntry == nil {
+ f.DeleteChunks(oldEntry.Chunks)
+ }
+
+ var toDelete []*filer_pb.FileChunk
+
+ for _, oldChunk := range oldEntry.Chunks {
+ found := false
+ for _, newChunk := range newEntry.Chunks {
+ if oldChunk.FileId == newChunk.FileId {
+ found = true
+ break
+ }
+ }
+ if !found {
+ toDelete = append(toDelete, oldChunk)
+ }
+ }
+ f.DeleteChunks(toDelete)
+}
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 806bfbe7b..fcb4f718a 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -29,6 +29,16 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
// DeleteFiles batch deletes a list of fileIds
func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
+ lookupFunc := func(vids []string) (map[string]LookupResult, error) {
+ return LookupVolumeIds(master, vids)
+ }
+
+ return DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
+
+}
+
+func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
+
var ret []*volume_server_pb.DeleteResult
vid_to_fileIds := make(map[string][]string)
@@ -50,7 +60,7 @@ func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteRes
vid_to_fileIds[vid] = append(vid_to_fileIds[vid], fileId)
}
- lookupResults, err := LookupVolumeIds(master, vids)
+ lookupResults, err := lookupFunc(vids)
if err != nil {
return ret, err
}
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 02c3efd17..aef29f56f 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -66,6 +66,15 @@ func (vc *vidMap) LookupVolumeServer(fileId string) (volumeServer string, err er
return serverUrl, nil
}
+func (vc *vidMap) GetVidLocations(vid string) (locations []Location) {
+ id, err := strconv.Atoi(vid)
+ if err != nil {
+ glog.V(1).Infof("Unknown volume id %s", vid)
+ return nil
+ }
+ return vc.GetLocations(uint32(id))
+}
+
func (vc *vidMap) GetLocations(vid uint32) (locations []Location) {
vc.RLock()
defer vc.RUnlock()