diff options
Diffstat (limited to 'weed/command/filer_remote_gateway.go')
| -rw-r--r-- | weed/command/filer_remote_gateway.go | 113 |
1 files changed, 113 insertions, 0 deletions
diff --git a/weed/command/filer_remote_gateway.go b/weed/command/filer_remote_gateway.go new file mode 100644 index 000000000..ea23daf5e --- /dev/null +++ b/weed/command/filer_remote_gateway.go @@ -0,0 +1,113 @@ +package command + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/util" + "google.golang.org/grpc" + "os" + "time" +) + +type RemoteGatewayOptions struct { + filerAddress *string + grpcDialOption grpc.DialOption + readChunkFromFiler *bool + timeAgo *time.Duration + createBucketAt *string + createBucketRandomSuffix *bool + + mappings *remote_pb.RemoteStorageMapping + remoteConfs map[string]*remote_pb.RemoteConf + bucketsDir string +} + +var _ = filer_pb.FilerClient(&RemoteGatewayOptions{}) + +func (option *RemoteGatewayOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + return pb.WithFilerClient(pb.ServerAddress(*option.filerAddress), option.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + return fn(client) + }) +} +func (option *RemoteGatewayOptions) AdjustedUrl(location *filer_pb.Location) string { + return location.Url +} + +var ( + remoteGatewayOptions RemoteGatewayOptions +) + +func init() { + cmdFilerRemoteGateway.Run = runFilerRemoteGateway // break init cycle + remoteGatewayOptions.filerAddress = cmdFilerRemoteGateway.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") + remoteGatewayOptions.createBucketAt = cmdFilerRemoteGateway.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in") + remoteGatewayOptions.createBucketRandomSuffix = cmdFilerRemoteGateway.Flag.Bool("createBucketWithRandomSuffix", true, "add randomized suffix to bucket name to avoid conflicts") + remoteGatewayOptions.readChunkFromFiler = cmdFilerRemoteGateway.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") + remoteGatewayOptions.timeAgo = cmdFilerRemoteGateway.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") +} + +var cmdFilerRemoteGateway = &Command{ + UsageLine: "filer.remote.gateway", + Short: "resumable continuously write back bucket creation, deletion, and other local updates to remote storage", + Long: `resumable continuously write back bucket creation, deletion, and other local updates to remote storage + + filer.remote.gateway listens on filer local buckets update events. + If any bucket is created, deleted, or updated, it will mirror the changes to remote object store. + + weed filer.remote.sync -createBucketAt=cloud1 + +`, +} + +func runFilerRemoteGateway(cmd *Command, args []string) bool { + + util.LoadConfiguration("security", false) + grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + remoteGatewayOptions.grpcDialOption = grpcDialOption + + filerAddress := pb.ServerAddress(*remoteGatewayOptions.filerAddress) + + filerSource := &source.FilerSource{} + filerSource.DoInitialize( + filerAddress.ToHttpAddress(), + filerAddress.ToGrpcAddress(), + "/", // does not matter + *remoteGatewayOptions.readChunkFromFiler, + ) + + remoteGatewayOptions.bucketsDir = "/buckets" + // check buckets again + remoteGatewayOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { + resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { + return err + } + remoteGatewayOptions.bucketsDir = resp.DirBuckets + return nil + }) + + // read filer remote storage mount mappings + if detectErr := remoteGatewayOptions.collectRemoteStorageConf(); detectErr != nil { + fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr) + return true + } + + // synchronize /buckets folder + fmt.Printf("synchronize buckets in %s ...\n", remoteGatewayOptions.bucketsDir) + util.RetryForever("filer.remote.sync buckets", func() error { + return remoteGatewayOptions.followBucketUpdatesAndUploadToRemote(filerSource) + }, func(err error) bool { + if err != nil { + glog.Errorf("synchronize %s: %v", remoteGatewayOptions.bucketsDir, err) + } + return true + }) + return true + +} |
