diff options
Diffstat (limited to 'weed/shell/command_fs_meta_save.go')
| -rw-r--r-- | weed/shell/command_fs_meta_save.go | 131 |
1 files changed, 52 insertions, 79 deletions
diff --git a/weed/shell/command_fs_meta_save.go b/weed/shell/command_fs_meta_save.go index ed070350f..37d94fe42 100644 --- a/weed/shell/command_fs_meta_save.go +++ b/weed/shell/command_fs_meta_save.go @@ -1,18 +1,18 @@ package shell import ( - "context" "flag" "fmt" "io" "os" + "path/filepath" + "strings" "sync" "sync/atomic" "time" "github.com/golang/protobuf/proto" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -40,8 +40,6 @@ func (c *commandFsMetaSave) Help() string { The meta data will be saved into a local <filer_host>-<port>-<time>.meta file. These meta data can be later loaded by fs.meta.load command, - This assumes there are no deletions, so this is different from taking a snapshot. - ` } @@ -50,22 +48,22 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. fsMetaSaveCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) verbose := fsMetaSaveCommand.Bool("v", false, "print out each processed files") outputFileName := fsMetaSaveCommand.String("o", "", "output the meta data to this file") + isObfuscate := fsMetaSaveCommand.Bool("obfuscate", false, "obfuscate the file names") + // chunksFileName := fsMetaSaveCommand.String("chunks", "", "output all the chunks to this file") if err = fsMetaSaveCommand.Parse(args); err != nil { return nil } - filerServer, filerPort, path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args())) + path, parseErr := commandEnv.parseUrl(findInputDirectory(fsMetaSaveCommand.Args())) if parseErr != nil { return parseErr } - ctx := context.Background() - - t := time.Now() fileName := *outputFileName if fileName == "" { + t := time.Now() fileName = fmt.Sprintf("%s-%d-%4d%02d%02d-%02d%02d%02d.meta", - filerServer, filerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) + commandEnv.option.FilerHost, commandEnv.option.FilerPort, t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second()) } dst, openErr := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) @@ -74,43 +72,76 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. } defer dst.Close() - var wg sync.WaitGroup - wg.Add(1) - outputChan := make(chan []byte, 1024) - go func() { + var cipherKey util.CipherKey + if *isObfuscate { + cipherKey = util.GenCipherKey() + } + + err = doTraverseBfsAndSaving(commandEnv, writer, path, *verbose, func(outputChan chan interface{}) { sizeBuf := make([]byte, 4) - for b := range outputChan { + for item := range outputChan { + b := item.([]byte) util.Uint32toBytes(sizeBuf, uint32(len(b))) dst.Write(sizeBuf) dst.Write(b) } + }, func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) { + if !entry.Entry.IsDirectory { + ext := filepath.Ext(entry.Entry.Name) + if encrypted, encErr := util.Encrypt([]byte(entry.Entry.Name), cipherKey); encErr == nil { + entry.Entry.Name = util.Base64Encode(encrypted)[:len(entry.Entry.Name)] + ext + entry.Entry.Name = strings.ReplaceAll(entry.Entry.Name, "/", "x") + } + } + bytes, err := proto.Marshal(entry) + if err != nil { + fmt.Fprintf(writer, "marshall error: %v\n", err) + return + } + + outputChan <- bytes + return nil + }) + + if err == nil { + fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", commandEnv.option.FilerHost, commandEnv.option.FilerPort, path, fileName) + } + + return err + +} + +func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, saveFn func(outputChan chan interface{}), genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error) error { + + var wg sync.WaitGroup + wg.Add(1) + outputChan := make(chan interface{}, 1024) + go func() { + saveFn(outputChan) wg.Done() }() var dirCount, fileCount uint64 - err = doTraverseBFS(ctx, writer, commandEnv.getFilerClient(filerServer, filerPort), filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) { + err := filer_pb.TraverseBfs(filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) { protoMessage := &filer_pb.FullEntry{ Dir: string(parentPath), Entry: entry, } - bytes, err := proto.Marshal(protoMessage) - if err != nil { + if err := genFn(protoMessage, outputChan); err != nil { fmt.Fprintf(writer, "marshall error: %v\n", err) return } - outputChan <- bytes - if entry.IsDirectory { atomic.AddUint64(&dirCount, 1) } else { atomic.AddUint64(&fileCount, 1) } - if *verbose { + if verbose { println(parentPath.Child(entry.Name)) } @@ -120,66 +151,8 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io. wg.Wait() - if err == nil { + if err == nil && writer != nil { fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount) - fmt.Fprintf(writer, "meta data for http://%s:%d%s is saved to %s\n", filerServer, filerPort, path, fileName) } - return err - -} -func doTraverseBFS(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, - parentPath filer2.FullPath, fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { - - K := 5 - - var jobQueueWg sync.WaitGroup - queue := util.NewQueue() - jobQueueWg.Add(1) - queue.Enqueue(parentPath) - var isTerminating bool - - for i := 0; i < K; i++ { - go func() { - for { - if isTerminating { - break - } - t := queue.Dequeue() - if t == nil { - time.Sleep(329 * time.Millisecond) - continue - } - dir := t.(filer2.FullPath) - processErr := processOneDirectory(ctx, writer, filerClient, dir, queue, &jobQueueWg, fn) - if processErr != nil { - err = processErr - } - jobQueueWg.Done() - } - }() - } - jobQueueWg.Wait() - isTerminating = true - return -} - -func processOneDirectory(ctx context.Context, writer io.Writer, filerClient filer2.FilerClient, - parentPath filer2.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, - fn func(parentPath filer2.FullPath, entry *filer_pb.Entry)) (err error) { - - return filer2.ReadDirAllEntries(ctx, filerClient, string(parentPath), "", func(entry *filer_pb.Entry, isLast bool) { - - fn(parentPath, entry) - - if entry.IsDirectory { - subDir := fmt.Sprintf("%s/%s", parentPath, entry.Name) - if parentPath == "/" { - subDir = "/" + entry.Name - } - jobQueueWg.Add(1) - queue.Enqueue(filer2.FullPath(subDir)) - } - }) - } |
