aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/backend/s3_backend/s3_backend.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-11-28 18:33:18 -0800
committerChris Lu <chris.lu@gmail.com>2019-11-28 18:33:18 -0800
commitf60154f330a81354c433da37d612f235d1c0e4e9 (patch)
tree5e473f9616e35e525468959496e0448fb505e20a /weed/storage/backend/s3_backend/s3_backend.go
parent641b92f53c057a2d0dd957ef2da26aadf0b70419 (diff)
downloadseaweedfs-f60154f330a81354c433da37d612f235d1c0e4e9.tar.xz
seaweedfs-f60154f330a81354c433da37d612f235d1c0e4e9.zip
master load backend storage config from master.toml
Diffstat (limited to 'weed/storage/backend/s3_backend/s3_backend.go')
-rw-r--r--weed/storage/backend/s3_backend/s3_backend.go124
1 files changed, 68 insertions, 56 deletions
diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go
index 69360806f..980e9e9d7 100644
--- a/weed/storage/backend/s3_backend/s3_backend.go
+++ b/weed/storage/backend/s3_backend/s3_backend.go
@@ -10,36 +10,77 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
-var (
- _ backend.DataStorageBackend = &S3Backend{}
-)
-
func init() {
- backend.StorageBackends = append(backend.StorageBackends, &S3Backend{})
+ backend.BackendStorageFactories["s3"] = &S3BackendFactory{}
+}
+
+type S3BackendFactory struct {
}
-type S3Backend struct {
+func (factory *S3BackendFactory) StorageType() backend.StorageType {
+ return backend.StorageType("s3")
+}
+func (factory *S3BackendFactory) BuildStorage(configuration util.Configuration, id string) (backend.BackendStorage, error) {
+ return newS3BackendStorage(configuration, id)
+}
+
+type S3BackendStorage struct {
+ id string
conn s3iface.S3API
region string
bucket string
- vid needle.VolumeId
- key string
}
-func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
+func newS3BackendStorage(configuration util.Configuration, id string) (s *S3BackendStorage, err error) {
+ s = &S3BackendStorage{}
+ s.id = id
+ s.conn, err = createSession(
+ configuration.GetString("aws_access_key_id"),
+ configuration.GetString("aws_secret_access_key"),
+ configuration.GetString("region"))
+ s.region = configuration.GetString("region")
+ s.bucket = configuration.GetString("bucket")
+
+ glog.V(0).Infof("created s3 backend storage %s for region %s bucket %s", s.Name(), s.region, s.bucket)
+ return
+}
+
+func (s *S3BackendStorage) Name() string {
+ return "s3." + s.id
+}
+
+func (s *S3BackendStorage) NewStorageFile(key string) backend.BackendStorageFile {
+ if strings.HasPrefix(key, "/") {
+ key = key[1:]
+ }
+
+ f := &S3BackendStorageFile{
+ backendStorage: s,
+ key: key,
+ }
+
+ return f
+}
+
+type S3BackendStorageFile struct {
+ backendStorage *S3BackendStorage
+ key string
+}
+
+func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
- getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{
- Bucket: &s3backend.bucket,
- Key: &s3backend.key,
+ getObjectOutput, getObjectErr := s3backendStorageFile.backendStorage.conn.GetObject(&s3.GetObjectInput{
+ Bucket: &s3backendStorageFile.backendStorage.bucket,
+ Key: &s3backendStorageFile.key,
Range: &bytesRange,
})
if getObjectErr != nil {
- return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr)
+ return 0, fmt.Errorf("bucket %s GetObject %s: %v",
+ s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, getObjectErr)
}
defer getObjectOutput.Body.Close()
@@ -47,27 +88,28 @@ func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
}
-func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) {
+func (s3backendStorageFile S3BackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
panic("implement me")
}
-func (s3backend S3Backend) Truncate(off int64) error {
+func (s3backendStorageFile S3BackendStorageFile) Truncate(off int64) error {
panic("implement me")
}
-func (s3backend S3Backend) Close() error {
+func (s3backendStorageFile S3BackendStorageFile) Close() error {
return nil
}
-func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) {
+func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
- headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{
- Bucket: &s3backend.bucket,
- Key: &s3backend.key,
+ headObjectOutput, headObjectErr := s3backendStorageFile.backendStorage.conn.HeadObject(&s3.HeadObjectInput{
+ Bucket: &s3backendStorageFile.backendStorage.bucket,
+ Key: &s3backendStorageFile.key,
})
if headObjectErr != nil {
- return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr)
+ return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v",
+ s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, headObjectErr)
}
datSize = int64(*headObjectOutput.ContentLength)
@@ -76,44 +118,14 @@ func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err erro
return
}
-func (s3backend S3Backend) String() string {
- return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key)
+func (s3backendStorageFile S3BackendStorageFile) String() string {
+ return s3backendStorageFile.key
}
-func (s3backend *S3Backend) GetName() string {
+func (s3backendStorageFile *S3BackendStorageFile) GetName() string {
return "s3"
}
-func (s3backend S3Backend) Instantiate(src *os.File) error {
+func (s3backendStorageFile S3BackendStorageFile) Instantiate(src *os.File) error {
panic("implement me")
}
-
-func (s3backend *S3Backend) Initialize(configuration util.Configuration, prefix string, vid needle.VolumeId) error {
- glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
- glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
- glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
-
- return s3backend.initialize(
- configuration.GetString("aws_access_key_id"),
- configuration.GetString("aws_secret_access_key"),
- configuration.GetString("region"),
- configuration.GetString("bucket"),
- prefix,
- vid,
- )
-}
-
-func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket string,
- prefix string, vid needle.VolumeId) (err error) {
- s3backend.region = region
- s3backend.bucket = bucket
- s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
-
- s3backend.vid = vid
- s3backend.key = fmt.Sprintf("%s_%d.dat", prefix, vid)
- if strings.HasPrefix(s3backend.key, "/") {
- s3backend.key = s3backend.key[1:]
- }
-
- return err
-}