diff options
Diffstat (limited to 'weed/shell/command_fs_meta_save.go')
| -rw-r--r-- | weed/shell/command_fs_meta_save.go | 181 |
1 files changed, 98 insertions, 83 deletions
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index e710fe297..b51fdd0f6 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -1,17 +1,19 @@ package shell import ( - "context" "flag" "fmt" "io" "os" + "sync" + "sync/atomic" "time" + "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" ) func init() { @@ -51,114 +53,127 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return nil } - filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args())) - if err != nil { - return err + filerServer, filerPort, path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args())) + if parseErr != nil { + return parseErr } - ctx := context.Background() + t := time.Now() + fileName := *outputFileName + if fileName == "" { + fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", + filerServer, filerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) + } - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { + dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if openErr != nil { + return fmt.Errorf("failed to create file %s: %v", fileName, openErr) + } + defer dst.Close() - t := time.Now() - fileName := *outputFileName - if fileName == "" { - fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", - filerServer, filerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) + var wg sync.WaitGroup + wg.Add(1) + outputChan := make(chan []byte, 1024) + go func() { + sizeBuf := make([]byte, 4) + for b := range outputChan { + util.Uint32toBytes(sizeBuf, uint32(len(b))) + dst.Write(sizeBuf) + dst.Write(b) } + wg.Done() + }() - dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - return nil - } - defer dst.Close() + var dirCount, fileCount uint64 - var dirCount, fileCount uint64 + err = doTraverseBFS(writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) { - sizeBuf := make([]byte, 4) + protoMessage := &filer_pb.FullEntry{ + Dir: string(parentPath), + Entry: entry, + } - err = doTraverse(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error { + bytes, err := proto.Marshal(protoMessage) + if err != nil { + fmt.Fprintf(writer, "marshall error: %v\n", err) + return + } - protoMessage := &filer_pb.FullEntry{ - Dir: string(parentPath), - Entry: entry, - } + outputChan <- bytes - bytes, err := proto.Marshal(protoMessage) - if err != nil { - return fmt.Errorf("marshall error: %v", err) - } + if entry.IsDirectory { + atomic.AddUint64(&dirCount, 1) + } else { + atomic.AddUint64(&fileCount, 1) + } - util.Uint32toBytes(sizeBuf, uint32(len(bytes))) + if *verbose { + println(parentPath.Child(entry.Name)) + } - dst.Write(sizeBuf) - dst.Write(bytes) + }) - if entry.IsDirectory { - dirCount++ - } else { - fileCount++ - } + close(outputChan) - if *verbose { - println(parentPath.Child(entry.Name)) - } + wg.Wait() - return nil + if err == nil { + fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount) + fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName) + } - }) + return err - if err == nil { - fmt.Fprintf(writer, "\ntotal %d directories, %d files", dirCount, fileCount) - fmt.Fprintf(writer, "\nmeta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName) - } +} +func doTraverseBFS(writer io.Writer, filerClient filer2.FilerClient, parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { - return err + K := 5 - }) + var jobQueueWg sync.WaitGroup + queue := util.NewQueue() + jobQueueWg.Add(1) + queue.Enqueue(parentPath) + var isTerminating bool + for i := 0; i < K; i++ { + go func() { + for { + if isTerminating { + break + } + t := queue.Dequeue() + if t == nil { + time.Sleep(329 * time.Millisecond) + continue + } + dir := t.(filer2.FullPath) + processErr := processOneDirectory(writer, filerClient, dir, queue, &jobQueueWg, fn) + if processErr != nil { + err = processErr + } + jobQueueWg.Done() + } + }() + } + jobQueueWg.Wait() + isTerminating = true + return } -func doTraverse(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) { - - paginatedCount := -1 - startFromFileName := "" - paginateSize := 1000 - - for paginatedCount == -1 || paginatedCount == paginateSize { - resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ - Directory: string(parentPath), - Prefix: "", - StartFromFileName: startFromFileName, - InclusiveStartFrom: false, - Limit: uint32(paginateSize), - }) - if listErr != nil { - err = listErr - return - } - paginatedCount = len(resp.Entries) +func processOneDirectory(writer io.Writer, filerClient filer2.FilerClient, parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { - for _, entry := range resp.Entries { + return filer2.ReadDirAllEntries(filerClient, parentPath, "", func(entry *filer_pb.Entry, isLast bool) { - if err = fn(parentPath, entry); err != nil { - return err - } + fn(parentPath, entry) - if entry.IsDirectory { - subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name) - if parentPath == "/" { - subDir = "/" + entry.Name - } - if err = doTraverse(ctx, writer, client, filer2.FullPath(subDir), fn); err != nil { - return err - } + if entry.IsDirectory { + subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name) + if parentPath == "/" { + subDir = "/" + entry.Name } - startFromFileName = entry.Name - + jobQueueWg.Add(1) + queue.Enqueue(filer2.FullPath(subDir)) } - } - - return + }) } |
