aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-11-03 12:43:45 -0700
committerChris Lu <chris.lu@gmail.com>2018-11-03 12:43:45 -0700
commita64613172d6421330b6b7c5b694423ab9273e678 (patch)
tree403d0c6399f5f37bad2878d797d3e2c5d005bd97
parent3674ad9f8ea86cb4903eaf6f1e3c2598d7f602eb (diff)
downloadseaweedfs-a64613172d6421330b6b7c5b694423ab9273e678.tar.xz
seaweedfs-a64613172d6421330b6b7c5b694423ab9273e678.zip
bootstrap filer replication with weed filer.export -targetStore=notification
-rw-r--r--weed/command/filer_export.go50
-rw-r--r--weed/filer2/entry.go12
-rw-r--r--weed/filer2/filer_notify.go16
-rw-r--r--weed/filer2/filer_notify_test.go4
4 files changed, 61 insertions, 21 deletions
diff --git a/weed/command/filer_export.go b/weed/command/filer_export.go
index 9bd0f3014..d05607bf5 100644
--- a/weed/command/filer_export.go
+++ b/weed/command/filer_export.go
@@ -5,6 +5,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/server"
"github.com/spf13/viper"
+ "github.com/chrislusf/seaweedfs/weed/notification"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
func init() {
@@ -22,14 +24,18 @@ var cmdFilerExport = &Command{
If target store is empty, only the directory tree will be listed.
+ If target store is "notification", the list of entries will be sent to notification.
+ This is usually used to bootstrap filer replication to a remote system.
+
`,
}
var (
// filerExportOutputFile = cmdFilerExport.Flag.String("output", "", "the output file. If empty, only list out the directory tree")
filerExportSourceStore = cmdFilerExport.Flag.String("sourceStore", "", "the source store name in filer.toml")
- filerExportTargetStore = cmdFilerExport.Flag.String("targetStore", "", "the target store name in filer.toml")
+ filerExportTargetStore = cmdFilerExport.Flag.String("targetStore", "", "the target store name in filer.toml, or \"notification\" to export all files to message queue")
dirListLimit = cmdFilerExport.Flag.Int("dirListLimit", 100000, "limit directory list size")
+ dryRun = cmdFilerExport.Flag.Bool("dryRun", false, "not actually moving data")
)
type statistics struct {
@@ -48,7 +54,7 @@ func runFilerExport(cmd *Command, args []string) bool {
if store.GetName() == *filerExportSourceStore {
viperSub := config.Sub(store.GetName())
if err := store.Initialize(viperSub); err != nil {
- glog.Fatalf("Failed to initialize store for %s: %+v",
+ glog.Fatalf("Failed to initialize source store for %s: %+v",
store.GetName(), err)
} else {
sourceStore = store
@@ -61,7 +67,7 @@ func runFilerExport(cmd *Command, args []string) bool {
if store.GetName() == *filerExportTargetStore {
viperSub := config.Sub(store.GetName())
if err := store.Initialize(viperSub); err != nil {
- glog.Fatalf("Failed to initialize store for %s: %+v",
+ glog.Fatalf("Failed to initialize target store for %s: %+v",
store.GetName(), err)
} else {
targetStore = store
@@ -79,14 +85,44 @@ func runFilerExport(cmd *Command, args []string) bool {
return false
}
+ if targetStore == nil && *filerExportTargetStore != "" && *filerExportTargetStore != "notification" {
+ glog.Errorf("Failed to find target store %s", *filerExportTargetStore)
+ println("existing data sources are:")
+ for _, store := range filer2.Stores {
+ println(" " + store.GetName())
+ }
+ return false
+ }
+
stat := statistics{}
var fn func(level int, entry *filer2.Entry) error
- if targetStore == nil {
+ if *filerExportTargetStore == "notification" {
+ weed_server.LoadConfiguration("notification", false)
+ v := viper.GetViper()
+ notification.LoadConfiguration(v.Sub("notification"))
+
+ fn = func(level int, entry *filer2.Entry) error {
+ printout(level, entry)
+ if *dryRun {
+ return nil
+ }
+ return notification.Queue.SendMessage(
+ string(entry.FullPath),
+ &filer_pb.EventNotification{
+ NewEntry: entry.ToProtoEntry(),
+ },
+ )
+ }
+ } else if targetStore == nil {
fn = printout
} else {
fn = func(level int, entry *filer2.Entry) error {
+ printout(level, entry)
+ if *dryRun {
+ return nil
+ }
return targetStore.InsertEntry(entry)
}
}
@@ -126,7 +162,11 @@ func doTraverse(stat *statistics, filerStore filer2.FilerStore, parentPath filer
func printout(level int, entry *filer2.Entry) error {
for i := 0; i < level; i++ {
- print(" ")
+ if i == level-1 {
+ print("+-")
+ } else {
+ print("| ")
+ }
}
println(entry.FullPath.Name())
return nil
diff --git a/weed/filer2/entry.go b/weed/filer2/entry.go
index 82bc00b27..11318a3ae 100644
--- a/weed/filer2/entry.go
+++ b/weed/filer2/entry.go
@@ -43,3 +43,15 @@ func (entry *Entry) Timestamp() time.Time {
return entry.Mtime
}
}
+
+func (entry *Entry) ToProtoEntry() *filer_pb.Entry {
+ if entry == nil {
+ return nil
+ }
+ return &filer_pb.Entry{
+ Name: string(entry.FullPath),
+ IsDirectory: entry.IsDirectory(),
+ Attributes: EntryAttributeToPb(entry),
+ Chunks: entry.Chunks,
+ }
+}
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
index 44928516c..b3c215249 100644
--- a/weed/filer2/filer_notify.go
+++ b/weed/filer2/filer_notify.go
@@ -23,23 +23,11 @@ func (f *Filer) NotifyUpdateEvent(oldEntry, newEntry *Entry, deleteChunks bool)
notification.Queue.SendMessage(
key,
&filer_pb.EventNotification{
- OldEntry: toProtoEntry(oldEntry),
- NewEntry: toProtoEntry(newEntry),
+ OldEntry: oldEntry.ToProtoEntry(),
+ NewEntry: newEntry.ToProtoEntry(),
DeleteChunks: deleteChunks,
},
)
}
}
-
-func toProtoEntry(entry *Entry) *filer_pb.Entry {
- if entry == nil {
- return nil
- }
- return &filer_pb.Entry{
- Name: string(entry.FullPath),
- IsDirectory: entry.IsDirectory(),
- Attributes: EntryAttributeToPb(entry),
- Chunks: entry.Chunks,
- }
-}
diff --git a/weed/filer2/filer_notify_test.go b/weed/filer2/filer_notify_test.go
index ab54cd1a2..b74e2ad35 100644
--- a/weed/filer2/filer_notify_test.go
+++ b/weed/filer2/filer_notify_test.go
@@ -32,8 +32,8 @@ func TestProtoMarshalText(t *testing.T) {
}
notification := &filer_pb.EventNotification{
- OldEntry: toProtoEntry(oldEntry),
- NewEntry: toProtoEntry(nil),
+ OldEntry: oldEntry.ToProtoEntry(),
+ NewEntry: nil,
DeleteChunks: true,
}