aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_replication.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-09-17 00:27:56 -0700
committerChris Lu <chris.lu@gmail.com>2018-09-17 00:27:56 -0700
commit788acdf5275dfb7610afe9144c17d9128d1737f6 (patch)
tree55e6701210157981eecd8a4f72d8c9eefa01c457 /weed/command/filer_replication.go
parent865a0179369601e496d385e47bdbda93dcc3f243 (diff)
downloadseaweedfs-788acdf5275dfb7610afe9144c17d9128d1737f6.tar.xz
seaweedfs-788acdf5275dfb7610afe9144c17d9128d1737f6.zip
add WIP filer.replicate
Diffstat (limited to 'weed/command/filer_replication.go')
-rw-r--r--weed/command/filer_replication.go61
1 files changed, 61 insertions, 0 deletions
diff --git a/weed/command/filer_replication.go b/weed/command/filer_replication.go
new file mode 100644
index 000000000..18f549edc
--- /dev/null
+++ b/weed/command/filer_replication.go
@@ -0,0 +1,61 @@
+package command
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/spf13/viper"
+ "github.com/chrislusf/seaweedfs/weed/replication"
+)
+
+func init() {
+ cmdFilerReplicate.Run = runFilerReplicate // break init cycle
+}
+
+var cmdFilerReplicate = &Command{
+ UsageLine: "filer.replicate",
+ Short: "replicate file changes to another destination",
+ Long: `replicate file changes to another destination
+
+ filer.replicate listens on filer notifications. If any file is updated, it will fetch the updated content,
+ and write to the other destination.
+
+ Run "weed scaffold -config replication" to generate a replication.toml file and customize the parameters.
+
+ `,
+}
+
+func runFilerReplicate(cmd *Command, args []string) bool {
+
+ weed_server.LoadConfiguration("replication", true)
+ config := viper.GetViper()
+
+ var notificationInput replication.NotificationInput
+
+ for _, input := range replication.NotificationInputs {
+ if config.GetBool("notification." + input.GetName() + ".enabled") {
+ viperSub := config.Sub("notification." + input.GetName())
+ if err := input.Initialize(viperSub); err != nil {
+ glog.Fatalf("Failed to initialize notification input for %s: %+v",
+ input.GetName(), err)
+ }
+ glog.V(0).Infof("Configure notification input to %s", input.GetName())
+ notificationInput = input
+ break
+ }
+ }
+
+ replicator := replication.NewReplicator(config.Sub("sink.filer"))
+
+ for {
+ key, m, err := notificationInput.ReceiveMessage()
+ if err != nil {
+ glog.Errorf("receive %s: +v", key, err)
+ continue
+ }
+ if err = replicator.Replicate(key, m); err != nil {
+ glog.Errorf("replicate %s: +v", key, err)
+ }
+ }
+
+ return true
+}