aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-04-16 01:58:28 -0700
committerChris Lu <chris.lu@gmail.com>2019-04-16 01:58:28 -0700
commit8ea1ee6dfaa78ebcacf4b03c3cb26b3155cc42eb (patch)
treed631f02621d38690d95b608bdc1aa944d80503b3
parent014906ec797c7075afed2b33705e0db63bcd2dbd (diff)
downloadseaweedfs-8ea1ee6dfaa78ebcacf4b03c3cb26b3155cc42eb.tar.xz
seaweedfs-8ea1ee6dfaa78ebcacf4b03c3cb26b3155cc42eb.zip
weed shell: add fs.meta.notify, removing filer.export
-rw-r--r--weed/command/command.go1
-rw-r--r--weed/command/filer_export.go191
-rw-r--r--weed/shell/command_fs_meta_notify.go78
3 files changed, 78 insertions, 192 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index 91b9bf3fc..39a01cc05 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -13,7 +13,6 @@ var Commands = []*Command{
cmdCompact,
cmdCopy,
cmdFix,
- cmdFilerExport,
cmdFilerReplicate,
cmdServer,
cmdMaster,
diff --git a/weed/command/filer_export.go b/weed/command/filer_export.go
deleted file mode 100644
index ed1ee8966..000000000
--- a/weed/command/filer_export.go
+++ /dev/null
@@ -1,191 +0,0 @@
-package command
-
-import (
- "context"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/notification"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/server"
- "github.com/spf13/viper"
-)
-
-func init() {
- cmdFilerExport.Run = runFilerExport // break init cycle
-}
-
-var cmdFilerExport = &Command{
- UsageLine: "filer.export -sourceStore=mysql -targetStore=cassandra",
- Short: "export meta data in filer store",
- Long: `Iterate the file tree and export all metadata out
-
- Both source and target store:
- * should be a store name already specified in filer.toml
- * do not need to be enabled state
-
- If target store is empty, only the directory tree will be listed.
-
- If target store is "notification", the list of entries will be sent to notification.
- This is usually used to bootstrap filer replication to a remote system.
-
- `,
-}
-
-var (
- // filerExportOutputFile = cmdFilerExport.Flag.String("output", "", "the output file. If empty, only list out the directory tree")
- filerExportSourceStore = cmdFilerExport.Flag.String("sourceStore", "", "the source store name in filer.toml, default to currently enabled store")
- filerExportTargetStore = cmdFilerExport.Flag.String("targetStore", "", "the target store name in filer.toml, or \"notification\" to export all files to message queue")
- dir = cmdFilerExport.Flag.String("dir", "/", "only process files under this directory")
- dirListLimit = cmdFilerExport.Flag.Int("dirListLimit", 100000, "limit directory list size")
- dryRun = cmdFilerExport.Flag.Bool("dryRun", false, "not actually moving data")
- verboseFilerExport = cmdFilerExport.Flag.Bool("v", false, "verbose entry details")
-)
-
-type statistics struct {
- directoryCount int
- fileCount int
-}
-
-func runFilerExport(cmd *Command, args []string) bool {
-
- weed_server.LoadConfiguration("filer", true)
- config := viper.GetViper()
-
- var sourceStore, targetStore filer2.FilerStore
-
- for _, store := range filer2.Stores {
- if store.GetName() == *filerExportSourceStore || *filerExportSourceStore == "" && config.GetBool(store.GetName()+".enabled") {
- viperSub := config.Sub(store.GetName())
- if err := store.Initialize(viperSub); err != nil {
- glog.Fatalf("Failed to initialize source store for %s: %+v",
- store.GetName(), err)
- } else {
- sourceStore = store
- }
- break
- }
- }
-
- for _, store := range filer2.Stores {
- if store.GetName() == *filerExportTargetStore {
- viperSub := config.Sub(store.GetName())
- if err := store.Initialize(viperSub); err != nil {
- glog.Fatalf("Failed to initialize target store for %s: %+v",
- store.GetName(), err)
- } else {
- targetStore = store
- }
- break
- }
- }
-
- if sourceStore == nil {
- glog.Errorf("Failed to find source store %s", *filerExportSourceStore)
- println("existing data sources are:")
- for _, store := range filer2.Stores {
- println(" " + store.GetName())
- }
- return false
- }
-
- if targetStore == nil && *filerExportTargetStore != "" && *filerExportTargetStore != "notification" {
- glog.Errorf("Failed to find target store %s", *filerExportTargetStore)
- println("existing data sources are:")
- for _, store := range filer2.Stores {
- println(" " + store.GetName())
- }
- return false
- }
-
- ctx := context.Background()
-
- stat := statistics{}
-
- var fn func(level int, entry *filer2.Entry) error
-
- if *filerExportTargetStore == "notification" {
- weed_server.LoadConfiguration("notification", false)
- v := viper.GetViper()
- notification.LoadConfiguration(v.Sub("notification"))
-
- fn = func(level int, entry *filer2.Entry) error {
- printout(level, entry)
- if *dryRun {
- return nil
- }
- return notification.Queue.SendMessage(
- string(entry.FullPath),
- &filer_pb.EventNotification{
- NewEntry: entry.ToProtoEntry(),
- },
- )
- }
- } else if targetStore == nil {
- fn = printout
- } else {
- fn = func(level int, entry *filer2.Entry) error {
- printout(level, entry)
- if *dryRun {
- return nil
- }
- return targetStore.InsertEntry(ctx, entry)
- }
- }
-
- doTraverse(ctx, &stat, sourceStore, filer2.FullPath(*dir), 0, fn)
-
- glog.Infof("processed %d directories, %d files", stat.directoryCount, stat.fileCount)
-
- return true
-}
-
-func doTraverse(ctx context.Context, stat *statistics, filerStore filer2.FilerStore, parentPath filer2.FullPath, level int, fn func(level int, entry *filer2.Entry) error) {
-
- limit := *dirListLimit
- lastEntryName := ""
- for {
- entries, err := filerStore.ListDirectoryEntries(ctx, parentPath, lastEntryName, false, limit)
- if err != nil {
- break
- }
- for _, entry := range entries {
- if fnErr := fn(level, entry); fnErr != nil {
- glog.Errorf("failed to process entry: %s", entry.FullPath)
- }
- if entry.IsDirectory() {
- stat.directoryCount++
- doTraverse(ctx, stat, filerStore, entry.FullPath, level+1, fn)
- } else {
- stat.fileCount++
- }
- lastEntryName = entry.Name()
- }
- if len(entries) < limit {
- break
- }
- }
-}
-
-func printout(level int, entry *filer2.Entry) error {
- for i := 0; i < level; i++ {
- if i == level-1 {
- print("+-")
- } else {
- print("| ")
- }
- }
- print(entry.FullPath.Name())
- if *verboseFilerExport {
- for _, chunk := range entry.Chunks {
- print("[")
- print(chunk.FileId)
- print(",")
- print(chunk.Offset)
- print(",")
- print(chunk.Size)
- print(")")
- }
- }
- println()
- return nil
-}
diff --git a/weed/shell/command_fs_meta_notify.go b/weed/shell/command_fs_meta_notify.go
new file mode 100644
index 000000000..ca4d8da5b
--- /dev/null
+++ b/weed/shell/command_fs_meta_notify.go
@@ -0,0 +1,78 @@
+package shell
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/notification"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/spf13/viper"
+)
+
+func init() {
+ commands = append(commands, &commandFsMetaNotify{})
+}
+
+type commandFsMetaNotify struct {
+}
+
+func (c *commandFsMetaNotify) Name() string {
+ return "fs.meta.notify"
+}
+
+func (c *commandFsMetaNotify) Help() string {
+ return `recursively send directory and file meta data to notifiction message queue
+
+ fs.meta.notify # send meta data from current directory to notification message queue
+
+ The message queue will use it to trigger replication from this filer.
+
+`
+}
+
+func (c *commandFsMetaNotify) Do(args []string, commandEnv *commandEnv, writer io.Writer) (err error) {
+
+ filerServer, filerPort, path, err := commandEnv.parseUrl(findInputDirectory(args))
+ if err != nil {
+ return err
+ }
+
+ weed_server.LoadConfiguration("notification", true)
+ v := viper.GetViper()
+ notification.LoadConfiguration(v.Sub("notification"))
+
+ ctx := context.Background()
+
+ return commandEnv.withFilerClient(ctx, filerServer, filerPort, func(client filer_pb.SeaweedFilerClient) error {
+
+ var dirCount, fileCount uint64
+
+ err = doTraverse(ctx, writer, client, filer2.FullPath(path), func(parentPath filer2.FullPath, entry *filer_pb.Entry) error {
+
+ if entry.IsDirectory {
+ dirCount++
+ } else {
+ fileCount++
+ }
+
+ return notification.Queue.SendMessage(
+ string(parentPath.Child(entry.Name)),
+ &filer_pb.EventNotification{
+ NewEntry: entry,
+ },
+ )
+
+ })
+
+ if err == nil {
+ fmt.Fprintf(writer, "\ntotal notified %d directories, %d files\n", dirCount, fileCount)
+ }
+
+ return err
+
+ })
+
+}