diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/command.go | 1 | ||||
| -rw-r--r-- | weed/command/filer_replication.go | 61 | ||||
| -rw-r--r-- | weed/command/scaffold.go | 32 |
3 files changed, 93 insertions, 1 deletions
diff --git a/weed/command/command.go b/weed/command/command.go index 61f13c5e2..91b9bf3fc 100644 --- a/weed/command/command.go +++ b/weed/command/command.go @@ -14,6 +14,7 @@ var Commands = []*Command{ cmdCopy, cmdFix, cmdFilerExport, + cmdFilerReplicate, cmdServer, cmdMaster, cmdFiler, diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go new file mode 100644 index 000000000..18f549edc --- /dev/null +++ b/weed/command/filer_replication.go @@ -0,0 +1,61 @@ +package command + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/server" + "github.com/spf13/viper" + "github.com/chrislusf/seaweedfs/weed/replication" +) + +func init() { + cmdFilerReplicate.Run = runFilerReplicate // break init cycle +} + +var cmdFilerReplicate = &Command{ + UsageLine: "filer.replicate", + Short: "replicate file changes to another destination", + Long: `replicate file changes to another destination + + filer.replicate listens on filer notifications. If any file is updated, it will fetch the updated content, + and write to the other destination. + + Run "weed scaffold -config replication" to generate a replication.toml file and customize the parameters. + + `, +} + +func runFilerReplicate(cmd *Command, args []string) bool { + + weed_server.LoadConfiguration("replication", true) + config := viper.GetViper() + + var notificationInput replication.NotificationInput + + for _, input := range replication.NotificationInputs { + if config.GetBool("notification." + input.GetName() + ".enabled") { + viperSub := config.Sub("notification." + input.GetName()) + if err := input.Initialize(viperSub); err != nil { + glog.Fatalf("Failed to initialize notification input for %s: %+v", + input.GetName(), err) + } + glog.V(0).Infof("Configure notification input to %s", input.GetName()) + notificationInput = input + break + } + } + + replicator := replication.NewReplicator(config.Sub("sink.filer")) + + for { + key, m, err := notificationInput.ReceiveMessage() + if err != nil { + glog.Errorf("receive %s: +v", key, err) + continue + } + if err = replicator.Replicate(key, m); err != nil { + glog.Errorf("replicate %s: +v", key, err) + } + } + + return true +} diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go index a5df2d18d..17d8b1884 100644 --- a/weed/command/scaffold.go +++ b/weed/command/scaffold.go @@ -19,7 +19,7 @@ var cmdScaffold = &Command{ var ( outputPath = cmdScaffold.Flag.String("output", "", "if not empty, save the configuration file to this directory") - config = cmdScaffold.Flag.String("config", "filer", "the configuration file to generate") + config = cmdScaffold.Flag.String("config", "filer", "[filer|replication] the configuration file to generate") ) func runScaffold(cmd *Command, args []string) bool { @@ -28,6 +28,8 @@ func runScaffold(cmd *Command, args []string) bool { switch *config { case "filer": content = FILER_TOML_EXAMPLE + case "replication": + content = REPLICATION_TOML_EXAMPLE } if content == "" { println("need a valid -config option") @@ -140,4 +142,32 @@ hosts = [ topic = "seaweedfs_filer" ` + REPLICATION_TOML_EXAMPLE = ` +# A sample TOML config file for replicating SeaweedFS filer store + + +[source.filer] +enabled = true +grpcAddress = "localhost:18888" +# id is to identify the notification source, avoid reprocessing the same events +id = "filer1" +# all files under this directory tree and not from this source.filer.id is replicated +directory = "/" + +[notification.kafka] +enabled = true +hosts = [ + "localhost:9092" +] +topic = "seaweedfs_filer" + +[sink.filer] +enabled = true +grpcAddress = "localhost:18888" +# id is to identify the notification source, avoid reprocessing the same events +id = "filer2" +# all files under this directory tree and not from this source.filer.id is replicated +directory = "/" + +` ) |
