aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/shell/command_fs_meta_notify.go2
-rw-r--r--weed/shell/command_fs_meta_save.go60
-rw-r--r--weed/util/queue.go61
3 files changed, 111 insertions, 12 deletions
diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go
index 13b272fbf..4fe0e45a9 100644
--- a/weed/shell/command_fs_meta_notify.go
+++ b/weed/shell/command_fs_meta_notify.go
@@ -50,7 +50,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i
var dirCount, fileCount uint64
- err = doTraverse(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error {
+ err = doTraverseBFS(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error {
if entry.IsDirectory {
dirCount++
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go
index e710fe297..dd5e9defb 100644
--- a/weed/shell/command_fs_meta_save.go
+++ b/weed/shell/command_fs_meta_save.go
@@ -6,12 +6,15 @@ import (
"fmt"
"io"
"os"
+ "sync"
+ "sync/atomic"
"time"
+ "github.com/golang/protobuf/proto"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/golang/protobuf/proto"
)
func init() {
@@ -75,9 +78,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
var dirCount, fileCount uint64
- sizeBuf := make([]byte, 4)
-
- err = doTraverse(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error {
+ err = doTraverseBFS(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error {
protoMessage := &filer_pb.FullEntry{
Dir: string(parentPath),
@@ -89,15 +90,16 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("marshall error: %v", err)
}
+ sizeBuf := make([]byte, 4)
util.Uint32toBytes(sizeBuf, uint32(len(bytes)))
dst.Write(sizeBuf)
dst.Write(bytes)
if entry.IsDirectory {
- dirCount++
+ atomic.AddUint64(&dirCount, 1)
} else {
- fileCount++
+ atomic.AddUint64(&fileCount, 1)
}
if *verbose {
@@ -118,7 +120,45 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
})
}
-func doTraverse(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) {
+func doTraverseBFS(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient,
+ parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) {
+
+ K := 5
+
+ var jobQueueWg sync.WaitGroup
+ queue := util.NewQueue()
+ jobQueueWg.Add(1)
+ queue.Enqueue(parentPath)
+ var isTerminating bool
+
+ for i := 0; i < K; i++ {
+ go func() {
+ for {
+ if isTerminating {
+ break
+ }
+ t := queue.Dequeue()
+ if t == nil {
+ time.Sleep(329 * time.Millisecond)
+ continue
+ }
+ dir := t.(filer2.FullPath)
+ processErr := processOneDirectory(ctx, writer, client, dir, queue, &jobQueueWg, fn)
+ if processErr != nil {
+ err = processErr
+ }
+ jobQueueWg.Done()
+ }
+ }()
+ }
+ jobQueueWg.Wait()
+ isTerminating = true
+ return
+}
+
+func processOneDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient,
+ parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup,
+ fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) {
paginatedCount := -1
startFromFileName := ""
@@ -150,12 +190,10 @@ func doTraverse(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFi
if parentPath == "/" {
subDir = "/" + entry.Name
}
- if err = doTraverse(ctx, writer, client, filer2.FullPath(subDir), fn); err != nil {
- return err
- }
+ jobQueueWg.Add(1)
+ queue.Enqueue(filer2.FullPath(subDir))
}
startFromFileName = entry.Name
-
}
}
diff --git a/weed/util/queue.go b/weed/util/queue.go
new file mode 100644
index 000000000..31d9d1769
--- /dev/null
+++ b/weed/util/queue.go
@@ -0,0 +1,61 @@
+package util
+
+import "sync"
+
+type node struct {
+ data interface{}
+ next *node
+}
+
+type Queue struct {
+ head *node
+ tail *node
+ count int
+ sync.RWMutex
+}
+
+func NewQueue() *Queue {
+ q := &Queue{}
+ return q
+}
+
+func (q *Queue) Len() int {
+ q.RLock()
+ defer q.RUnlock()
+ return q.count
+}
+
+func (q *Queue) Enqueue(item interface{}) {
+ q.Lock()
+ defer q.Unlock()
+
+ n := &node{data: item}
+
+ if q.tail == nil {
+ q.tail = n
+ q.head = n
+ } else {
+ q.tail.next = n
+ q.tail = n
+ }
+ q.count++
+}
+
+func (q *Queue) Dequeue() interface{} {
+ q.Lock()
+ defer q.Unlock()
+
+ if q.head == nil {
+ return nil
+ }
+
+ n := q.head
+ q.head = n.next
+
+ if q.head == nil {
+ q.tail = nil
+ }
+ q.count--
+
+ return n.data
+} \ No newline at end of file