diff options
| author | binbinshi <javabinbin@126.com> | 2020-02-05 16:56:23 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-02-05 16:56:23 +0800 |
| commit | d892cad15d748327c2b7c649f6398ff35d8dce0b (patch) | |
| tree | 29cb8adae01d9f4eaeabb02996d162700da2de1a /weed/shell/command_fs_meta_save.go | |
| parent | d4f755347e4874cf0a2fd13480580f348b86a465 (diff) | |
| parent | 8d94564f4152cd890d5896a3dedf5e7589c5023e (diff) | |
| download | seaweedfs-d892cad15d748327c2b7c649f6398ff35d8dce0b.tar.xz seaweedfs-d892cad15d748327c2b7c649f6398ff35d8dce0b.zip | |
Merge pull request #1 from chrislusf/master
update from chrisluf
Diffstat (limited to 'weed/shell/command_fs_meta_save.go')
| -rw-r--r-- | weed/shell/command_fs_meta_save.go | 187 |
1 files changed, 104 insertions, 83 deletions
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index e710fe297..178c826d5 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -6,12 +6,15 @@ import ( "fmt" "io" "os" + "sync" + "sync/atomic" "time" + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" ) func init() { @@ -51,114 +54,132 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return nil } - filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args())) - if err != nil { - return err + filerServer, filerPort, path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args())) + if parseErr != nil { + return parseErr } ctx := context.Background() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { - - t := time.Now() - fileName := *outputFileName - if fileName == "" { - fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", - filerServer, filerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) - } - - dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - return nil - } - defer dst.Close() + t := time.Now() + fileName := *outputFileName + if fileName == "" { + fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", + filerServer, filerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) + } - var dirCount, fileCount uint64 + dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if openErr != nil { + return fmt.Errorf("failed to create file %s: %v", fileName, openErr) + } + defer dst.Close() + var wg sync.WaitGroup + wg.Add(1) + outputChan := make(chan []byte, 1024) + go func() { sizeBuf := make([]byte, 4) - - err = doTraverse(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error { - - protoMessage := &filer_pb.FullEntry{ - Dir: string(parentPath), - Entry: entry, - } - - bytes, err := proto.Marshal(protoMessage) - if err != nil { - return fmt.Errorf("marshall error: %v", err) - } - - util.Uint32toBytes(sizeBuf, uint32(len(bytes))) - + for b := range outputChan { + util.Uint32toBytes(sizeBuf, uint32(len(b))) dst.Write(sizeBuf) - dst.Write(bytes) + dst.Write(b) + } + wg.Done() + }() - if entry.IsDirectory { - dirCount++ - } else { - fileCount++ - } + var dirCount, fileCount uint64 - if *verbose { - println(parentPath.Child(entry.Name)) - } + err = doTraverseBFS(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) { + + protoMessage := &filer_pb.FullEntry{ + Dir: string(parentPath), + Entry: entry, + } - return nil + bytes, err := proto.Marshal(protoMessage) + if err != nil { + fmt.Fprintf(writer, "marshall error: %v\n", err) + return + } - }) + outputChan <- bytes - if err == nil { - fmt.Fprintf(writer, "\ntotal %d directories, %d files", dirCount, fileCount) - fmt.Fprintf(writer, "\nmeta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName) + if entry.IsDirectory { + atomic.AddUint64(&dirCount, 1) + } else { + atomic.AddUint64(&fileCount, 1) } - return err + if *verbose { + println(parentPath.Child(entry.Name)) + } }) -} -func doTraverse(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) { - - paginatedCount := -1 - startFromFileName := "" - paginateSize := 1000 - - for paginatedCount == -1 || paginatedCount == paginateSize { - resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ - Directory: string(parentPath), - Prefix: "", - StartFromFileName: startFromFileName, - InclusiveStartFrom: false, - Limit: uint32(paginateSize), - }) - if listErr != nil { - err = listErr - return - } + close(outputChan) - paginatedCount = len(resp.Entries) + wg.Wait() - for _, entry := range resp.Entries { + if err == nil { + fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount) + fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName) + } - if err = fn(parentPath, entry); err != nil { - return err - } + return err - if entry.IsDirectory { - subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name) - if parentPath == "/" { - subDir = "/" + entry.Name +} +func doTraverseBFS(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, + parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { + + K := 5 + + var jobQueueWg sync.WaitGroup + queue := util.NewQueue() + jobQueueWg.Add(1) + queue.Enqueue(parentPath) + var isTerminating bool + + for i := 0; i < K; i++ { + go func() { + for { + if isTerminating { + break + } + t := queue.Dequeue() + if t == nil { + time.Sleep(329 * time.Millisecond) + continue } - if err = doTraverse(ctx, writer, client, filer2.FullPath(subDir), fn); err != nil { - return err + dir := t.(filer2.FullPath) + processErr := processOneDirectory(ctx, writer, filerClient, dir, queue, &jobQueueWg, fn) + if processErr != nil { + err = processErr } + jobQueueWg.Done() } - startFromFileName = entry.Name - - } + }() } - + jobQueueWg.Wait() + isTerminating = true return +} + +func processOneDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, + parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, + fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { + + return filer2.ReadDirAllEntries(ctx, filerClient, parentPath, "", func(entry *filer_pb.Entry, isLast bool) { + + fn(parentPath, entry) + + if entry.IsDirectory { + subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name) + if parentPath == "/" { + subDir = "/" + entry.Name + } + jobQueueWg.Add(1) + queue.Enqueue(filer2.FullPath(subDir)) + } + }) } |
