aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_remote_sync.go
blob: 18e8fcc3c0d4bc49c087db3bbcc6e89bb96089b0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package command

import (
	"context"
	"fmt"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
	"github.com/chrislusf/seaweedfs/weed/replication/source"
	"github.com/chrislusf/seaweedfs/weed/security"
	"github.com/chrislusf/seaweedfs/weed/util"
	"google.golang.org/grpc"
	"time"
)

type RemoteSyncOptions struct {
	filerAddress             *string
	grpcDialOption           grpc.DialOption
	readChunkFromFiler       *bool
	debug                    *bool
	timeAgo                  *time.Duration
	dir                      *string
	createBucketAt           *string
	createBucketRandomSuffix *bool

	mappings    *remote_pb.RemoteStorageMapping
	remoteConfs map[string]*remote_pb.RemoteConf
	bucketsDir  string
}

var _ = filer_pb.FilerClient(&RemoteSyncOptions{})

func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
	return pb.WithFilerClient(*option.filerAddress, option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
		return fn(client)
	})
}
func (option *RemoteSyncOptions) AdjustedUrl(location *filer_pb.Location) string {
	return location.Url
}

var (
	remoteSyncOptions RemoteSyncOptions
)

func init() {
	cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
	remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
	remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
	remoteSyncOptions.createBucketAt = cmdFilerRemoteSynchronize.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in")
	remoteSyncOptions.createBucketRandomSuffix = cmdFilerRemoteSynchronize.Flag.Bool("createBucketWithRandomSuffix", false, "add randomized suffix to bucket name to avoid conflicts")
	remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
	remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
	remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
}

var cmdFilerRemoteSynchronize = &Command{
	UsageLine: "filer.remote.sync -dir=/mount/s3_on_cloud or -createBucketAt=clound1",
	Short:     "resumable continuously write back updates to remote storage",
	Long: `resumable continuously write back updates to remote storage

	filer.remote.sync listens on filer update events. 
	If any mounted remote file is updated, it will fetch the updated content,
	and write to the remote storage.

	There are two modes:
	1)Write back one mounted folder to remote storage
		weed filer.remote.sync -dir=/mount/s3_on_cloud
	2)Watch /buckets folder and write back all changes.
	  Any new buckets will be created in this remote storage.
		weed filer.remote.sync -createBucketAt=cloud1

`,
}

func runFilerRemoteSynchronize(cmd *Command, args []string) bool {

	util.LoadConfiguration("security", false)
	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
	remoteSyncOptions.grpcDialOption = grpcDialOption

	dir := *remoteSyncOptions.dir
	filerAddress := *remoteSyncOptions.filerAddress

	filerSource := &source.FilerSource{}
	filerSource.DoInitialize(
		filerAddress,
		pb.ServerToGrpcAddress(filerAddress),
		"/", // does not matter
		*remoteSyncOptions.readChunkFromFiler,
	)

	storageName := *remoteSyncOptions.createBucketAt
	if storageName != "" {

		remoteSyncOptions.bucketsDir = "/buckets"
		// check buckets again
		remoteSyncOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
			resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
			if err != nil {
				return err
			}
			remoteSyncOptions.bucketsDir = resp.DirBuckets
			return nil
		})

		fmt.Printf("synchronize %s, default new bucket creation in %s ...\n", remoteSyncOptions.bucketsDir, storageName)
		util.RetryForever("filer.remote.sync buckets "+storageName, func() error {
			return remoteSyncOptions.followBucketUpdatesAndUploadToRemote(filerSource)
		}, func(err error) bool {
			if err != nil {
				glog.Errorf("synchronize %s to %s: %v", remoteSyncOptions.bucketsDir, storageName, err)
			}
			return true
		})
	}

	if dir != "" {
		fmt.Printf("synchronize %s to remote storage...\n", dir)
		util.RetryForever("filer.remote.sync "+dir, func() error {
			return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
		}, func(err error) bool {
			if err != nil {
				glog.Errorf("synchronize %s: %v", dir, err)
			}
			return true
		})
	}

	return true
}