diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-07-26 22:53:44 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-07-26 22:53:44 -0700 |
| commit | 99b599aa8a674ccd584d612e8e871fdca7670620 (patch) | |
| tree | e2d16070d21c4ecfa68509b095d21c7232b348fa /weed/remote_storage | |
| parent | 35f70c51b0ff1ca71fec6c0194086329ef9e3e86 (diff) | |
| download | seaweedfs-99b599aa8a674ccd584d612e8e871fdca7670620.tar.xz seaweedfs-99b599aa8a674ccd584d612e8e871fdca7670620.zip | |
remote.mount
Diffstat (limited to 'weed/remote_storage')
| -rw-r--r-- | weed/remote_storage/remote_storage.go | 50 | ||||
| -rw-r--r-- | weed/remote_storage/s3/s3_storage_client.go | 94 |
2 files changed, 144 insertions, 0 deletions
diff --git a/weed/remote_storage/remote_storage.go b/weed/remote_storage/remote_storage.go new file mode 100644 index 000000000..79f71001b --- /dev/null +++ b/weed/remote_storage/remote_storage.go @@ -0,0 +1,50 @@ +package remote_storage + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "sync" +) + +type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error + +type RemoteStorageClient interface { + Traverse(rootDir string, visitFn VisitFunc) error +} + +type RemoteStorageClientMaker interface { + Make(remoteConf *filer_pb.RemoteConf) (RemoteStorageClient, error) +} + +var ( + RemoteStorageClientMakers = make(map[string]RemoteStorageClientMaker) + remoteStorageClients = make(map[string]RemoteStorageClient) + remoteStorageClientsLock sync.Mutex +) + +func makeRemoteStorageClient(remoteConf *filer_pb.RemoteConf) (RemoteStorageClient, error) { + maker, found := RemoteStorageClientMakers[remoteConf.Type] + if !found { + return nil, fmt.Errorf("remote storage type %s not found", remoteConf.Type) + } + return maker.Make(remoteConf) +} + +func GetRemoteStorage(remoteConf *filer_pb.RemoteConf) (RemoteStorageClient, error) { + remoteStorageClientsLock.Lock() + defer remoteStorageClientsLock.Unlock() + + existingRemoteStorageClient, found := remoteStorageClients[remoteConf.Name] + if found { + return existingRemoteStorageClient, nil + } + + newRemoteStorageClient, err := makeRemoteStorageClient(remoteConf) + if err != nil { + return nil, fmt.Errorf("make remote storage client %s: %v", remoteConf.Name, err) + } + + remoteStorageClients[remoteConf.Name] = newRemoteStorageClient + + return newRemoteStorageClient, nil +} diff --git a/weed/remote_storage/s3/s3_storage_client.go b/weed/remote_storage/s3/s3_storage_client.go new file mode 100644 index 000000000..625d6e0e6 --- /dev/null +++ b/weed/remote_storage/s3/s3_storage_client.go @@ -0,0 +1,94 @@ +package s3 + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/util" + "strings" +) + +func init() { + remote_storage.RemoteStorageClientMakers["s3"] = new(s3RemoteStorageMaker) +} + +type s3RemoteStorageMaker struct{} + +func (s s3RemoteStorageMaker) Make(conf *filer_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { + client := &s3RemoteStorageClient{ + conf: conf, + } + config := &aws.Config{ + Region: aws.String(conf.S3Region), + Endpoint: aws.String(conf.S3Endpoint), + S3ForcePathStyle: aws.Bool(true), + } + if conf.S3AccessKey != "" && conf.S3SecretKey != "" { + config.Credentials = credentials.NewStaticCredentials(conf.S3AccessKey, conf.S3SecretKey, "") + } + + sess, err := session.NewSession(config) + if err != nil { + return nil, fmt.Errorf("create aws session: %v", err) + } + client.conn = s3.New(sess) + return client, nil +} + +type s3RemoteStorageClient struct { + conf *filer_pb.RemoteConf + conn s3iface.S3API +} + +func (s s3RemoteStorageClient) Traverse(rootDir string, visitFn remote_storage.VisitFunc) (err error) { + if !strings.HasPrefix(rootDir, "/") { + return fmt.Errorf("remote directory %s should start with /", rootDir) + } + bucket := strings.Split(rootDir[1:], "/")[0] + prefix := rootDir[1+len(bucket):] + if len(prefix) > 0 && strings.HasPrefix(prefix, "/") { + prefix = prefix[1:] + } + + listInput := &s3.ListObjectsV2Input{ + Bucket: aws.String(bucket), + ContinuationToken: nil, + Delimiter: nil, // not aws.String("/"), iterate through all entries + EncodingType: nil, + ExpectedBucketOwner: nil, + FetchOwner: nil, + MaxKeys: nil, // aws.Int64(1000), + Prefix: aws.String(prefix), + RequestPayer: nil, + StartAfter: nil, + } + isLastPage := false + for !isLastPage && err == nil { + listErr := s.conn.ListObjectsV2Pages(listInput, func(page *s3.ListObjectsV2Output, lastPage bool) bool { + for _, content := range page.Contents { + key := *content.Key + dir, name := util.FullPath("/" + key).DirAndName() + if err := visitFn(dir, name, false, &filer_pb.RemoteEntry{ + LastModifiedAt: (*content.LastModified).Unix(), + Size: *content.Size, + ETag: *content.ETag, + StorageName: s.conf.Name, + }); err != nil { + return false + } + } + listInput.ContinuationToken = page.NextContinuationToken + isLastPage = lastPage + return true + }) + if listErr != nil { + err = fmt.Errorf("list %v: %v", rootDir, listErr) + } + } + return +} |
