diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-12-06 22:20:59 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-12-06 22:20:59 -0800 |
| commit | f81d43442b11b0471db00a77ad8bc9b462977474 (patch) | |
| tree | 1d0ad3566947cd4c9e5ba3a6374490197bdf3c8b | |
| parent | f38f90b7ea546373076c69d3b93069ea8fa46755 (diff) | |
| download | seaweedfs-f81d43442b11b0471db00a77ad8bc9b462977474.tar.xz seaweedfs-f81d43442b11b0471db00a77ad8bc9b462977474.zip | |
filer: speed up filer.meta.save by parallelizing
| -rw-r--r-- | weed/shell/command_fs_meta_notify.go | 2 | ||||
| -rw-r--r-- | weed/shell/command_fs_meta_save.go | 60 | ||||
| -rw-r--r-- | weed/util/queue.go | 61 |
3 files changed, 111 insertions, 12 deletions
diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go index 13b272fbf..4fe0e45a9 100644 --- a/weed/shell/command_fs_meta_notify.go +++ b/weed/shell/command_fs_meta_notify.go @@ -50,7 +50,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i var dirCount, fileCount uint64 - err = doTraverse(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error { + err = doTraverseBFS(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error { if entry.IsDirectory { dirCount++ diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index e710fe297..dd5e9defb 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -6,12 +6,15 @@ import ( "fmt" "io" "os" + "sync" + "sync/atomic" "time" + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" ) func init() { @@ -75,9 +78,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. var dirCount, fileCount uint64 - sizeBuf := make([]byte, 4) - - err = doTraverse(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error { + err = doTraverseBFS(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error { protoMessage := &filer_pb.FullEntry{ Dir: string(parentPath), @@ -89,15 +90,16 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return fmt.Errorf("marshall error: %v", err) } + sizeBuf := make([]byte, 4) util.Uint32toBytes(sizeBuf, uint32(len(bytes))) dst.Write(sizeBuf) dst.Write(bytes) if entry.IsDirectory { - dirCount++ + atomic.AddUint64(&dirCount, 1) } else { - fileCount++ + atomic.AddUint64(&fileCount, 1) } if *verbose { @@ -118,7 +120,45 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. }) } -func doTraverse(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) { +func doTraverseBFS(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, + parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) { + + K := 5 + + var jobQueueWg sync.WaitGroup + queue := util.NewQueue() + jobQueueWg.Add(1) + queue.Enqueue(parentPath) + var isTerminating bool + + for i := 0; i < K; i++ { + go func() { + for { + if isTerminating { + break + } + t := queue.Dequeue() + if t == nil { + time.Sleep(329 * time.Millisecond) + continue + } + dir := t.(filer2.FullPath) + processErr := processOneDirectory(ctx, writer, client, dir, queue, &jobQueueWg, fn) + if processErr != nil { + err = processErr + } + jobQueueWg.Done() + } + }() + } + jobQueueWg.Wait() + isTerminating = true + return +} + +func processOneDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, + parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, + fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) { paginatedCount := -1 startFromFileName := "" @@ -150,12 +190,10 @@ func doTraverse(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFi if parentPath == "/" { subDir = "/" + entry.Name } - if err = doTraverse(ctx, writer, client, filer2.FullPath(subDir), fn); err != nil { - return err - } + jobQueueWg.Add(1) + queue.Enqueue(filer2.FullPath(subDir)) } startFromFileName = entry.Name - } } diff --git a/weed/util/queue.go b/weed/util/queue.go new file mode 100644 index 000000000..31d9d1769 --- /dev/null +++ b/weed/util/queue.go @@ -0,0 +1,61 @@ +package util + +import "sync" + +type node struct { + data interface{} + next *node +} + +type Queue struct { + head *node + tail *node + count int + sync.RWMutex +} + +func NewQueue() *Queue { + q := &Queue{} + return q +} + +func (q *Queue) Len() int { + q.RLock() + defer q.RUnlock() + return q.count +} + +func (q *Queue) Enqueue(item interface{}) { + q.Lock() + defer q.Unlock() + + n := &node{data: item} + + if q.tail == nil { + q.tail = n + q.head = n + } else { + q.tail.next = n + q.tail = n + } + q.count++ +} + +func (q *Queue) Dequeue() interface{} { + q.Lock() + defer q.Unlock() + + if q.head == nil { + return nil + } + + n := q.head + q.head = n.next + + if q.head == nil { + q.tail = nil + } + q.count-- + + return n.data +}
\ No newline at end of file |
