aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer2/filer.go19
-rw-r--r--weed/filer2/filer_deletion.go33
-rw-r--r--weed/util/queue_unbounded.go45
-rw-r--r--weed/util/queue_unbounded_test.go25
4 files changed, 95 insertions, 27 deletions
diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go
index a0af942e0..4db48e386 100644
--- a/weed/filer2/filer.go
+++ b/weed/filer2/filer.go
@@ -13,6 +13,7 @@ import (
"github.com/karlseguin/ccache"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/wdclient"
)
@@ -24,19 +25,19 @@ var (
)
type Filer struct {
- store *FilerStoreWrapper
- directoryCache *ccache.Cache
- MasterClient *wdclient.MasterClient
- fileIdDeletionChan chan string
- GrpcDialOption grpc.DialOption
+ store *FilerStoreWrapper
+ directoryCache *ccache.Cache
+ MasterClient *wdclient.MasterClient
+ fileIdDeletionQueue *util.UnboundedQueue
+ GrpcDialOption grpc.DialOption
}
func NewFiler(masters []string, grpcDialOption grpc.DialOption) *Filer {
f := &Filer{
- directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
- MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters),
- fileIdDeletionChan: make(chan string, PaginationSize),
- GrpcDialOption: grpcDialOption,
+ directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)),
+ MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "filer", masters),
+ fileIdDeletionQueue: util.NewUnboundedQueue(),
+ GrpcDialOption: grpcDialOption,
}
go f.loopProcessingDeletion()
diff --git a/weed/filer2/filer_deletion.go b/weed/filer2/filer_deletion.go
index 9937685f7..3a64f636e 100644
--- a/weed/filer2/filer_deletion.go
+++ b/weed/filer2/filer_deletion.go
@@ -10,8 +10,6 @@ import (
func (f *Filer) loopProcessingDeletion() {
- ticker := time.NewTicker(5 * time.Second)
-
lookupFunc := func(vids []string) (map[string]operation.LookupResult, error) {
m := make(map[string]operation.LookupResult)
for _, vid := range vids {
@@ -31,36 +29,35 @@ func (f *Filer) loopProcessingDeletion() {
return m, nil
}
- var fileIds []string
+ var deletionCount int
for {
- select {
- case fid := <-f.fileIdDeletionChan:
- fileIds = append(fileIds, fid)
- if len(fileIds) >= 4096 {
- glog.V(1).Infof("deleting fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
- fileIds = fileIds[:0]
- }
- case <-ticker.C:
- if len(fileIds) > 0 {
- glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds))
- operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
- fileIds = fileIds[:0]
+ deletionCount = 0
+ f.fileIdDeletionQueue.Consume(func(fileIds []string) {
+ deletionCount = len(fileIds)
+ _, err := operation.DeleteFilesWithLookupVolumeId(f.GrpcDialOption, fileIds, lookupFunc)
+ if err != nil {
+ glog.V(0).Infof("deleting fileIds len=%d error: %v", deletionCount, err)
+ } else {
+ glog.V(1).Infof("deleting fileIds len=%d", deletionCount)
}
+ })
+
+ if deletionCount == 0 {
+ time.Sleep(1123 * time.Millisecond)
}
}
}
func (f *Filer) DeleteChunks(chunks []*filer_pb.FileChunk) {
for _, chunk := range chunks {
- f.fileIdDeletionChan <- chunk.GetFileIdString()
+ f.fileIdDeletionQueue.EnQueue(chunk.GetFileIdString())
}
}
// DeleteFileByFileId direct delete by file id.
// Only used when the fileId is not being managed by snapshots.
func (f *Filer) DeleteFileByFileId(fileId string) {
- f.fileIdDeletionChan <- fileId
+ f.fileIdDeletionQueue.EnQueue(fileId)
}
func (f *Filer) deleteChunksIfNotNew(oldEntry, newEntry *Entry) {
diff --git a/weed/util/queue_unbounded.go b/weed/util/queue_unbounded.go
new file mode 100644
index 000000000..664cd965e
--- /dev/null
+++ b/weed/util/queue_unbounded.go
@@ -0,0 +1,45 @@
+package util
+
+import "sync"
+
+type UnboundedQueue struct {
+ outbound []string
+ outboundLock sync.RWMutex
+ inbound []string
+ inboundLock sync.RWMutex
+}
+
+func NewUnboundedQueue() *UnboundedQueue {
+ q := &UnboundedQueue{}
+ return q
+}
+
+func (q *UnboundedQueue) EnQueue(items ...string) {
+ q.inboundLock.Lock()
+ defer q.inboundLock.Unlock()
+
+ q.outbound = append(q.outbound, items...)
+
+}
+
+func (q *UnboundedQueue) Consume(fn func([]string)) {
+ q.outboundLock.Lock()
+ defer q.outboundLock.Unlock()
+
+ if len(q.outbound) == 0 {
+ q.inboundLock.Lock()
+ inbountLen := len(q.inbound)
+ if inbountLen > 0 {
+ t := q.outbound
+ q.outbound = q.inbound
+ q.inbound = t
+ }
+ q.inboundLock.Unlock()
+ }
+
+ if len(q.outbound) > 0 {
+ fn(q.outbound)
+ q.outbound = q.outbound[:0]
+ }
+
+}
diff --git a/weed/util/queue_unbounded_test.go b/weed/util/queue_unbounded_test.go
new file mode 100644
index 000000000..2d02032cb
--- /dev/null
+++ b/weed/util/queue_unbounded_test.go
@@ -0,0 +1,25 @@
+package util
+
+import "testing"
+
+func TestEnqueueAndConsume(t *testing.T) {
+
+ q := NewUnboundedQueue()
+
+ q.EnQueue("1", "2", "3")
+
+ f := func(items []string) {
+ for _, t := range items {
+ println(t)
+ }
+ println("-----------------------")
+ }
+ q.Consume(f)
+
+ q.Consume(f)
+
+ q.EnQueue("4", "5")
+ q.EnQueue("6", "7")
+ q.Consume(f)
+
+}