| field | value |
|---|---|
| author | Damiano Albani <damiano.albani@gmail.com>, 2023-04-18 23:12:12 +0200 |
| committer | GitHub <noreply@github.com>, 2023-04-18 14:12:12 -0700 |
| commit | 9f55c7c90e51621f2bc298be159b3fcc12aaddbc (patch) |
| tree | 859f8185977cab571790c708a87e2d1603e1bf9e /weed/storage/backend/rclone_backend/rclone_backend.go |
| parent | a408b46d95e7696aca8175c8f1780316a6f9d4ca (diff) |
| download | seaweedfs-9f55c7c90e51621f2bc298be159b3fcc12aaddbc.tar.xz, seaweedfs-9f55c7c90e51621f2bc298be159b3fcc12aaddbc.zip |
Rclone storage backend (#4402)
* Add Rclone storage backend
* Support templating the name of files stored via Rclone
* Enable Rclone accounting
* Remove redundant type conversion
* Provide progress information for Rclone download/upload operations
* Log error when Rclone can't instantiate filesystem
* Remove filename templating functionality for Rclone storage
  To (maybe) be later reintroduced as a generic functionality for all storage backends.
* Remove S3 specific check
* Move Rclone config initialisation to init() method
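The last bullet shows up in the new file's `init()`: it registers the `rclone` storage type and calls `configfile.Install()`, which loads the standard rclone configuration file. The remote named by the backend's `remote_name` property therefore has to exist in `rclone.conf` already. Below is a minimal, hypothetical sketch of that resolution step, using a placeholder remote called `gdrive` (nothing in this commit defines that name):

```go
package main

import (
	"context"
	"fmt"
	"log"

	_ "github.com/rclone/rclone/backend/all" // register every rclone backend, as the new file does
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/config/configfile"
)

func main() {
	// Load the standard rclone.conf, mirroring the configfile.Install()
	// call in the backend's init().
	configfile.Install()

	ctx := context.TODO()

	// "gdrive" is a placeholder; the backend builds the same
	// "<remote_name>:" path from its remote_name property.
	remoteFs, err := fs.NewFs(ctx, "gdrive:")
	if err != nil {
		log.Fatalf("failed to instantiate Rclone filesystem: %v", err)
	}

	fmt.Printf("using rclone remote %q rooted at %q\n", remoteFs.Name(), remoteFs.Root())
}
```

This is the same `fs.NewFs(ctx, "<remote_name>:")` call that `newRcloneBackendStorage` makes, so a failure here reproduces the "failed to instantiate Rclone filesystem" error the commit logs.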
Diffstat (limited to 'weed/storage/backend/rclone_backend/rclone_backend.go')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | weed/storage/backend/rclone_backend/rclone_backend.go | 270 |

1 file changed, 270 insertions, 0 deletions
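The new file plugs into SeaweedFS's backend factory map, so a tier backed by rclone is built by looking up the `rclone` factory and handing it configuration properties. The sketch below is hypothetical: the `mapProps` helper, the property prefix, and the id `default` are assumptions for illustration only, and `backend.StringProperties` is assumed to be just the `GetString` accessor the new code calls.

```go
package main

import (
	"fmt"
	"log"

	"github.com/seaweedfs/seaweedfs/weed/storage/backend"
	// Blank import so the rclone factory registers itself via init().
	_ "github.com/seaweedfs/seaweedfs/weed/storage/backend/rclone_backend"
)

// mapProps is a stand-in for whatever SeaweedFS actually passes as
// backend.StringProperties; only the GetString method is assumed here.
type mapProps map[string]string

func (m mapProps) GetString(key string) string { return m[key] }

func main() {
	// Illustrative prefix and id, not SeaweedFS's actual wiring.
	prefix := "storage.backend.rclone.default."
	props := mapProps{prefix + "remote_name": "gdrive"}

	factory := backend.BackendStorageFactories["rclone"]
	storage, err := factory.BuildStorage(props, prefix, "default")
	if err != nil {
		log.Fatalf("build rclone backend: %v", err)
	}
	fmt.Printf("rclone backend properties: %v\n", storage.ToProperties())
}
```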
```diff
diff --git a/weed/storage/backend/rclone_backend/rclone_backend.go b/weed/storage/backend/rclone_backend/rclone_backend.go
new file mode 100644
index 000000000..825640a3d
--- /dev/null
+++ b/weed/storage/backend/rclone_backend/rclone_backend.go
@@ -0,0 +1,270 @@
+package rclone_backend
+
+import (
+    "context"
+    "fmt"
+    "github.com/rclone/rclone/fs/config/configfile"
+    "github.com/seaweedfs/seaweedfs/weed/util"
+    "io"
+    "os"
+    "time"
+
+    "github.com/google/uuid"
+
+    _ "github.com/rclone/rclone/backend/all"
+    "github.com/rclone/rclone/fs"
+    "github.com/rclone/rclone/fs/accounting"
+    "github.com/rclone/rclone/fs/object"
+
+    "github.com/seaweedfs/seaweedfs/weed/glog"
+    "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+    "github.com/seaweedfs/seaweedfs/weed/storage/backend"
+)
+
+func init() {
+    backend.BackendStorageFactories["rclone"] = &RcloneBackendFactory{}
+    configfile.Install()
+}
+
+type RcloneBackendFactory struct {
+}
+
+func (factory *RcloneBackendFactory) StorageType() backend.StorageType {
+    return "rclone"
+}
+
+func (factory *RcloneBackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) {
+    return newRcloneBackendStorage(configuration, configPrefix, id)
+}
+
+type RcloneBackendStorage struct {
+    id         string
+    remoteName string
+    fs         fs.Fs
+}
+
+func newRcloneBackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *RcloneBackendStorage, err error) {
+    s = &RcloneBackendStorage{}
+    s.id = id
+    s.remoteName = configuration.GetString(configPrefix + "remote_name")
+
+    ctx := context.TODO()
+    accounting.Start(ctx)
+
+    fsPath := fmt.Sprintf("%s:", s.remoteName)
+    s.fs, err = fs.NewFs(ctx, fsPath)
+    if err != nil {
+        glog.Errorf("failed to instantiate Rclone filesystem: %s", err)
+        return
+    }
+
+    glog.V(0).Infof("created backend storage rclone.%s for remote name %s", s.id, s.remoteName)
+    return
+}
+
+func (s *RcloneBackendStorage) ToProperties() map[string]string {
+    m := make(map[string]string)
+    m["remote_name"] = s.remoteName
+    return m
+}
+
+func (s *RcloneBackendStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile {
+    f := &RcloneBackendStorageFile{
+        backendStorage: s,
+        key:            key,
+        tierInfo:       tierInfo,
+    }
+
+    return f
+}
+
+func (s *RcloneBackendStorage) CopyFile(f *os.File, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
+    randomUuid, err := uuid.NewRandom()
+    if err != nil {
+        return key, 0, err
+    }
+    key = randomUuid.String()
+
+    glog.V(1).Infof("copy dat file of %s to remote rclone.%s as %s", f.Name(), s.id, key)
+
+    util.Retry("upload via Rclone", func() error {
+        size, err = uploadViaRclone(s.fs, f.Name(), key, fn)
+        return err
+    })
+
+    return
+}
+
+func uploadViaRclone(rfs fs.Fs, filename string, key string, fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
+    ctx := context.TODO()
+
+    file, err := os.Open(filename)
+    defer func(file *os.File) {
+        err := file.Close()
+        if err != nil {
+            return
+        }
+    }(file)
+
+    if err != nil {
+        return 0, err
+    }
+
+    stat, err := file.Stat()
+    if err != nil {
+        return 0, err
+    }
+
+    info := object.NewStaticObjectInfo(key, stat.ModTime(), stat.Size(), true, nil, rfs)
+
+    tr := accounting.NewStats(ctx).NewTransfer(info)
+    defer tr.Done(ctx, err)
+    acc := tr.Account(ctx, file)
+    pr := ProgressReader{acc: acc, tr: tr, fn: fn}
+
+    obj, err := rfs.Put(ctx, &pr, info)
+    if err != nil {
+        return 0, err
+    }
+
+    return obj.Size(), err
+}
+
+func (s *RcloneBackendStorage) DownloadFile(filename string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) {
+    glog.V(1).Infof("download dat file of %s from remote rclone.%s as %s", filename, s.id, key)
+
+    util.Retry("download via Rclone", func() error {
+        size, err = downloadViaRclone(s.fs, filename, key, fn)
+        return err
+    })
+
+    return
+}
+
+func downloadViaRclone(fs fs.Fs, filename string, key string, fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
+    ctx := context.TODO()
+
+    obj, err := fs.NewObject(ctx, key)
+    if err != nil {
+        return 0, err
+    }
+
+    rc, err := obj.Open(ctx)
+    defer func(rc io.ReadCloser) {
+        err := rc.Close()
+        if err != nil {
+            return
+        }
+    }(rc)
+
+    if err != nil {
+        return 0, err
+    }
+
+    file, err := os.Create(filename)
+    defer func(file *os.File) {
+        err := file.Close()
+        if err != nil {
+            return
+        }
+    }(file)
+
+    tr := accounting.NewStats(ctx).NewTransfer(obj)
+    defer tr.Done(ctx, err)
+    acc := tr.Account(ctx, rc)
+    pr := ProgressReader{acc: acc, tr: tr, fn: fn}
+
+    written, err := io.Copy(file, &pr)
+    if err != nil {
+        return 0, err
+    }
+
+    return written, nil
+}
+
+func (s *RcloneBackendStorage) DeleteFile(key string) (err error) {
+    glog.V(1).Infof("delete dat file %s from remote", key)
+
+    util.Retry("delete via Rclone", func() error {
+        err = deleteViaRclone(s.fs, key)
+        return err
+    })
+
+    return
+}
+
+func deleteViaRclone(fs fs.Fs, key string) (err error) {
+    ctx := context.TODO()
+
+    obj, err := fs.NewObject(ctx, key)
+    if err != nil {
+        return err
+    }
+
+    return obj.Remove(ctx)
+}
+
+type RcloneBackendStorageFile struct {
+    backendStorage *RcloneBackendStorage
+    key            string
+    tierInfo       *volume_server_pb.VolumeInfo
+}
+
+func (rcloneBackendStorageFile RcloneBackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
+    ctx := context.TODO()
+
+    obj, err := rcloneBackendStorageFile.backendStorage.fs.NewObject(ctx, rcloneBackendStorageFile.key)
+    if err != nil {
+        return 0, err
+    }
+
+    opt := fs.RangeOption{Start: off, End: off + int64(len(p)) - 1}
+
+    rc, err := obj.Open(ctx, &opt)
+    defer func(rc io.ReadCloser) {
+        err := rc.Close()
+        if err != nil {
+            return
+        }
+    }(rc)
+
+    if err != nil {
+        return 0, err
+    }
+
+    return io.ReadFull(rc, p)
+}
+
+func (rcloneBackendStorageFile RcloneBackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
+    panic("not implemented")
+}
+
+func (rcloneBackendStorageFile RcloneBackendStorageFile) Truncate(off int64) error {
+    panic("not implemented")
+}
+
+func (rcloneBackendStorageFile RcloneBackendStorageFile) Close() error {
+    return nil
+}
+
+func (rcloneBackendStorageFile RcloneBackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
+    files := rcloneBackendStorageFile.tierInfo.GetFiles()
+
+    if len(files) == 0 {
+        err = fmt.Errorf("remote file info not found")
+        return
+    }
+
+    datSize = int64(files[0].FileSize)
+    modTime = time.Unix(int64(files[0].ModifiedTime), 0)
+
+    return
+}
+
+func (rcloneBackendStorageFile RcloneBackendStorageFile) Name() string {
+    return rcloneBackendStorageFile.key
+}
+
+func (rcloneBackendStorageFile RcloneBackendStorageFile) Sync() error {
+    return nil
+}
```
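Both transfer paths wrap the accounted reader in a `ProgressReader{acc, tr, fn}`, a type that does not appear in this file (the diffstat above is limited to `rclone_backend.go`); it ships in a sibling file of the same commit. As a purely illustrative sketch of the idea, not the commit's actual implementation, a progress-reporting reader wrapper could look like this:

```go
package rclone_backend

import "io"

// progressReaderSketch is a hypothetical stand-in for the commit's
// ProgressReader: it forwards reads to an underlying reader, keeps a running
// byte count, and reports progress through the same callback signature the
// backend's CopyFile/DownloadFile methods accept.
type progressReaderSketch struct {
	r     io.Reader // underlying reader (the accounting-wrapped source in the real code)
	total int64     // expected size, used only to derive a percentage
	read  int64     // bytes forwarded so far
	fn    func(progressed int64, percentage float32) error
}

func (pr *progressReaderSketch) Read(p []byte) (n int, err error) {
	n, err = pr.r.Read(p)
	pr.read += int64(n)

	var pct float32
	if pr.total > 0 {
		pct = float32(pr.read) / float32(pr.total) * 100
	}
	// Surface callback errors so a caller can abort the transfer.
	if cbErr := pr.fn(pr.read, pct); cbErr != nil {
		return n, cbErr
	}
	return n, err
}
```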
