Diffstat (limited to 'weed/storage/backend/s3_backend/s3_backend.go')
-rw-r--r--  weed/storage/backend/s3_backend/s3_backend.go  197
1 file changed, 127 insertions, 70 deletions
diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go
index 0ff7eca21..8d71861c2 100644
--- a/weed/storage/backend/s3_backend/s3_backend.go
+++ b/weed/storage/backend/s3_backend/s3_backend.go
@@ -2,119 +2,176 @@ package s3_backend
import (
"fmt"
+ "io"
+ "os"
"strings"
"time"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
+ "github.com/google/uuid"
+
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- _ backend.DataStorageBackend = &S3Backend{}
)
func init() {
- backend.StorageBackends = append(backend.StorageBackends, &S3Backend{})
+ backend.BackendStorageFactories["s3"] = &S3BackendFactory{}
}
-type S3Backend struct {
- conn s3iface.S3API
- region string
- bucket string
- dir string
- vid needle.VolumeId
- key string
+type S3BackendFactory struct {
}
-func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
- bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
- getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{
- Bucket: &s3backend.bucket,
- Key: &s3backend.key,
- Range: &bytesRange,
- })
+func (factory *S3BackendFactory) StorageType() backend.StorageType {
+ return backend.StorageType("s3")
+}
+func (factory *S3BackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) {
+ return newS3BackendStorage(configuration, configPrefix, id)
+}
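
A minimal sketch of how a caller might resolve this factory from the registry populated in init() and build a storage instance, assuming only the map and the BuildStorage signature shown in this diff (the config prefix and id values are illustrative, not taken from SeaweedFS):

    func buildS3Backend(config backend.StringProperties) (backend.BackendStorage, error) {
            factory, found := backend.BackendStorageFactories["s3"]
            if !found {
                    return nil, fmt.Errorf("no factory registered for storage type s3")
            }
            // prefix and id below are assumed example values
            return factory.BuildStorage(config, "storage.backend.s3.default.", "default")
    }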
- if getObjectErr != nil {
- return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr)
+type S3BackendStorage struct {
+ id string
+ aws_access_key_id string
+ aws_secret_access_key string
+ region string
+ bucket string
+ conn s3iface.S3API
+}
+
+func newS3BackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *S3BackendStorage, err error) {
+ s = &S3BackendStorage{}
+ s.id = id
+ s.aws_access_key_id = configuration.GetString(configPrefix + "aws_access_key_id")
+ s.aws_secret_access_key = configuration.GetString(configPrefix + "aws_secret_access_key")
+ s.region = configuration.GetString(configPrefix + "region")
+ s.bucket = configuration.GetString(configPrefix + "bucket")
+ s.conn, err = createSession(s.aws_access_key_id, s.aws_secret_access_key, s.region)
+
+ glog.V(0).Infof("created backend storage s3.%s for region %s bucket %s", s.id, s.region, s.bucket)
+ return
+}
+
+func (s *S3BackendStorage) ToProperties() map[string]string {
+ m := make(map[string]string)
+ m["aws_access_key_id"] = s.aws_access_key_id
+ m["aws_secret_access_key"] = s.aws_secret_access_key
+ m["region"] = s.region
+ m["bucket"] = s.bucket
+ return m
+}
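
ToProperties exposes the backend settings so they can be persisted and the storage rebuilt later. A round-trip sketch, assuming StringProperties needs only the GetString method used in newS3BackendStorage above (mapProperties and the config prefix are hypothetical):

    type mapProperties map[string]string

    func (m mapProperties) GetString(key string) string { return m[key] }

    func rebuildStorage(s *S3BackendStorage) (*S3BackendStorage, error) {
            props := mapProperties{}
            for k, v := range s.ToProperties() {
                    // re-apply the prefix that the GetString lookups expect
                    props["storage.backend.s3.example."+k] = v
            }
            return newS3BackendStorage(props, "storage.backend.s3.example.", s.id)
    }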
+
+func (s *S3BackendStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile {
+ if strings.HasPrefix(key, "/") {
+ key = key[1:]
}
- defer getObjectOutput.Body.Close()
- return getObjectOutput.Body.Read(p)
+ f := &S3BackendStorageFile{
+ backendStorage: s,
+ key: key,
+ tierInfo: tierInfo,
+ }
+ return f
}
-func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) {
- panic("implement me")
+func (s *S3BackendStorage) CopyFile(f *os.File, attributes map[string]string, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
+ randomUuid, e := uuid.NewRandom()
+ if e != nil {
+ return "", 0, fmt.Errorf("creating random uuid: %v", e)
+ }
+ key = randomUuid.String()
+
+ glog.V(1).Infof("copying dat file of %s to remote s3.%s as %s", f.Name(), s.id, key)
+
+ size, err = uploadToS3(s.conn, f.Name(), s.bucket, key, attributes, fn)
+
+ return
}
-func (s3backend S3Backend) Truncate(off int64) error {
- panic("implement me")
+func (s *S3BackendStorage) DownloadFile(fileName string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) {
+
+ glog.V(1).Infof("download dat file of %s from remote s3.%s as %s", fileName, s.id, key)
+
+ size, err = downloadFromS3(s.conn, fileName, s.bucket, key, fn)
+
+ return
}
-func (s3backend S3Backend) Close() error {
- return nil
+func (s *S3BackendStorage) DeleteFile(key string) (err error) {
+
+ glog.V(1).Infof("delete dat file %s from remote", key)
+
+ err = deleteFromS3(s.conn, s.bucket, key)
+
+ return
}
-func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) {
+type S3BackendStorageFile struct {
+ backendStorage *S3BackendStorage
+ key string
+ tierInfo *volume_server_pb.VolumeInfo
+}
- headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{
- Bucket: &s3backend.bucket,
- Key: &s3backend.key,
+func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
+
+ bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
+
+ getObjectOutput, getObjectErr := s3backendStorageFile.backendStorage.conn.GetObject(&s3.GetObjectInput{
+ Bucket: &s3backendStorageFile.backendStorage.bucket,
+ Key: &s3backendStorageFile.key,
+ Range: &bytesRange,
})
- if headObjectErr != nil {
- return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr)
+ if getObjectErr != nil {
+ return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, getObjectErr)
+ }
+ defer getObjectOutput.Body.Close()
+
+ glog.V(4).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
+ glog.V(4).Infof("content range: %s, contentLength: %d", *getObjectOutput.ContentRange, *getObjectOutput.ContentLength)
+
+ // Body.Read may return fewer bytes than requested, so loop until the
+ // buffer is full or the stream ends, accumulating the total count read.
+ totalRead := 0
+ for {
+ n, err = getObjectOutput.Body.Read(p)
+ totalRead += n
+ p = p[n:]
+ if err != nil || len(p) == 0 {
+ break
+ }
}
- datSize = int64(*headObjectOutput.ContentLength)
- modTime = *headObjectOutput.LastModified
+ n = totalRead
+ if err == io.EOF {
+ err = nil
+ }
return
}
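
The read loop above is equivalent to filling the buffer with io.ReadFull. A sketch of that alternative, clearing both EOF flavors to mirror the behavior committed here (note that the io.ReaderAt contract strictly expects a non-nil error whenever n < len(p); this code, like the loop above, relaxes that for its callers):

    n, err = io.ReadFull(getObjectOutput.Body, p)
    if err == io.EOF || err == io.ErrUnexpectedEOF {
            err = nil
    }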
-func (s3backend S3Backend) String() string {
- return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key)
+func (s3backendStorageFile S3BackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
+ panic("not implemented")
}
-func (s3backend *S3Backend) GetName() string {
- return "s3"
+func (s3backendStorageFile S3BackendStorageFile) Truncate(off int64) error {
+ panic("not implemented")
}
-func (s3backend *S3Backend) GetSinkToDirectory() string {
- return s3backend.dir
+func (s3backendStorageFile S3BackendStorageFile) Close() error {
+ return nil
}
-func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error {
- glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
- glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
- glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
+func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
- return s3backend.initialize(
- configuration.GetString("aws_access_key_id"),
- configuration.GetString("aws_secret_access_key"),
- configuration.GetString("region"),
- configuration.GetString("bucket"),
- configuration.GetString("directory"),
- vid,
- )
-}
-
-func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string,
- vid needle.VolumeId) (err error) {
- s3backend.region = region
- s3backend.bucket = bucket
- s3backend.dir = dir
- s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
+ files := s3backendStorageFile.tierInfo.GetFiles()
- s3backend.vid = vid
- s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid)
- if strings.HasPrefix(s3backend.key, "/") {
- s3backend.key = s3backend.key[1:]
+ if len(files) == 0 {
+ err = fmt.Errorf("remote file info not found")
+ return
}
- return err
+ datSize = int64(files[0].FileSize)
+ modTime = time.Unix(int64(files[0].ModifiedTime), 0)
+
+ return
+}
+
+func (s3backendStorageFile S3BackendStorageFile) Name() string {
+ return s3backendStorageFile.key
}
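
An illustrative end-to-end use of the file handle, assuming the backend.BackendStorageFile interface exposes the GetStat and ReadAt methods implemented above: open a remote .dat object by key, check its recorded size, then read its leading bytes. The key, the 8-byte header length, and the helper name are assumptions for the example, not values from this diff:

    func readDatHeader(s *S3BackendStorage, tierInfo *volume_server_pb.VolumeInfo) ([]byte, error) {
            f := s.NewStorageFile("/7.dat", tierInfo) // leading "/" is stripped by NewStorageFile
            size, _, err := f.GetStat()               // served from tierInfo, no S3 round trip
            if err != nil {
                    return nil, err
            }
            if size < 8 {
                    return nil, fmt.Errorf("dat file too small: %d bytes", size)
            }
            header := make([]byte, 8)
            if _, err := f.ReadAt(header, 0); err != nil { // ranged GetObject under the hood
                    return nil, err
            }
            return header, nil
    }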