aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-03-03 02:02:29 -0800
committerChris Lu <chris.lu@gmail.com>2021-03-03 02:02:29 -0800
commitc0842fe99f9ae8fcb672411c87aa233fc321c16b (patch)
treee13470dee1c0093c0094375b0a6ce519eb4d4d37
parent0c5c51eb983781b441cc8acfac3ce2e437591e98 (diff)
downloadseaweedfs-c0842fe99f9ae8fcb672411c87aa233fc321c16b.tar.xz
seaweedfs-c0842fe99f9ae8fcb672411c87aa233fc321c16b.zip
add filer.meta.backup command
-rw-r--r--weed/command/command.go1
-rw-r--r--weed/command/filer_meta_backup.go268
2 files changed, 269 insertions, 0 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index 5506e6969..a9063eaa0 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -17,6 +17,7 @@ var Commands = []*Command{
cmdFiler,
cmdFilerBackup,
cmdFilerCat,
+ cmdFilerMetaBackup,
cmdFilerMetaTail,
cmdFilerReplicate,
cmdFilerSynchronize,
diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go
new file mode 100644
index 000000000..7121db44b
--- /dev/null
+++ b/weed/command/filer_meta_backup.go
@@ -0,0 +1,268 @@
+package command
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/spf13/viper"
+ "google.golang.org/grpc"
+ "io"
+ "reflect"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ metaBackup FilerMetaBackupOptions
+)
+
+type FilerMetaBackupOptions struct {
+ grpcDialOption grpc.DialOption
+ filerAddress *string
+ filerDirectory *string
+ restart *bool
+ backupFilerConfig *string
+
+ store filer.FilerStore
+}
+
+func init() {
+ cmdFilerMetaBackup.Run = runFilerMetaBackup // break init cycle
+ metaBackup.filerAddress = cmdFilerMetaBackup.Flag.String("filer", "localhost:8888", "filer hostname:port")
+ metaBackup.filerDirectory = cmdFilerMetaBackup.Flag.String("filerDir", "/", "a folder on the filer")
+ metaBackup.restart = cmdFilerMetaBackup.Flag.Bool("restart", false, "copy the full metadata snapshot before async continuous backup")
+ metaBackup.backupFilerConfig = cmdFilerMetaBackup.Flag.String("config", "", "path to filer.toml specifying backup filer store")
+}
+
+var cmdFilerMetaBackup = &Command{
+ UsageLine: "filer.meta.backup [-filer=localhost:8888] [-filerDir=/] [-restart] -config=/path/to/backup_filer.toml",
+ Short: "continuously backup filer meta data changes to anther filer store specified in a backup_filer.toml",
+ Long: `continuously backup filer meta data changes.
+The backup writes to another filer store specified in a backup_filer.toml.
+
+ weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888"
+ weed filer.meta.backup -config=/path/to/backup_filer.toml -filer="localhost:8888" -restart
+
+ `,
+}
+
+func runFilerMetaBackup(cmd *Command, args []string) bool {
+
+ metaBackup.grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
+
+ // load backup_filer.toml
+ v := viper.New()
+ v.SetConfigFile(*metaBackup.backupFilerConfig)
+
+ if err := v.ReadInConfig(); err != nil { // Handle errors reading the config file
+ glog.Fatalf("Failed to load %s file.\nPlease use this command to generate the a %s.toml file\n"+
+ " weed scaffold -config=%s -output=.\n\n\n",
+ *metaBackup.backupFilerConfig, "backup_filer", "filer")
+ }
+
+ if err := metaBackup.initStore(v); err != nil {
+ glog.V(0).Infof("init backup filer store: %v", err)
+ return true
+ }
+
+ missingPreviousBackup := false
+ _, err := metaBackup.getOffset()
+ if err != nil {
+ missingPreviousBackup = true
+ }
+
+ if *metaBackup.restart || missingPreviousBackup {
+ glog.V(0).Infof("traversing metadata tree...")
+ startTime := time.Now()
+ if err := metaBackup.traverseMetadata(); err != nil {
+ glog.Errorf("traverse meta data: %v", err)
+ return true
+ }
+ glog.V(0).Infof("metadata copied up to %v", startTime)
+ if err := metaBackup.setOffset(startTime); err != nil {
+ startTime = time.Now()
+ }
+ }
+
+ for {
+ err := metaBackup.streamMetadataBackup()
+ if err != nil {
+ glog.Errorf("filer meta backup from %s: %v", *metaBackup.filerAddress, err)
+ time.Sleep(1747 * time.Millisecond)
+ }
+ }
+
+ return true
+}
+
+func (metaBackup *FilerMetaBackupOptions) initStore(v *viper.Viper) error {
+ // load configuration for default filer store
+ hasDefaultStoreConfigured := false
+ for _, store := range filer.Stores {
+ if v.GetBool(store.GetName() + ".enabled") {
+ store = reflect.New(reflect.ValueOf(store).Elem().Type()).Interface().(filer.FilerStore)
+ if err := store.Initialize(v, store.GetName()+"."); err != nil {
+ glog.Fatalf("failed to initialize store for %s: %+v", store.GetName(), err)
+ }
+ glog.V(0).Infof("configured filer store to %s", store.GetName())
+ hasDefaultStoreConfigured = true
+ metaBackup.store = filer.NewFilerStoreWrapper(store)
+ break
+ }
+ }
+ if !hasDefaultStoreConfigured {
+ return fmt.Errorf("no filer store enabled in %s", v.ConfigFileUsed())
+ }
+
+ return nil
+}
+
+func (metaBackup *FilerMetaBackupOptions) traverseMetadata() (err error) {
+ var saveErr error
+
+ traverseErr := filer_pb.TraverseBfs(metaBackup, util.FullPath(*metaBackup.filerDirectory), func(parentPath util.FullPath, entry *filer_pb.Entry) {
+
+ println("+", parentPath.Child(entry.Name))
+ if err := metaBackup.store.InsertEntry(context.Background(), filer.FromPbEntry(string(parentPath), entry)); err != nil {
+ saveErr = fmt.Errorf("insert entry error: %v\n", err)
+ return
+ }
+
+ })
+
+ if traverseErr != nil {
+ return fmt.Errorf("traverse: %v", traverseErr)
+ }
+ return saveErr
+}
+
+var (
+ MetaBackupKey = []byte("metaBackup")
+)
+
+func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error {
+
+ startTime, err := metaBackup.getOffset()
+ if err != nil {
+ startTime = time.Now()
+ }
+ glog.V(0).Infof("streaming from %v", startTime)
+
+ store := metaBackup.store
+
+ eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
+
+ ctx := context.Background()
+ message := resp.EventNotification
+
+ if message.OldEntry == nil && message.NewEntry == nil {
+ return nil
+ }
+ if message.OldEntry == nil && message.NewEntry != nil {
+ println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
+ entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
+ return store.InsertEntry(ctx, entry)
+ }
+ if message.OldEntry != nil && message.NewEntry == nil {
+ println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
+ return store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name))
+ }
+ if message.OldEntry != nil && message.NewEntry != nil {
+ if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
+ println("~", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
+ entry := filer.FromPbEntry(message.NewParentPath, message.NewEntry)
+ return store.UpdateEntry(ctx, entry)
+ }
+ println("-", util.FullPath(resp.Directory).Child(message.OldEntry.Name))
+ if err := store.DeleteEntry(ctx, util.FullPath(resp.Directory).Child(message.OldEntry.Name)); err != nil {
+ return err
+ }
+ println("+", util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
+ return store.InsertEntry(ctx, filer.FromPbEntry(message.NewParentPath, message.NewEntry))
+ }
+
+ return nil
+ }
+
+ tailErr := pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ stream, err := client.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
+ ClientName: "meta_backup",
+ PathPrefix: *metaBackup.filerDirectory,
+ SinceNs: startTime.UnixNano(),
+ })
+ if err != nil {
+ return fmt.Errorf("listen: %v", err)
+ }
+
+ var counter int64
+ var lastWriteTime time.Time
+ for {
+ resp, listenErr := stream.Recv()
+ if listenErr == io.EOF {
+ return nil
+ }
+ if listenErr != nil {
+ return listenErr
+ }
+ if err = eachEntryFunc(resp); err != nil {
+ return err
+ }
+
+ counter++
+ if lastWriteTime.Add(3 * time.Second).Before(time.Now()) {
+ glog.V(0).Infof("meta backup %s progressed to %v %0.2f/sec", *metaBackup.filerAddress, time.Unix(0, resp.TsNs), float64(counter)/float64(3))
+ counter = 0
+ lastWriteTime = time.Now()
+ if err2 := metaBackup.setOffset(lastWriteTime); err2 != nil {
+ return err2
+ }
+ }
+
+ }
+
+ })
+ return tailErr
+}
+
+func (metaBackup *FilerMetaBackupOptions) getOffset() (lastWriteTime time.Time, err error) {
+ value, err := metaBackup.store.KvGet(context.Background(), MetaBackupKey)
+ if err != nil {
+ return
+ }
+ tsNs := util.BytesToUint64(value)
+
+ return time.Unix(0, int64(tsNs)), nil
+}
+
+func (metaBackup *FilerMetaBackupOptions) setOffset(lastWriteTime time.Time) error {
+ valueBuf := make([]byte, 8)
+ util.Uint64toBytes(valueBuf, uint64(lastWriteTime.UnixNano()))
+
+ if err := metaBackup.store.KvPut(context.Background(), MetaBackupKey, valueBuf); err != nil {
+ return err
+ }
+ return nil
+}
+
+var _ = filer_pb.FilerClient(&FilerMetaBackupOptions{})
+
+func (metaBackup *FilerMetaBackupOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+
+ return pb.WithFilerClient(*metaBackup.filerAddress, metaBackup.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ return fn(client)
+ })
+
+}
+
+func (metaBackup *FilerMetaBackupOptions) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
+}