diff options
Diffstat (limited to 'weed/command')
| -rw-r--r-- | weed/command/filer_backup.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_meta_backup.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_meta_tail.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync.go | 217 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync_buckets.go | 363 | ||||
| -rw-r--r-- | weed/command/filer_remote_sync_dir.go | 221 | ||||
| -rw-r--r-- | weed/command/filer_sync.go | 2 | ||||
| -rw-r--r-- | weed/command/filer_sync_std.go | 1 | ||||
| -rw-r--r-- | weed/command/imports.go | 6 | ||||
| -rw-r--r-- | weed/command/mount_notsupported.go | 5 | ||||
| -rw-r--r-- | weed/command/mount_std.go | 1 | ||||
| -rw-r--r-- | weed/command/scaffold/filer.toml | 8 |
12 files changed, 659 insertions, 171 deletions
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go index 0c450181b..5b6409187 100644 --- a/weed/command/filer_backup.go +++ b/weed/command/filer_backup.go @@ -113,6 +113,6 @@ func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOpti }) return pb.FollowMetadata(sourceFiler, grpcDialOption, "backup_"+dataSink.GetName(), - sourcePath, startFrom.UnixNano(), 0, processEventFnWithOffset, false) + sourcePath, nil, startFrom.UnixNano(), 0, processEventFnWithOffset, false) } diff --git a/weed/command/filer_meta_backup.go b/weed/command/filer_meta_backup.go index 6fe323fba..3757f63f1 100644 --- a/weed/command/filer_meta_backup.go +++ b/weed/command/filer_meta_backup.go @@ -196,7 +196,7 @@ func (metaBackup *FilerMetaBackupOptions) streamMetadataBackup() error { }) return pb.FollowMetadata(*metaBackup.filerAddress, metaBackup.grpcDialOption, "meta_backup", - *metaBackup.filerDirectory, startTime.UnixNano(), 0, processEventFnWithOffset, false) + *metaBackup.filerDirectory, nil, startTime.UnixNano(), 0, processEventFnWithOffset, false) } diff --git a/weed/command/filer_meta_tail.go b/weed/command/filer_meta_tail.go index 28c0db99b..0363ae8d1 100644 --- a/weed/command/filer_meta_tail.go +++ b/weed/command/filer_meta_tail.go @@ -104,7 +104,7 @@ func runFilerMetaTail(cmd *Command, args []string) bool { } tailErr := pb.FollowMetadata(*tailFiler, grpcDialOption, "tail", - *tailTarget, time.Now().Add(-*tailStart).UnixNano(), 0, + *tailTarget, nil, time.Now().Add(-*tailStart).UnixNano(), 0, func(resp *filer_pb.SubscribeMetadataResponse) error { if !shouldPrint(resp) { return nil diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index db5ce8b90..6ca8477ce 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -3,12 +3,10 @@ package command import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" - "github.com/chrislusf/seaweedfs/weed/remote_storage" "github.com/chrislusf/seaweedfs/weed/replication/source" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" @@ -17,12 +15,18 @@ import ( ) type RemoteSyncOptions struct { - filerAddress *string - grpcDialOption grpc.DialOption - readChunkFromFiler *bool - debug *bool - timeAgo *time.Duration - dir *string + filerAddress *string + grpcDialOption grpc.DialOption + readChunkFromFiler *bool + debug *bool + timeAgo *time.Duration + dir *string + createBucketAt *string + createBucketRandomSuffix *bool + + mappings *remote_pb.RemoteStorageMapping + remoteConfs map[string]*remote_pb.RemoteConf + bucketsDir string } const ( @@ -48,19 +52,29 @@ func init() { cmdFilerRemoteSynchronize.Run = runFilerRemoteSynchronize // break init cycle remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "/", "a mounted directory on filer") + remoteSyncOptions.createBucketAt = cmdFilerRemoteSynchronize.Flag.String("createBucketAt", "", "one remote storage name to create new buckets in") + remoteSyncOptions.createBucketRandomSuffix = cmdFilerRemoteSynchronize.Flag.Bool("createBucketWithRandomSuffix", false, "add randomized suffix to bucket name to avoid conflicts") remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") remoteSyncOptions.debug = cmdFilerRemoteSynchronize.Flag.Bool("debug", false, "debug mode to print out filer updated remote files") remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") } var cmdFilerRemoteSynchronize = &Command{ - UsageLine: "filer.remote.sync -filer=<filerHost>:<filerPort> -dir=/mount/s3_on_cloud", - Short: "resumable continuously write back updates to remote storage if the directory is mounted to the remote storage", - Long: `resumable continuously write back updates to remote storage if the directory is mounted to the remote storage + UsageLine: "filer.remote.sync -dir=/mount/s3_on_cloud or -createBucketAt=clound1", + Short: "resumable continuously write back updates to remote storage", + Long: `resumable continuously write back updates to remote storage filer.remote.sync listens on filer update events. If any mounted remote file is updated, it will fetch the updated content, and write to the remote storage. + + There are two modes: + 1)Write back one mounted folder to remote storage + weed filer.remote.sync -dir=/mount/s3_on_cloud + 2)Watch /buckets folder and write back all changes. + Any new buckets will be created in this remote storage. + weed filer.remote.sync -createBucketAt=cloud1 + `, } @@ -73,13 +87,6 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { dir := *remoteSyncOptions.dir filerAddress := *remoteSyncOptions.filerAddress - // read filer remote storage mount mappings - _, _, remoteStorageMountLocation, storageConf, detectErr := filer.DetectMountInfo(grpcDialOption, filerAddress, dir) - if detectErr != nil { - fmt.Printf("read mount info: %v", detectErr) - return false - } - filerSource := &source.FilerSource{} filerSource.DoInitialize( filerAddress, @@ -88,156 +95,42 @@ func runFilerRemoteSynchronize(cmd *Command, args []string) bool { *remoteSyncOptions.readChunkFromFiler, ) - fmt.Printf("synchronize %s to remote storage...\n", dir) - util.RetryForever("filer.remote.sync "+dir, func() error { - return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir, storageConf, remoteStorageMountLocation) - }, func(err error) bool { - if err != nil { - glog.Errorf("synchronize %s: %v", dir, err) - } - return true - }) - - return true -} - -func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string, remoteStorage *remote_pb.RemoteConf, remoteStorageMountLocation *remote_pb.RemoteStorageLocation) error { - - dirHash := util.HashStringToLong(mountedDir) - - // 1. specified by timeAgo - // 2. last offset timestamp for this directory - // 3. directory creation time - var lastOffsetTs time.Time - if *option.timeAgo == 0 { - mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir)) - if err != nil { - return fmt.Errorf("lookup %s: %v", mountedDir, err) - } - - lastOffsetTsNs, err := getOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash)) - if mountedDirEntry != nil { - if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { - lastOffsetTs = time.Unix(0, lastOffsetTsNs) - glog.V(0).Infof("resume from %v", lastOffsetTs) - } else { - lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0) - } - } else { - lastOffsetTs = time.Now() - } - } else { - lastOffsetTs = time.Now().Add(-*option.timeAgo) - } - - client, err := remote_storage.GetRemoteStorage(remoteStorage) - if err != nil { - return err - } + storageName := *remoteSyncOptions.createBucketAt + if storageName != "" { - eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { - message := resp.EventNotification - if message.OldEntry == nil && message.NewEntry == nil { - return nil - } - if message.OldEntry == nil && message.NewEntry != nil { - if !filer.HasData(message.NewEntry) { - return nil - } - glog.V(2).Infof("create: %+v", resp) - if !shouldSendToRemote(message.NewEntry) { - glog.V(2).Infof("skipping creating: %+v", resp) - return nil - } - dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) - if message.NewEntry.IsDirectory { - glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest)) - return client.WriteDirectory(dest, message.NewEntry) - } - glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest)) - reader := filer.NewFileReader(filerSource, message.NewEntry) - remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) - if writeErr != nil { - return writeErr - } - return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) - } - if message.OldEntry != nil && message.NewEntry == nil { - glog.V(2).Infof("delete: %+v", resp) - dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) - glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest)) - return client.DeleteFile(dest) - } - if message.OldEntry != nil && message.NewEntry != nil { - oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) - dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) - if !shouldSendToRemote(message.NewEntry) { - glog.V(2).Infof("skipping updating: %+v", resp) - return nil - } - if message.NewEntry.IsDirectory { - return client.WriteDirectory(dest, message.NewEntry) - } - if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { - if filer.IsSameData(message.OldEntry, message.NewEntry) { - glog.V(2).Infof("update meta: %+v", resp) - return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry) - } - } - glog.V(2).Infof("update: %+v", resp) - glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest)) - if err := client.DeleteFile(oldDest); err != nil { + remoteSyncOptions.bucketsDir = "/buckets" + // check buckets again + remoteSyncOptions.WithFilerClient(func(filerClient filer_pb.SeaweedFilerClient) error { + resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{}) + if err != nil { return err } - reader := filer.NewFileReader(filerSource, message.NewEntry) - glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest)) - remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) - if writeErr != nil { - return writeErr - } - return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) - } - - return nil - } - - processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { - lastTime := time.Unix(0, lastTsNs) - glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) - return setOffset(option.grpcDialOption, *option.filerAddress, RemoteSyncKeyPrefix, int32(dirHash), lastTsNs) - }) - - return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, - "filer.remote.sync", mountedDir, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) -} + remoteSyncOptions.bucketsDir = resp.DirBuckets + return nil + }) -func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation { - source := string(sourcePath[len(mountDir):]) - dest := util.FullPath(remoteMountLocation.Path).Child(source) - return &remote_pb.RemoteStorageLocation{ - Name: remoteMountLocation.Name, - Bucket: remoteMountLocation.Bucket, - Path: string(dest), + fmt.Printf("synchronize %s, default new bucket creation in %s ...\n", remoteSyncOptions.bucketsDir, storageName) + util.RetryForever("filer.remote.sync buckets "+storageName, func() error { + return remoteSyncOptions.followBucketUpdatesAndUploadToRemote(filerSource) + }, func(err error) bool { + if err != nil { + glog.Errorf("synchronize %s to %s: %v", remoteSyncOptions.bucketsDir, storageName, err) + } + return true + }) } -} -func shouldSendToRemote(entry *filer_pb.Entry) bool { - if entry.RemoteEntry == nil { - return true - } - if entry.RemoteEntry.LastLocalSyncTsNs/1e9 < entry.Attributes.Mtime { - return true + if dir != "" { + fmt.Printf("synchronize %s to remote storage...\n", dir) + util.RetryForever("filer.remote.sync "+dir, func() error { + return followUpdatesAndUploadToRemote(&remoteSyncOptions, filerSource, dir) + }, func(err error) bool { + if err != nil { + glog.Errorf("synchronize %s: %v", dir, err) + } + return true + }) } - return false -} -func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { - entry.RemoteEntry = remoteEntry - return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ - Directory: dir, - Entry: entry, - }) - return err - }) + return true } diff --git a/weed/command/filer_remote_sync_buckets.go b/weed/command/filer_remote_sync_buckets.go new file mode 100644 index 000000000..4059fd228 --- /dev/null +++ b/weed/command/filer_remote_sync_buckets.go @@ -0,0 +1,363 @@ +package command + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "math" + "math/rand" + "strings" + "time" +) + +func (option *RemoteSyncOptions) followBucketUpdatesAndUploadToRemote(filerSource *source.FilerSource) error { + + // read filer remote storage mount mappings + if detectErr := option.collectRemoteStorageConf(); detectErr != nil { + return fmt.Errorf("read mount info: %v", detectErr) + } + + eachEntryFunc, err := option.makeBucketedEventProcessor(filerSource) + if err != nil { + return err + } + + processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { + lastTime := time.Unix(0, lastTsNs) + glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) + return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, option.bucketsDir, lastTsNs) + }) + + lastOffsetTs := collectLastSyncOffset(option, option.bucketsDir) + + return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync", + option.bucketsDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) +} + +func (option *RemoteSyncOptions) makeBucketedEventProcessor(filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { + + handleCreateBucket := func(entry *filer_pb.Entry) error { + if !entry.IsDirectory { + return nil + } + remoteConf, found := option.remoteConfs[*option.createBucketAt] + if !found { + return fmt.Errorf("un-configured remote storage %s", *option.createBucketAt) + } + + client, err := remote_storage.GetRemoteStorage(remoteConf) + if err != nil { + return err + } + + bucketName := strings.ToLower(entry.Name) + if *option.createBucketRandomSuffix { + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html + if len(bucketName)+5 > 63 { + bucketName = bucketName[:58] + } + bucketName = fmt.Sprintf("%s-%4d", bucketName, rand.Uint32()%10000) + } + + glog.V(0).Infof("create bucket %s", bucketName) + if err := client.CreateBucket(bucketName); err != nil { + return err + } + + bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name) + remoteLocation := &remote_pb.RemoteStorageLocation{ + Name: *option.createBucketAt, + Bucket: bucketName, + Path: "/", + } + + // need to add new mapping here before getting upates from metadata tailing + option.mappings.Mappings[string(bucketPath)] = remoteLocation + + return filer.InsertMountMapping(option, string(bucketPath), remoteLocation) + + } + handleDeleteBucket := func(entry *filer_pb.Entry) error { + if !entry.IsDirectory { + return nil + } + + client, remoteStorageMountLocation, err := option.findRemoteStorageClient(entry.Name) + if err != nil { + return err + } + + glog.V(0).Infof("delete remote bucket %s", remoteStorageMountLocation.Bucket) + if err := client.DeleteBucket(remoteStorageMountLocation.Bucket); err != nil { + return err + } + + bucketPath := util.FullPath(option.bucketsDir).Child(entry.Name) + + return filer.DeleteMountMapping(option, string(bucketPath)) + } + + handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if message.NewEntry != nil { + // update + if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { + newMappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content) + if readErr != nil { + return fmt.Errorf("unmarshal mappings: %v", readErr) + } + option.mappings = newMappings + } + if strings.HasSuffix(message.NewEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err) + } + option.remoteConfs[conf.Name] = conf + } + } else if message.OldEntry != nil { + // deletion + if strings.HasSuffix(message.OldEntry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(message.OldEntry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.OldEntry.Name, err) + } + delete(option.remoteConfs, conf.Name) + } + } + + return nil + } + + eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) { + return handleEtcRemoteChanges(resp) + } + + if message.OldEntry == nil && message.NewEntry == nil { + return nil + } + if message.OldEntry == nil && message.NewEntry != nil { + if message.NewParentPath == option.bucketsDir { + return handleCreateBucket(message.NewEntry) + } + if !filer.HasData(message.NewEntry) { + return nil + } + bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(message.NewParentPath) + if !ok { + return nil + } + client, err := remote_storage.GetRemoteStorage(remoteStorage) + if err != nil { + return err + } + glog.V(2).Infof("create: %+v", resp) + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping creating: %+v", resp) + return nil + } + dest := toRemoteStorageLocation(bucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + if message.NewEntry.IsDirectory { + glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest)) + return client.WriteDirectory(dest, message.NewEntry) + } + glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest)) + reader := filer.NewFileReader(filerSource, message.NewEntry) + remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + if message.OldEntry != nil && message.NewEntry == nil { + if resp.Directory == option.bucketsDir { + return handleDeleteBucket(message.OldEntry) + } + bucket, remoteStorageMountLocation, remoteStorage, ok := option.detectBucketInfo(resp.Directory) + if !ok { + return nil + } + client, err := remote_storage.GetRemoteStorage(remoteStorage) + if err != nil { + return err + } + glog.V(2).Infof("delete: %+v", resp) + dest := toRemoteStorageLocation(bucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + if message.OldEntry.IsDirectory { + glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest)) + return client.RemoveDirectory(dest) + } + glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest)) + return client.DeleteFile(dest) + } + if message.OldEntry != nil && message.NewEntry != nil { + if resp.Directory == option.bucketsDir { + if message.NewParentPath == option.bucketsDir { + if message.OldEntry.Name == message.NewEntry.Name { + return nil + } + if err := handleCreateBucket(message.NewEntry); err != nil { + return err + } + if err := handleDeleteBucket(message.OldEntry); err != nil { + return err + } + } + } + oldBucket, oldRemoteStorageMountLocation, oldRemoteStorage, oldOk := option.detectBucketInfo(resp.Directory) + newBucket, newRemoteStorageMountLocation, newRemoteStorage, newOk := option.detectBucketInfo(message.NewParentPath) + if oldOk && newOk { + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping updating: %+v", resp) + return nil + } + client, err := remote_storage.GetRemoteStorage(oldRemoteStorage) + if err != nil { + return err + } + if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { + // update the same entry + if message.NewEntry.IsDirectory { + // update directory property + return nil + } + if filer.IsSameData(message.OldEntry, message.NewEntry) { + glog.V(2).Infof("update meta: %+v", resp) + oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation) + return client.UpdateFileMetadata(oldDest, message.OldEntry, message.NewEntry) + } else { + newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation) + reader := filer.NewFileReader(filerSource, message.NewEntry) + glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest)) + remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + } + } + + // the following is entry rename + if oldOk { + client, err := remote_storage.GetRemoteStorage(oldRemoteStorage) + if err != nil { + return err + } + oldDest := toRemoteStorageLocation(oldBucket, util.NewFullPath(resp.Directory, message.OldEntry.Name), oldRemoteStorageMountLocation) + if message.OldEntry.IsDirectory { + return client.RemoveDirectory(oldDest) + } + glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest)) + if err := client.DeleteFile(oldDest); err != nil { + return err + } + } + if newOk { + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping updating: %+v", resp) + return nil + } + client, err := remote_storage.GetRemoteStorage(newRemoteStorage) + if err != nil { + return err + } + newDest := toRemoteStorageLocation(newBucket, util.NewFullPath(message.NewParentPath, message.NewEntry.Name), newRemoteStorageMountLocation) + if message.NewEntry.IsDirectory { + return client.WriteDirectory(newDest, message.NewEntry) + } + reader := filer.NewFileReader(filerSource, message.NewEntry) + glog.V(0).Infof("create %s", remote_storage.FormatLocation(newDest)) + remoteEntry, writeErr := client.WriteFile(newDest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + } + + return nil + } + return eachEntryFunc, nil +} + +func (option *RemoteSyncOptions) findRemoteStorageClient(bucketName string) (client remote_storage.RemoteStorageClient, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, err error) { + bucket := util.FullPath(option.bucketsDir).Child(bucketName) + + var isMounted bool + remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)] + if !isMounted { + return nil, remoteStorageMountLocation, fmt.Errorf("%s is not mounted", bucket) + } + remoteConf, hasClient := option.remoteConfs[remoteStorageMountLocation.Name] + if !hasClient { + return nil, remoteStorageMountLocation, fmt.Errorf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation) + } + + client, err = remote_storage.GetRemoteStorage(remoteConf) + if err != nil { + return nil, remoteStorageMountLocation, err + } + return client, remoteStorageMountLocation, nil +} + +func (option *RemoteSyncOptions) detectBucketInfo(actualDir string) (bucket util.FullPath, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, remoteConf *remote_pb.RemoteConf, ok bool) { + bucket, ok = extractBucketPath(option.bucketsDir, actualDir) + if !ok { + return "", nil, nil, false + } + var isMounted bool + remoteStorageMountLocation, isMounted = option.mappings.Mappings[string(bucket)] + if !isMounted { + glog.Warningf("%s is not mounted", bucket) + return "", nil, nil, false + } + var hasClient bool + remoteConf, hasClient = option.remoteConfs[remoteStorageMountLocation.Name] + if !hasClient { + glog.Warningf("%s mounted to un-configured %+v", bucket, remoteStorageMountLocation) + return "", nil, nil, false + } + return bucket, remoteStorageMountLocation, remoteConf, true +} + +func extractBucketPath(bucketsDir, dir string) (util.FullPath, bool) { + if !strings.HasPrefix(dir, bucketsDir+"/") { + return "", false + } + parts := strings.SplitN(dir[len(bucketsDir)+1:], "/", 2) + return util.FullPath(bucketsDir).Child(parts[0]), true +} + +func (option *RemoteSyncOptions) collectRemoteStorageConf() (err error) { + + if mappings, err := filer.ReadMountMappings(option.grpcDialOption, *option.filerAddress); err != nil { + return err + } else { + option.mappings = mappings + } + + option.remoteConfs = make(map[string]*remote_pb.RemoteConf) + err = filer_pb.List(option, filer.DirectoryEtcRemote, "", func(entry *filer_pb.Entry, isLast bool) error { + if !strings.HasSuffix(entry.Name, filer.REMOTE_STORAGE_CONF_SUFFIX) { + return nil + } + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(entry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, entry.Name, err) + } + option.remoteConfs[conf.Name] = conf + return nil + }, "", false, math.MaxUint32) + + return +} diff --git a/weed/command/filer_remote_sync_dir.go b/weed/command/filer_remote_sync_dir.go new file mode 100644 index 000000000..dc2e9a1fb --- /dev/null +++ b/weed/command/filer_remote_sync_dir.go @@ -0,0 +1,221 @@ +package command + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/replication/source" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "os" + "strings" + "time" +) + +func followUpdatesAndUploadToRemote(option *RemoteSyncOptions, filerSource *source.FilerSource, mountedDir string) error { + + // read filer remote storage mount mappings + _, _, remoteStorageMountLocation, remoteStorage, detectErr := filer.DetectMountInfo(option.grpcDialOption, *option.filerAddress, mountedDir) + if detectErr != nil { + return fmt.Errorf("read mount info: %v", detectErr) + } + + eachEntryFunc, err := makeEventProcessor(remoteStorage, mountedDir, remoteStorageMountLocation, filerSource) + if err != nil { + return err + } + + processEventFnWithOffset := pb.AddOffsetFunc(eachEntryFunc, 3*time.Second, func(counter int64, lastTsNs int64) error { + lastTime := time.Unix(0, lastTsNs) + glog.V(0).Infof("remote sync %s progressed to %v %0.2f/sec", *option.filerAddress, lastTime, float64(counter)/float64(3)) + return remote_storage.SetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir, lastTsNs) + }) + + lastOffsetTs := collectLastSyncOffset(option, mountedDir) + + return pb.FollowMetadata(*option.filerAddress, option.grpcDialOption, "filer.remote.sync", + mountedDir, []string{filer.DirectoryEtcRemote}, lastOffsetTs.UnixNano(), 0, processEventFnWithOffset, false) +} + +func makeEventProcessor(remoteStorage *remote_pb.RemoteConf, mountedDir string, remoteStorageMountLocation *remote_pb.RemoteStorageLocation, filerSource *source.FilerSource) (pb.ProcessMetadataFunc, error) { + client, err := remote_storage.GetRemoteStorage(remoteStorage) + if err != nil { + return nil, err + } + + handleEtcRemoteChanges := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if message.NewEntry == nil { + return nil + } + if message.NewEntry.Name == filer.REMOTE_STORAGE_MOUNT_FILE { + mappings, readErr := filer.UnmarshalRemoteStorageMappings(message.NewEntry.Content) + if readErr != nil { + return fmt.Errorf("unmarshal mappings: %v", readErr) + } + if remoteLoc, found := mappings.Mappings[mountedDir]; found { + if remoteStorageMountLocation.Bucket != remoteLoc.Bucket || remoteStorageMountLocation.Path != remoteLoc.Path { + glog.Fatalf("Unexpected mount changes %+v => %+v", remoteStorageMountLocation, remoteLoc) + } + } else { + glog.V(0).Infof("unmounted %s exiting ...", mountedDir) + os.Exit(0) + } + } + if message.NewEntry.Name == remoteStorage.Name+filer.REMOTE_STORAGE_CONF_SUFFIX { + conf := &remote_pb.RemoteConf{} + if err := proto.Unmarshal(message.NewEntry.Content, conf); err != nil { + return fmt.Errorf("unmarshal %s/%s: %v", filer.DirectoryEtcRemote, message.NewEntry.Name, err) + } + remoteStorage = conf + if newClient, err := remote_storage.GetRemoteStorage(remoteStorage); err == nil { + client = newClient + } else { + return err + } + } + + return nil + } + + eachEntryFunc := func(resp *filer_pb.SubscribeMetadataResponse) error { + message := resp.EventNotification + if strings.HasPrefix(resp.Directory, filer.DirectoryEtcRemote) { + return handleEtcRemoteChanges(resp) + } + + if message.OldEntry == nil && message.NewEntry == nil { + return nil + } + if message.OldEntry == nil && message.NewEntry != nil { + if !filer.HasData(message.NewEntry) { + return nil + } + glog.V(2).Infof("create: %+v", resp) + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping creating: %+v", resp) + return nil + } + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + if message.NewEntry.IsDirectory { + glog.V(0).Infof("mkdir %s", remote_storage.FormatLocation(dest)) + return client.WriteDirectory(dest, message.NewEntry) + } + glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest)) + reader := filer.NewFileReader(filerSource, message.NewEntry) + remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + if message.OldEntry != nil && message.NewEntry == nil { + glog.V(2).Infof("delete: %+v", resp) + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + if message.OldEntry.IsDirectory { + glog.V(0).Infof("rmdir %s", remote_storage.FormatLocation(dest)) + return client.RemoveDirectory(dest) + } + glog.V(0).Infof("delete %s", remote_storage.FormatLocation(dest)) + return client.DeleteFile(dest) + } + if message.OldEntry != nil && message.NewEntry != nil { + oldDest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(resp.Directory, message.OldEntry.Name), remoteStorageMountLocation) + dest := toRemoteStorageLocation(util.FullPath(mountedDir), util.NewFullPath(message.NewParentPath, message.NewEntry.Name), remoteStorageMountLocation) + if !shouldSendToRemote(message.NewEntry) { + glog.V(2).Infof("skipping updating: %+v", resp) + return nil + } + if message.NewEntry.IsDirectory { + return client.WriteDirectory(dest, message.NewEntry) + } + if resp.Directory == message.NewParentPath && message.OldEntry.Name == message.NewEntry.Name { + if filer.IsSameData(message.OldEntry, message.NewEntry) { + glog.V(2).Infof("update meta: %+v", resp) + return client.UpdateFileMetadata(dest, message.OldEntry, message.NewEntry) + } + } + glog.V(2).Infof("update: %+v", resp) + glog.V(0).Infof("delete %s", remote_storage.FormatLocation(oldDest)) + if err := client.DeleteFile(oldDest); err != nil { + return err + } + reader := filer.NewFileReader(filerSource, message.NewEntry) + glog.V(0).Infof("create %s", remote_storage.FormatLocation(dest)) + remoteEntry, writeErr := client.WriteFile(dest, message.NewEntry, reader) + if writeErr != nil { + return writeErr + } + return updateLocalEntry(&remoteSyncOptions, message.NewParentPath, message.NewEntry, remoteEntry) + } + + return nil + } + return eachEntryFunc, nil +} + +func collectLastSyncOffset(option *RemoteSyncOptions, mountedDir string) time.Time { + // 1. specified by timeAgo + // 2. last offset timestamp for this directory + // 3. directory creation time + var lastOffsetTs time.Time + if *option.timeAgo == 0 { + mountedDirEntry, err := filer_pb.GetEntry(option, util.FullPath(mountedDir)) + if err != nil { + glog.V(0).Infof("get mounted directory %s: %v", mountedDir, err) + return time.Now() + } + + lastOffsetTsNs, err := remote_storage.GetSyncOffset(option.grpcDialOption, *option.filerAddress, mountedDir) + if mountedDirEntry != nil { + if err == nil && mountedDirEntry.Attributes.Crtime < lastOffsetTsNs/1000000 { + lastOffsetTs = time.Unix(0, lastOffsetTsNs) + glog.V(0).Infof("resume from %v", lastOffsetTs) + } else { + lastOffsetTs = time.Unix(mountedDirEntry.Attributes.Crtime, 0) + } + } else { + lastOffsetTs = time.Now() + } + } else { + lastOffsetTs = time.Now().Add(-*option.timeAgo) + } + return lastOffsetTs +} + +func toRemoteStorageLocation(mountDir, sourcePath util.FullPath, remoteMountLocation *remote_pb.RemoteStorageLocation) *remote_pb.RemoteStorageLocation { + source := string(sourcePath[len(mountDir):]) + dest := util.FullPath(remoteMountLocation.Path).Child(source) + return &remote_pb.RemoteStorageLocation{ + Name: remoteMountLocation.Name, + Bucket: remoteMountLocation.Bucket, + Path: string(dest), + } +} + +func shouldSendToRemote(entry *filer_pb.Entry) bool { + if entry.RemoteEntry == nil { + return true + } + if entry.RemoteEntry.RemoteMtime < entry.Attributes.Mtime { + return true + } + return false +} + +func updateLocalEntry(filerClient filer_pb.FilerClient, dir string, entry *filer_pb.Entry, remoteEntry *filer_pb.RemoteEntry) error { + remoteEntry.LastLocalSyncTsNs = time.Now().UnixNano() + entry.RemoteEntry = remoteEntry + return filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + _, err := client.UpdateEntry(context.Background(), &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + }) + return err + }) +} diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 5440811dd..33efdb2b7 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -171,7 +171,7 @@ func doSubscribeFilerMetaChanges(grpcDialOption grpc.DialOption, sourceFiler, so }) return pb.FollowMetadata(sourceFiler, grpcDialOption, "syncTo_"+targetFiler, - sourcePath, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false) + sourcePath, nil, sourceFilerOffsetTsNs, targetFilerSignature, processEventFnWithOffset, false) } diff --git a/weed/command/filer_sync_std.go b/weed/command/filer_sync_std.go index 63851eaf8..1f9b6fa14 100644 --- a/weed/command/filer_sync_std.go +++ b/weed/command/filer_sync_std.go @@ -1,3 +1,4 @@ +//go:build !windows // +build !windows package command diff --git a/weed/command/imports.go b/weed/command/imports.go index ee35ed56f..a2f59189f 100644 --- a/weed/command/imports.go +++ b/weed/command/imports.go @@ -3,9 +3,10 @@ package command import ( _ "net/http/pprof" - _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3" - _ "github.com/chrislusf/seaweedfs/weed/remote_storage/gcs" _ "github.com/chrislusf/seaweedfs/weed/remote_storage/azure" + _ "github.com/chrislusf/seaweedfs/weed/remote_storage/gcs" + _ "github.com/chrislusf/seaweedfs/weed/remote_storage/hdfs" + _ "github.com/chrislusf/seaweedfs/weed/remote_storage/s3" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/azuresink" _ "github.com/chrislusf/seaweedfs/weed/replication/sink/b2sink" @@ -29,4 +30,5 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" + _ "github.com/chrislusf/seaweedfs/weed/filer/tikv" ) diff --git a/weed/command/mount_notsupported.go b/weed/command/mount_notsupported.go index f3c0de3d6..1e5c9f53d 100644 --- a/weed/command/mount_notsupported.go +++ b/weed/command/mount_notsupported.go @@ -1,6 +1,5 @@ -// +build !linux -// +build !darwin -// +build !freebsd +//go:build !linux && !darwin && !freebsd +// +build !linux,!darwin,!freebsd package command diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index cdf340067..e393e5894 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -1,3 +1,4 @@ +//go:build linux || darwin || freebsd // +build linux darwin freebsd package command diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml index 9e9258865..c8cb70131 100644 --- a/weed/command/scaffold/filer.toml +++ b/weed/command/scaffold/filer.toml @@ -230,3 +230,11 @@ location = "/tmp/" address = "localhost:6379" password = "" database = 1 + +[tikv] +enabled = false +# If you have many pd address, use ',' split then: +# pdaddrs = "pdhost1:2379, pdhost2:2379, pdhost3:2379" +pdaddrs = "localhost:2379" +# Concurrency for TiKV delete range +deleterange_concurrency = 1 |
