aboutsummaryrefslogtreecommitdiff
path: root/weed/remote_storage
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-07-26 22:53:44 -0700
committerChris Lu <chris.lu@gmail.com>2021-07-26 22:53:44 -0700
commit99b599aa8a674ccd584d612e8e871fdca7670620 (patch)
treee2d16070d21c4ecfa68509b095d21c7232b348fa /weed/remote_storage
parent35f70c51b0ff1ca71fec6c0194086329ef9e3e86 (diff)
downloadseaweedfs-99b599aa8a674ccd584d612e8e871fdca7670620.tar.xz
seaweedfs-99b599aa8a674ccd584d612e8e871fdca7670620.zip
remote.mount
Diffstat (limited to 'weed/remote_storage')
-rw-r--r--weed/remote_storage/remote_storage.go50
-rw-r--r--weed/remote_storage/s3/s3_storage_client.go94
2 files changed, 144 insertions, 0 deletions
diff --git a/weed/remote_storage/remote_storage.go b/weed/remote_storage/remote_storage.go
new file mode 100644
index 000000000..79f71001b
--- /dev/null
+++ b/weed/remote_storage/remote_storage.go
@@ -0,0 +1,50 @@
+package remote_storage
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "sync"
+)
+
+type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error
+
+type RemoteStorageClient interface {
+ Traverse(rootDir string, visitFn VisitFunc) error
+}
+
+type RemoteStorageClientMaker interface {
+ Make(remoteConf *filer_pb.RemoteConf) (RemoteStorageClient, error)
+}
+
+var (
+ RemoteStorageClientMakers = make(map[string]RemoteStorageClientMaker)
+ remoteStorageClients = make(map[string]RemoteStorageClient)
+ remoteStorageClientsLock sync.Mutex
+)
+
+func makeRemoteStorageClient(remoteConf *filer_pb.RemoteConf) (RemoteStorageClient, error) {
+ maker, found := RemoteStorageClientMakers[remoteConf.Type]
+ if !found {
+ return nil, fmt.Errorf("remote storage type %s not found", remoteConf.Type)
+ }
+ return maker.Make(remoteConf)
+}
+
+func GetRemoteStorage(remoteConf *filer_pb.RemoteConf) (RemoteStorageClient, error) {
+ remoteStorageClientsLock.Lock()
+ defer remoteStorageClientsLock.Unlock()
+
+ existingRemoteStorageClient, found := remoteStorageClients[remoteConf.Name]
+ if found {
+ return existingRemoteStorageClient, nil
+ }
+
+ newRemoteStorageClient, err := makeRemoteStorageClient(remoteConf)
+ if err != nil {
+ return nil, fmt.Errorf("make remote storage client %s: %v", remoteConf.Name, err)
+ }
+
+ remoteStorageClients[remoteConf.Name] = newRemoteStorageClient
+
+ return newRemoteStorageClient, nil
+}
diff --git a/weed/remote_storage/s3/s3_storage_client.go b/weed/remote_storage/s3/s3_storage_client.go
new file mode 100644
index 000000000..625d6e0e6
--- /dev/null
+++ b/weed/remote_storage/s3/s3_storage_client.go
@@ -0,0 +1,94 @@
+package s3
+
+import (
+ "fmt"
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/aws/aws-sdk-go/service/s3/s3iface"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "strings"
+)
+
+func init() {
+ remote_storage.RemoteStorageClientMakers["s3"] = new(s3RemoteStorageMaker)
+}
+
+type s3RemoteStorageMaker struct{}
+
+func (s s3RemoteStorageMaker) Make(conf *filer_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) {
+ client := &s3RemoteStorageClient{
+ conf: conf,
+ }
+ config := &aws.Config{
+ Region: aws.String(conf.S3Region),
+ Endpoint: aws.String(conf.S3Endpoint),
+ S3ForcePathStyle: aws.Bool(true),
+ }
+ if conf.S3AccessKey != "" && conf.S3SecretKey != "" {
+ config.Credentials = credentials.NewStaticCredentials(conf.S3AccessKey, conf.S3SecretKey, "")
+ }
+
+ sess, err := session.NewSession(config)
+ if err != nil {
+ return nil, fmt.Errorf("create aws session: %v", err)
+ }
+ client.conn = s3.New(sess)
+ return client, nil
+}
+
+type s3RemoteStorageClient struct {
+ conf *filer_pb.RemoteConf
+ conn s3iface.S3API
+}
+
+func (s s3RemoteStorageClient) Traverse(rootDir string, visitFn remote_storage.VisitFunc) (err error) {
+ if !strings.HasPrefix(rootDir, "/") {
+ return fmt.Errorf("remote directory %s should start with /", rootDir)
+ }
+ bucket := strings.Split(rootDir[1:], "/")[0]
+ prefix := rootDir[1+len(bucket):]
+ if len(prefix) > 0 && strings.HasPrefix(prefix, "/") {
+ prefix = prefix[1:]
+ }
+
+ listInput := &s3.ListObjectsV2Input{
+ Bucket: aws.String(bucket),
+ ContinuationToken: nil,
+ Delimiter: nil, // not aws.String("/"), iterate through all entries
+ EncodingType: nil,
+ ExpectedBucketOwner: nil,
+ FetchOwner: nil,
+ MaxKeys: nil, // aws.Int64(1000),
+ Prefix: aws.String(prefix),
+ RequestPayer: nil,
+ StartAfter: nil,
+ }
+ isLastPage := false
+ for !isLastPage && err == nil {
+ listErr := s.conn.ListObjectsV2Pages(listInput, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
+ for _, content := range page.Contents {
+ key := *content.Key
+ dir, name := util.FullPath("/" + key).DirAndName()
+ if err := visitFn(dir, name, false, &filer_pb.RemoteEntry{
+ LastModifiedAt: (*content.LastModified).Unix(),
+ Size: *content.Size,
+ ETag: *content.ETag,
+ StorageName: s.conf.Name,
+ }); err != nil {
+ return false
+ }
+ }
+ listInput.ContinuationToken = page.NextContinuationToken
+ isLastPage = lastPage
+ return true
+ })
+ if listErr != nil {
+ err = fmt.Errorf("list %v: %v", rootDir, listErr)
+ }
+ }
+ return
+}