aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Lebedev <9497591+kmlebedev@users.noreply.github.com>2023-12-29 00:54:37 +0500
committerGitHub <noreply@github.com>2023-12-28 11:54:37 -0800
commit183352c796d0f81b8ff8072f18b74f615133c1fe (patch)
treec4789ce92d4d3f88e2f2c941770b104dee1e7349
parent0b8f9de4ecf6563126260ec8a5bddf2b144a7024 (diff)
downloadseaweedfs-183352c796d0f81b8ff8072f18b74f615133c1fe.tar.xz
seaweedfs-183352c796d0f81b8ff8072f18b74f615133c1fe.zip
shell meta load add concurrency (#4529)
* fix: increase speed cmd fs meta load * fix: add wg
-rw-r--r--weed/shell/command_fs_meta_load.go36
1 files changed, 27 insertions, 9 deletions
diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go
index 0cbdddb49..a2ae9401d 100644
--- a/weed/shell/command_fs_meta_load.go
+++ b/weed/shell/command_fs_meta_load.go
@@ -6,6 +6,7 @@ import (
"io"
"os"
"strings"
+ "sync"
"time"
"google.golang.org/protobuf/proto"
@@ -47,6 +48,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
metaLoadCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
c.dirPrefix = metaLoadCommand.String("dirPrefix", "", "load entries only with directories matching prefix")
+ concurrency := metaLoadCommand.Int("concurrency", 1, "number of parallel meta load to filer")
verbose := metaLoadCommand.Bool("v", true, "verbose mode")
if err = metaLoadCommand.Parse(args[0 : len(args)-1]); err != nil {
return nil
@@ -64,6 +66,9 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
sizeBuf := make([]byte, 4)
+ waitChan := make(chan struct{}, *concurrency)
+ defer close(waitChan)
+ var wg sync.WaitGroup
for {
if n, err := dst.Read(sizeBuf); n != 4 {
@@ -105,21 +110,34 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io.
}
fullEntry.Entry.Name = strings.ReplaceAll(fullEntry.Entry.Name, "/", "x")
- if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
- Directory: fullEntry.Dir,
- Entry: fullEntry.Entry,
- }); err != nil {
- return err
- }
-
if fullEntry.Entry.IsDirectory {
+ wg.Wait()
+ if errEntry := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+ Directory: fullEntry.Dir,
+ Entry: fullEntry.Entry,
+ }); errEntry != nil {
+ return errEntry
+ }
dirCount++
} else {
+ wg.Add(1)
+ waitChan <- struct{}{}
+ go func(entry *filer_pb.FullEntry) {
+ if errEntry := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+ Directory: entry.Dir,
+ Entry: entry.Entry,
+ }); errEntry != nil {
+ err = errEntry
+ }
+ defer wg.Done()
+ <-waitChan
+ }(fullEntry)
+ if err != nil {
+ return err
+ }
fileCount++
}
-
}
-
})
if err == nil {