aboutsummaryrefslogtreecommitdiff
path: root/weed/command
diff options
context:
space:
mode:
Diffstat (limited to 'weed/command')
-rw-r--r--weed/command/command.go1
-rw-r--r--weed/command/filer_replication.go61
-rw-r--r--weed/command/scaffold.go32
3 files changed, 93 insertions, 1 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index 61f13c5e2..91b9bf3fc 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -14,6 +14,7 @@ var Commands = []*Command{
cmdCopy,
cmdFix,
cmdFilerExport,
+ cmdFilerReplicate,
cmdServer,
cmdMaster,
cmdFiler,
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
new file mode 100644
index 000000000..18f549edc
--- /dev/null
+++ b/weed/command/filer_replication.go
@@ -0,0 +1,61 @@
+package command
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/spf13/viper"
+ "github.com/chrislusf/seaweedfs/weed/replication"
+)
+
+func init() {
+ cmdFilerReplicate.Run = runFilerReplicate // break init cycle
+}
+
+var cmdFilerReplicate = &Command{
+ UsageLine: "filer.replicate",
+ Short: "replicate file changes to another destination",
+ Long: `replicate file changes to another destination
+
+ filer.replicate listens on filer notifications. If any file is updated, it will fetch the updated content,
+ and write to the other destination.
+
+ Run "weed scaffold -config replication" to generate a replication.toml file and customize the parameters.
+
+ `,
+}
+
+func runFilerReplicate(cmd *Command, args []string) bool {
+
+ weed_server.LoadConfiguration("replication", true)
+ config := viper.GetViper()
+
+ var notificationInput replication.NotificationInput
+
+ for _, input := range replication.NotificationInputs {
+ if config.GetBool("notification." + input.GetName() + ".enabled") {
+ viperSub := config.Sub("notification." + input.GetName())
+ if err := input.Initialize(viperSub); err != nil {
+ glog.Fatalf("Failed to initialize notification input for %s: %+v",
+ input.GetName(), err)
+ }
+ glog.V(0).Infof("Configure notification input to %s", input.GetName())
+ notificationInput = input
+ break
+ }
+ }
+
+ replicator := replication.NewReplicator(config.Sub("sink.filer"))
+
+ for {
+ key, m, err := notificationInput.ReceiveMessage()
+ if err != nil {
+ glog.Errorf("receive %s: +v", key, err)
+ continue
+ }
+ if err = replicator.Replicate(key, m); err != nil {
+ glog.Errorf("replicate %s: +v", key, err)
+ }
+ }
+
+ return true
+}
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index a5df2d18d..17d8b1884 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -19,7 +19,7 @@ var cmdScaffold = &Command{
var (
outputPath = cmdScaffold.Flag.String("output", "", "if not empty, save the configuration file to this directory")
- config = cmdScaffold.Flag.String("config", "filer", "the configuration file to generate")
+ config = cmdScaffold.Flag.String("config", "filer", "[filer|replication] the configuration file to generate")
)
func runScaffold(cmd *Command, args []string) bool {
@@ -28,6 +28,8 @@ func runScaffold(cmd *Command, args []string) bool {
switch *config {
case "filer":
content = FILER_TOML_EXAMPLE
+ case "replication":
+ content = REPLICATION_TOML_EXAMPLE
}
if content == "" {
println("need a valid -config option")
@@ -140,4 +142,32 @@ hosts = [
topic = "seaweedfs_filer"
`
+ REPLICATION_TOML_EXAMPLE = `
+# A sample TOML config file for replicating SeaweedFS filer store
+
+
+[source.filer]
+enabled = true
+grpcAddress = "localhost:18888"
+# id is to identify the notification source, avoid reprocessing the same events
+id = "filer1"
+# all files under this directory tree and not from this source.filer.id is replicated
+directory = "/"
+
+[notification.kafka]
+enabled = true
+hosts = [
+ "localhost:9092"
+]
+topic = "seaweedfs_filer"
+
+[sink.filer]
+enabled = true
+grpcAddress = "localhost:18888"
+# id is to identify the notification source, avoid reprocessing the same events
+id = "filer2"
+# all files under this directory tree and not from this source.filer.id is replicated
+directory = "/"
+
+`
)