aboutsummaryrefslogtreecommitdiff
path: root/weed/command/filer_remote_sync.go
diff options
context:
space:
mode:
authorhilimd <68371223+hilimd@users.noreply.github.com>2021-09-13 10:34:33 +0800
committerGitHub <noreply@github.com>2021-09-13 10:34:33 +0800
commit1de733fda507e1da94b2e4741c74ba7e5e2c5f76 (patch)
treeaed7ac29e27e0f8def942154603375396fae9489 /weed/command/filer_remote_sync.go
parent27c05f8c0b5c7bda43babeb61d79684d11851111 (diff)
parent7591336a2269c1ad92266280634bcaea34f7a5d1 (diff)
downloadseaweedfs-1de733fda507e1da94b2e4741c74ba7e5e2c5f76.tar.xz
seaweedfs-1de733fda507e1da94b2e4741c74ba7e5e2c5f76.zip
Merge pull request #81 from chrislusf/master
sync
Diffstat (limited to 'weed/command/filer_remote_sync.go')
-rw-r--r--weed/command/filer_remote_sync.go254
1 files changed, 71 insertions, 183 deletions
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index 8b20957e4..3776ee4d9 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -3,32 +3,33 @@ package command
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/replication/source"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/golang/protobuf/proto"
"google.golang.org/grpc"
+ "os"
"time"
)
type RemoteSyncOptions struct {
- filerAddress *string
- grpcDialOption grpc.DialOption
- readChunkFromFiler *bool
- debug *bool
- timeAgo *time.Duration
- dir *string
+ filerAddress *string
+ grpcDialOption grpc.DialOption
+ readChunkFromFiler *bool
+ debug *bool
+ timeAgo *time.Duration
+ dir *string
+ createBucketAt *string
+ createBucketRandomSuffix *bool
+
+ mappings *remote_pb.RemoteStorageMapping
+ remoteConfs map[string]*remote_pb.RemoteConf
+ bucketsDir string
}
-const (
- RemoteSyncKeyPrefix = "remote.sync."
-)
-
var _ = filer_pb.FilerClient(&RemoteSyncOptions{})
func (option *RemoteSyncOptions) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
@@ -47,20 +48,38 @@ var (
func init() {
cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle
remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
- remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer")
+ remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer")
+ remoteSyncOptions.createBucketAt = cmdFilerRemoteSynchronize.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in")
+ remoteSyncOptions.createBucketRandomSuffix = cmdFilerRemoteSynchronize.Flag.Bool("createBucketWithRandomSuffix", true, "add randomized suffix to bucket name to avoid conflicts")
remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files")
remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
}
var cmdFilerRemoteSynchronize = &Command{
- UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud",
- Short: "resumable continuously write back updates to remote storage if the directory is mounted to the remote storage",
- Long: `resumable continuously write back updates to remote storage if the directory is mounted to the remote storage
+ UsageLine: "filer.remote.sync",
+ Short: "resumable continuously write back updates to remote storage",
+ Long: `resumable continuously write back updates to remote storage
filer.remote.sync listens on filer update events.
If any mounted remote file is updated, it will fetch the updated content,
and write to the remote storage.
+
+ There are two modes:
+
+ 1)By default, watch /buckets folder and write back all changes.
+
+ # if there is only one remote storage configured
+ weed filer.remote.sync
+ # if there are multiple remote storages configured
+ # specify a remote storage to create new buckets.
+ weed filer.remote.sync -createBucketAt=cloud1
+
+ 2)Write back one mounted folder to remote storage
+
+ weed filer.remote.sync -dir=/mount/s3_on_cloud
+
+
`,
}
@@ -70,188 +89,57 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
remoteSyncOptions.grpcDialOption = grpcDialOption
- // read filer remote storage mount mappings
- mappings, readErr := filer.ReadMountMappings(grpcDialOption, *remoteSyncOptions.filerAddress)
- if readErr != nil {
- fmt.Printf("read mount mapping: %v", readErr)
- return false
- }
+ dir := *remoteSyncOptions.dir
+ filerAddress := *remoteSyncOptions.filerAddress
filerSource := &source.FilerSource{}
filerSource.DoInitialize(
- *remoteSyncOptions.filerAddress,
- pb.ServerToGrpcAddress(*remoteSyncOptions.filerAddress),
+ filerAddress,
+ pb.ServerToGrpcAddress(filerAddress),
"/", // does not matter
*remoteSyncOptions.readChunkFromFiler,
)
- var found bool
- for dir, remoteStorageMountLocation := range mappings.Mappings {
- if *remoteSyncOptions.dir == dir {
- found = true
- storageConf, readErr := filer.ReadRemoteStorageConf(grpcDialOption, *remoteSyncOptions.filerAddress, remoteStorageMountLocation.Name)
- if readErr != nil {
- fmt.Printf("read remote storage configuration for %s: %v", dir, readErr)
- continue
- }
- fmt.Printf("synchronize %s to remote storage...\n", *remoteSyncOptions.dir)
- if err := util.Retry("filer.remote.sync "+dir, func() error {
- return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation)
- }); err != nil {
- fmt.Printf("synchronize %s: %v\n", *remoteSyncOptions.dir, err)
- }
- break
- }
- }
- if !found {
- fmt.Printf("directory %s is not mounted to any remote storage\n", *remoteSyncOptions.dir)
- return false
- }
-
- return true
-}
-
-func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *filer_pb.RemoteConf, remoteStorageMountLocation *filer_pb.RemoteStorageLocation) error {
-
- dirHash := util.HashStringToLong(mountedDir)
-
- // 1. specified by timeAgo
- // 2. last offset timestamp for this directory
- // 3. directory creation time
- var lastOffsetTs time.Time
- if *option.timeAgo == 0 {
- mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir))
+ remoteSyncOptions.bucketsDir = "/buckets"
+ // check buckets again
+ remoteSyncOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error {
+ resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
- return fmt.Errorf("lookup %s: %v", mountedDir, err)
- }
-
- lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash))
- if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 {
- lastOffsetTs = time.Unix(0, lastOffsetTsNs)
- glog.V(0).Infof("resume from %v", lastOffsetTs)
- } else {
- lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0)
- }
- } else {
- lastOffsetTs = time.Now().Add(-*option.timeAgo)
- }
-
- client, err := remote_storage.GetRemoteStorage(remoteStorage)
- if err != nil {
- return err
- }
-
- eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error {
- message := resp.EventNotification
- if message.OldEntry == nil && message.NewEntry == nil {
- return nil
- }
- if message.OldEntry == nil && message.NewEntry != nil {
- if len(message.NewEntry.Chunks) == 0 {
- return nil
- }
- fmt.Printf("create: %+v\n", resp)
- if !shouldSendToRemote(message.NewEntry) {
- fmt.Printf("skipping creating: %+v\n", resp)
- return nil
- }
- dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
- if message.NewEntry.IsDirectory {
- return client.WriteDirectory(dest, message.NewEntry)
- }
- reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
- remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
- if writeErr != nil {
- return writeErr
- }
- return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
- }
- if message.OldEntry != nil && message.NewEntry == nil {
- fmt.Printf("delete: %+v\n", resp)
- dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
- return client.DeleteFile(dest)
+ return err
}
- if message.OldEntry != nil && message.NewEntry != nil {
- oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation)
- dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation)
- if !shouldSendToRemote(message.NewEntry) {
- fmt.Printf("skipping updating: %+v\n", resp)
- return nil
- }
- if message.NewEntry.IsDirectory {
- return client.WriteDirectory(dest, message.NewEntry)
- }
- if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name {
- if isSameChunks(message.OldEntry.Chunks, message.NewEntry.Chunks) {
- fmt.Printf("update meta: %+v\n", resp)
- return client.UpdateFileMetadata(dest, message.NewEntry)
- }
- }
- fmt.Printf("update: %+v\n", resp)
- if err := client.DeleteFile(oldDest); err != nil {
- return err
- }
- reader := filer.NewChunkStreamReader(filerSource, message.NewEntry.Chunks)
- remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader)
- if writeErr != nil {
- return writeErr
- }
- return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry)
- }
-
+ remoteSyncOptions.bucketsDir = resp.DirBuckets
return nil
- }
-
- processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error {
- lastTime := time.Unix(0, lastTsNs)
- glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3))
- return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs)
})
- return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption,
- "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false)
-}
-
-func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *filer_pb.RemoteStorageLocation) *filer_pb.RemoteStorageLocation {
- source := string(sourcePath[len(mountDir):])
- dest := util.FullPath(remoteMountLocation.Path).Child(source)
- return &filer_pb.RemoteStorageLocation{
- Name: remoteMountLocation.Name,
- Bucket: remoteMountLocation.Bucket,
- Path: string(dest),
- }
-}
-
-func isSameChunks(a, b []*filer_pb.FileChunk) bool {
- if len(a) != len(b) {
- return false
- }
- for i := 0; i < len(a); i++ {
- x, y := a[i], b[i]
- if !proto.Equal(x, y) {
- return false
- }
- }
- return true
-}
-
-func shouldSendToRemote(entry *filer_pb.Entry) bool {
- if entry.RemoteEntry == nil {
+ if dir != "" && dir != remoteSyncOptions.bucketsDir {
+ fmt.Printf("synchronize %s to remote storage...\n", dir)
+ util.RetryForever("filer.remote.sync "+dir, func() error {
+ return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir)
+ }, func(err error) bool {
+ if err != nil {
+ glog.Errorf("synchronize %s: %v", dir, err)
+ }
+ return true
+ })
return true
}
- if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime {
+
+ // read filer remote storage mount mappings
+ if detectErr := remoteSyncOptions.collectRemoteStorageConf(); detectErr != nil {
+ fmt.Fprintf(os.Stderr, "read mount info: %v\n", detectErr)
return true
}
- return false
-}
-func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error {
- entry.RemoteEntry = remoteEntry
- return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{
- Directory: dir,
- Entry: entry,
- })
- return err
+ // synchronize /buckets folder
+ fmt.Printf("synchronize buckets in %s ...\n", remoteSyncOptions.bucketsDir)
+ util.RetryForever("filer.remote.sync buckets", func() error {
+ return remoteSyncOptions.followBucketUpdatesAndUploadToRemote(filerSource)
+ }, func(err error) bool {
+ if err != nil {
+ glog.Errorf("synchronize %s: %v", remoteSyncOptions.bucketsDir, err)
+ }
+ return true
})
+ return true
+
}