diff options
| author | Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> | 2023-12-29 00:54:37 +0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2023-12-28 11:54:37 -0800 |
| commit | 183352c796d0f81b8ff8072f18b74f615133c1fe (patch) | |
| tree | c4789ce92d4d3f88e2f2c941770b104dee1e7349 | |
| parent | 0b8f9de4ecf6563126260ec8a5bddf2b144a7024 (diff) | |
| download | seaweedfs-183352c796d0f81b8ff8072f18b74f615133c1fe.tar.xz seaweedfs-183352c796d0f81b8ff8072f18b74f615133c1fe.zip | |
shell meta load add concurrency (#4529)
* fix: increase speed cmd fs meta load
* fix: add wg
| -rw-r--r-- | weed/shell/command_fs_meta_load.go | 36 |
1 files changed, 27 insertions, 9 deletions
diff --git a/weed/shell/command_fs_meta_load.go b/weed/shell/command_fs_meta_load.go index 0cbdddb49..a2ae9401d 100644 --- a/weed/shell/command_fs_meta_load.go +++ b/weed/shell/command_fs_meta_load.go @@ -6,6 +6,7 @@ import ( "io" "os" "strings" + "sync" "time" "google.golang.org/protobuf/proto" @@ -47,6 +48,7 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. metaLoadCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) c.dirPrefix = metaLoadCommand.String("dirPrefix", "", "load entries only with directories matching prefix") + concurrency := metaLoadCommand.Int("concurrency", 1, "number of parallel meta load to filer") verbose := metaLoadCommand.Bool("v", true, "verbose mode") if err = metaLoadCommand.Parse(args[0 : len(args)-1]); err != nil { return nil @@ -64,6 +66,9 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. err = commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { sizeBuf := make([]byte, 4) + waitChan := make(chan struct{}, *concurrency) + defer close(waitChan) + var wg sync.WaitGroup for { if n, err := dst.Read(sizeBuf); n != 4 { @@ -105,21 +110,34 @@ func (c *commandFsMetaLoad) Do(args []string, commandEnv *CommandEnv, writer io. } fullEntry.Entry.Name = strings.ReplaceAll(fullEntry.Entry.Name, "/", "x") - if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ - Directory: fullEntry.Dir, - Entry: fullEntry.Entry, - }); err != nil { - return err - } - if fullEntry.Entry.IsDirectory { + wg.Wait() + if errEntry := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ + Directory: fullEntry.Dir, + Entry: fullEntry.Entry, + }); errEntry != nil { + return errEntry + } dirCount++ } else { + wg.Add(1) + waitChan <- struct{}{} + go func(entry *filer_pb.FullEntry) { + if errEntry := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ + Directory: entry.Dir, + Entry: entry.Entry, + }); errEntry != nil { + err = errEntry + } + defer wg.Done() + <-waitChan + }(fullEntry) + if err != nil { + return err + } fileCount++ } - } - }) if err == nil { |
