diff options
| author | Chris Lu <chris.lu@gmail.com> | 2019-11-28 18:33:18 -0800 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2019-11-28 18:33:18 -0800 |
| commit | f60154f330a81354c433da37d612f235d1c0e4e9 (patch) | |
| tree | 5e473f9616e35e525468959496e0448fb505e20a /weed/storage/backend/s3_backend/s3_backend.go | |
| parent | 641b92f53c057a2d0dd957ef2da26aadf0b70419 (diff) | |
| download | seaweedfs-f60154f330a81354c433da37d612f235d1c0e4e9.tar.xz seaweedfs-f60154f330a81354c433da37d612f235d1c0e4e9.zip | |
master load backend storage config from master.toml
Diffstat (limited to 'weed/storage/backend/s3_backend/s3_backend.go')
| -rw-r--r-- | weed/storage/backend/s3_backend/s3_backend.go | 124 |
1 files changed, 68 insertions, 56 deletions
diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go index 69360806f..980e9e9d7 100644 --- a/weed/storage/backend/s3_backend/s3_backend.go +++ b/weed/storage/backend/s3_backend/s3_backend.go @@ -10,36 +10,77 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3iface" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/backend" - "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) -var ( - _ backend.DataStorageBackend = &S3Backend{} -) - func init() { - backend.StorageBackends = append(backend.StorageBackends, &S3Backend{}) + backend.BackendStorageFactories["s3"] = &S3BackendFactory{} +} + +type S3BackendFactory struct { } -type S3Backend struct { +func (factory *S3BackendFactory) StorageType() backend.StorageType { + return backend.StorageType("s3") +} +func (factory *S3BackendFactory) BuildStorage(configuration util.Configuration, id string) (backend.BackendStorage, error) { + return newS3BackendStorage(configuration, id) +} + +type S3BackendStorage struct { + id string conn s3iface.S3API region string bucket string - vid needle.VolumeId - key string } -func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) { +func newS3BackendStorage(configuration util.Configuration, id string) (s *S3BackendStorage, err error) { + s = &S3BackendStorage{} + s.id = id + s.conn, err = createSession( + configuration.GetString("aws_access_key_id"), + configuration.GetString("aws_secret_access_key"), + configuration.GetString("region")) + s.region = configuration.GetString("region") + s.bucket = configuration.GetString("bucket") + + glog.V(0).Infof("created s3 backend storage %s for region %s bucket %s", s.Name(), s.region, s.bucket) + return +} + +func (s *S3BackendStorage) Name() string { + return "s3." + s.id +} + +func (s *S3BackendStorage) NewStorageFile(key string) backend.BackendStorageFile { + if strings.HasPrefix(key, "/") { + key = key[1:] + } + + f := &S3BackendStorageFile{ + backendStorage: s, + key: key, + } + + return f +} + +type S3BackendStorageFile struct { + backendStorage *S3BackendStorage + key string +} + +func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) { bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1) - getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{ - Bucket: &s3backend.bucket, - Key: &s3backend.key, + getObjectOutput, getObjectErr := s3backendStorageFile.backendStorage.conn.GetObject(&s3.GetObjectInput{ + Bucket: &s3backendStorageFile.backendStorage.bucket, + Key: &s3backendStorageFile.key, Range: &bytesRange, }) if getObjectErr != nil { - return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr) + return 0, fmt.Errorf("bucket %s GetObject %s: %v", + s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, getObjectErr) } defer getObjectOutput.Body.Close() @@ -47,27 +88,28 @@ func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) { } -func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) { +func (s3backendStorageFile S3BackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) { panic("implement me") } -func (s3backend S3Backend) Truncate(off int64) error { +func (s3backendStorageFile S3BackendStorageFile) Truncate(off int64) error { panic("implement me") } -func (s3backend S3Backend) Close() error { +func (s3backendStorageFile S3BackendStorageFile) Close() error { return nil } -func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) { +func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) { - headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{ - Bucket: &s3backend.bucket, - Key: &s3backend.key, + headObjectOutput, headObjectErr := s3backendStorageFile.backendStorage.conn.HeadObject(&s3.HeadObjectInput{ + Bucket: &s3backendStorageFile.backendStorage.bucket, + Key: &s3backendStorageFile.key, }) if headObjectErr != nil { - return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr) + return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", + s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, headObjectErr) } datSize = int64(*headObjectOutput.ContentLength) @@ -76,44 +118,14 @@ func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err erro return } -func (s3backend S3Backend) String() string { - return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key) +func (s3backendStorageFile S3BackendStorageFile) String() string { + return s3backendStorageFile.key } -func (s3backend *S3Backend) GetName() string { +func (s3backendStorageFile *S3BackendStorageFile) GetName() string { return "s3" } -func (s3backend S3Backend) Instantiate(src *os.File) error { +func (s3backendStorageFile S3BackendStorageFile) Instantiate(src *os.File) error { panic("implement me") } - -func (s3backend *S3Backend) Initialize(configuration util.Configuration, prefix string, vid needle.VolumeId) error { - glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region")) - glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket")) - glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory")) - - return s3backend.initialize( - configuration.GetString("aws_access_key_id"), - configuration.GetString("aws_secret_access_key"), - configuration.GetString("region"), - configuration.GetString("bucket"), - prefix, - vid, - ) -} - -func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket string, - prefix string, vid needle.VolumeId) (err error) { - s3backend.region = region - s3backend.bucket = bucket - s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region) - - s3backend.vid = vid - s3backend.key = fmt.Sprintf("%s_%d.dat", prefix, vid) - if strings.HasPrefix(s3backend.key, "/") { - s3backend.key = s3backend.key[1:] - } - - return err -} |
