diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-12-13 00:22:37 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-12-13 00:22:37 -0800 |
| commit | 0fa1269bc77abe30f4d108a88a97e29e1bca3124 (patch) | |
| tree | 5cc1c65ea9119dc23a5a4ded21e775d62c1bf9f3 /weed/shell/command_fs_meta_save.go | |
| parent | d0b423bbc07368bc53a08aec47618924851725a1 (diff) | |
| download | seaweedfs-0fa1269bc77abe30f4d108a88a97e29e1bca3124.tar.xz seaweedfs-0fa1269bc77abe30f4d108a88a97e29e1bca3124.zip | |
filer: streaming file listing
Diffstat (limited to 'weed/shell/command_fs_meta_save.go')
| -rw-r--r-- | weed/shell/command_fs_meta_save.go | 144 |
1 files changed, 57 insertions, 87 deletions
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index dd5e9defb..4ff00c64b 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -54,74 +54,69 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. return nil } - filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args())) - if err != nil { - return err + filerServer, filerPort, path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args())) + if parseErr != nil { + return parseErr } ctx := context.Background() - return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error { - - t := time.Now() - fileName := *outputFileName - if fileName == "" { - fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", - filerServer, filerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) - } - - dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) - if err != nil { - return nil - } - defer dst.Close() - - var dirCount, fileCount uint64 - - err = doTraverseBFS(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error { - - protoMessage := &filer_pb.FullEntry{ - Dir: string(parentPath), - Entry: entry, - } + t := time.Now() + fileName := *outputFileName + if fileName == "" { + fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", + filerServer, filerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) + } - bytes, err := proto.Marshal(protoMessage) - if err != nil { - return fmt.Errorf("marshall error: %v", err) - } + dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if openErr != nil { + return fmt.Errorf("failed to create file %s: %v", fileName, openErr) + } + defer dst.Close() - sizeBuf := make([]byte, 4) - util.Uint32toBytes(sizeBuf, uint32(len(bytes))) + var dirCount, fileCount uint64 - dst.Write(sizeBuf) - dst.Write(bytes) + err = doTraverseBFS(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) { - if entry.IsDirectory { - atomic.AddUint64(&dirCount, 1) - } else { - atomic.AddUint64(&fileCount, 1) - } + protoMessage := &filer_pb.FullEntry{ + Dir: string(parentPath), + Entry: entry, + } - if *verbose { - println(parentPath.Child(entry.Name)) - } + bytes, err := proto.Marshal(protoMessage) + if err != nil { + fmt.Fprintf(writer, "marshall error: %v\n", err) + return + } - return nil + sizeBuf := make([]byte, 4) + util.Uint32toBytes(sizeBuf, uint32(len(bytes))) - }) + dst.Write(sizeBuf) + dst.Write(bytes) - if err == nil { - fmt.Fprintf(writer, "\ntotal %d directories, %d files", dirCount, fileCount) - fmt.Fprintf(writer, "\nmeta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName) + if entry.IsDirectory { + atomic.AddUint64(&dirCount, 1) + } else { + atomic.AddUint64(&fileCount, 1) } - return err + if *verbose { + println(parentPath.Child(entry.Name)) + } }) + if err == nil { + fmt.Fprintf(writer, "\ntotal %d directories, %d files", dirCount, fileCount) + fmt.Fprintf(writer, "\nmeta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName) + } + + return err + } -func doTraverseBFS(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, - parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) { +func doTraverseBFS(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, + parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { K := 5 @@ -143,7 +138,7 @@ func doTraverseBFS(ctx context.Context, writer io.Writer, client filer_pb.Seawee continue } dir := t.(filer2.FullPath) - processErr := processOneDirectory(ctx, writer, client, dir, queue, &jobQueueWg, fn) + processErr := processOneDirectory(ctx, writer, filerClient, dir, queue, &jobQueueWg, fn) if processErr != nil { err = processErr } @@ -156,47 +151,22 @@ func doTraverseBFS(ctx context.Context, writer io.Writer, client filer_pb.Seawee return } -func processOneDirectory(ctx context.Context, writer io.Writer, client filer_pb.SeaweedFilerClient, +func processOneDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, - fn func(parentPath filer2.FullPath, entry *filer_pb.Entry) error) (err error) { - - paginatedCount := -1 - startFromFileName := "" - paginateSize := 1000 - - for paginatedCount == -1 || paginatedCount == paginateSize { - resp, listErr := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{ - Directory: string(parentPath), - Prefix: "", - StartFromFileName: startFromFileName, - InclusiveStartFrom: false, - Limit: uint32(paginateSize), - }) - if listErr != nil { - err = listErr - return - } + fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { - paginatedCount = len(resp.Entries) + return filer2.ReadDirAllEntries(ctx, filerClient, string(parentPath), "", func(entry *filer_pb.Entry, isLast bool) { - for _, entry := range resp.Entries { + fn(parentPath, entry) - if err = fn(parentPath, entry); err != nil { - return err + if entry.IsDirectory { + subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name) + if parentPath == "/" { + subDir = "/" + entry.Name } - - if entry.IsDirectory { - subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name) - if parentPath == "/" { - subDir = "/" + entry.Name - } - jobQueueWg.Add(1) - queue.Enqueue(filer2.FullPath(subDir)) - } - startFromFileName = entry.Name + jobQueueWg.Add(1) + queue.Enqueue(filer2.FullPath(subDir)) } - } - - return + }) } |
