aboutsummaryrefslogtreecommitdiff
path: root/weed/shell/command_fs_meta_save.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-12-06 22:20:59 -0800
committerChris Lu <chris.lu@gmail.com>2019-12-06 22:20:59 -0800
commitf81d43442b11b0471db00a77ad8bc9b462977474 (patch)
tree1d0ad3566947cd4c9e5ba3a6374490197bdf3c8b /weed/shell/command_fs_meta_save.go
parentf38f90b7ea546373076c69d3b93069ea8fa46755 (diff)
downloadseaweedfs-f81d43442b11b0471db00a77ad8bc9b462977474.tar.xz
seaweedfs-f81d43442b11b0471db00a77ad8bc9b462977474.zip
filer: speed up filer.meta.save by parallelizing
Diffstat (limited to 'weed/shell/command_fs_meta_save.go')
-rw-r--r--weed/shell/command_fs_meta_save.go60
1 files changed, 49 insertions, 11 deletions
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go
index e710fe297..dd5e9defb 100644
--- a/weed/shell/command_fs_meta_save.go
+++ b/weed/shell/command_fs_meta_save.go
@@ -6,12 +6,15 @@ import (
"fmt"
"io"
"os"
+ "sync"
+ "sync/atomic"
"time"
+ "github.com/golang/protobuf/proto"
+
"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/golang/protobuf/proto"
)
func init() {
@@ -75,9 +78,7 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
var dirCount, fileCount uint64
- sizeBuf := make([]byte, 4)
-
- err = doTraverse(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error {
+ err = doTraverseBFS(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error {
protoMessage := &filer_pb.FullEntry{
Dir: string(parentPath),
@@ -89,15 +90,16 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("marshall error: %v", err)
}
+ sizeBuf := make([]byte, 4)
util.Uint32toBytes(sizeBuf, uint32(len(bytes)))
dst.Write(sizeBuf)
dst.Write(bytes)
if entry.IsDirectory {
- dirCount++
+ atomic.AddUint64(&dirCount, 1)
} else {
- fileCount++
+ atomic.AddUint64(&fileCount, 1)
}
if *verbose {
@@ -118,7 +120,45 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
})
}
-func doTraverse(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) {
+func doTraverseBFS(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient,
+ parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) {
+
+ K := 5
+
+ var jobQueueWg sync.WaitGroup
+ queue := util.NewQueue()
+ jobQueueWg.Add(1)
+ queue.Enqueue(parentPath)
+ var isTerminating bool
+
+ for i := 0; i < K; i++ {
+ go func() {
+ for {
+ if isTerminating {
+ break
+ }
+ t := queue.Dequeue()
+ if t == nil {
+ time.Sleep(329 * time.Millisecond)
+ continue
+ }
+ dir := t.(filer2.FullPath)
+ processErr := processOneDirectory(ctx, writer, client, dir, queue, &jobQueueWg, fn)
+ if processErr != nil {
+ err = processErr
+ }
+ jobQueueWg.Done()
+ }
+ }()
+ }
+ jobQueueWg.Wait()
+ isTerminating = true
+ return
+}
+
+func processOneDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient,
+ parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup,
+ fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) {
paginatedCount := -1
startFromFileName := ""
@@ -150,12 +190,10 @@ func doTraverse(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFi
if parentPath == "/" {
subDir = "/" + entry.Name
}
- if err = doTraverse(ctx, writer, client, filer2.FullPath(subDir), fn); err != nil {
- return err
- }
+ jobQueueWg.Add(1)
+ queue.Enqueue(filer2.FullPath(subDir))
}
startFromFileName = entry.Name
-
}
}