aboutsummaryrefslogtreecommitdiff
path: root/weed/filer2/filer.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer2/filer.go')
-rw-r--r--weed/filer2/filer.go65
1 files changed, 12 insertions, 53 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index 0c0a3e1fa..659054d86 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -10,24 +10,27 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient"
"github.com/karlseguin/ccache"
- "github.com/chrislusf/seaweedfs/weed/storage"
)
type Filer struct {
- store FilerStore
- directoryCache *ccache.Cache
- MasterClient *wdclient.MasterClient
+ store FilerStore
+ directoryCache *ccache.Cache
+ MasterClient *wdclient.MasterClient
+ fileIdDeletionChan chan string
}
func NewFiler(masters []string) *Filer {
- return &Filer{
- directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
- MasterClient: wdclient.NewMasterClient(context.Background(), "filer", masters),
+ f := &Filer{
+ directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
+ MasterClient: wdclient.NewMasterClient(context.Background(), "filer", masters),
+ fileIdDeletionChan: make(chan string, 4096),
}
+
+ go f.loopProcessingDeletion()
+
+ return f
}
func (f *Filer) SetStore(store FilerStore) {
@@ -229,47 +232,3 @@ func (f *Filer) cacheSetDirectory(dirpath string, dirEntry *Entry, level int) {
f.directoryCache.Set(dirpath, dirEntry, time.Duration(minutes)*time.Minute)
}
-
-func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
- var fileIds []string
- for _, chunk := range chunks {
- fileIds = append(fileIds, chunk.FileId)
- }
- operation.DeleteFiles(f.GetMaster(), fileIds)
-}
-
-func (f *Filer) DeleteFileByFileId(fileId string) {
- volumeServer, err := f.MasterClient.LookupVolumeServer(fileId)
- if err != nil {
- glog.V(0).Infof("can not find file %s: %v", fileId, err)
- }
- if _, err := operation.DeleteFilesAtOneVolumeServer(volumeServer, []string{fileId}); err != nil && err != storage.NotFound {
- glog.V(0).Infof("deleting file %s: %v", fileId, err)
- }
-}
-
-func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
-
- if oldEntry == nil {
- return
- }
- if newEntry == nil {
- f.DeleteChunks(oldEntry.Chunks)
- }
-
- var toDelete []*filer_pb.FileChunk
-
- for _, oldChunk := range oldEntry.Chunks {
- found := false
- for _, newChunk := range newEntry.Chunks {
- if oldChunk.FileId == newChunk.FileId {
- found = true
- break
- }
- }
- if !found {
- toDelete = append(toDelete, oldChunk)
- }
- }
- f.DeleteChunks(toDelete)
-}