diff options
Diffstat (limited to 'weed/storage/backend/s3_backend/s3_backend.go')
| -rw-r--r-- | weed/storage/backend/s3_backend/s3_backend.go | 197 |
1 files changed, 127 insertions, 70 deletions
diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go index 0ff7eca21..8d71861c2 100644 --- a/weed/storage/backend/s3_backend/s3_backend.go +++ b/weed/storage/backend/s3_backend/s3_backend.go @@ -2,119 +2,176 @@ package s3_backend import ( "fmt" + "io" + "os" "strings" "time" "github.com/aws/aws-sdk-go/service/s3" "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/google/uuid" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/backend" - "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/util" -) - -var ( - _ backend.DataStorageBackend = &S3Backend{} ) func init() { - backend.StorageBackends = append(backend.StorageBackends, &S3Backend{}) + backend.BackendStorageFactories["s3"] = &S3BackendFactory{} } -type S3Backend struct { - conn s3iface.S3API - region string - bucket string - dir string - vid needle.VolumeId - key string +type S3BackendFactory struct { } -func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) { - bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1) - getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{ - Bucket: &s3backend.bucket, - Key: &s3backend.key, - Range: &bytesRange, - }) +func (factory *S3BackendFactory) StorageType() backend.StorageType { + return backend.StorageType("s3") +} +func (factory *S3BackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) { + return newS3BackendStorage(configuration, configPrefix, id) +} - if getObjectErr != nil { - return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr) +type S3BackendStorage struct { + id string + aws_access_key_id string + aws_secret_access_key string + region string + bucket string + conn s3iface.S3API +} + +func newS3BackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *S3BackendStorage, err error) { + s = &S3BackendStorage{} + s.id = id + s.aws_access_key_id = configuration.GetString(configPrefix + "aws_access_key_id") + s.aws_secret_access_key = configuration.GetString(configPrefix + "aws_secret_access_key") + s.region = configuration.GetString(configPrefix + "region") + s.bucket = configuration.GetString(configPrefix + "bucket") + s.conn, err = createSession(s.aws_access_key_id, s.aws_secret_access_key, s.region) + + glog.V(0).Infof("created backend storage s3.%s for region %s bucket %s", s.id, s.region, s.bucket) + return +} + +func (s *S3BackendStorage) ToProperties() map[string]string { + m := make(map[string]string) + m["aws_access_key_id"] = s.aws_access_key_id + m["aws_secret_access_key"] = s.aws_secret_access_key + m["region"] = s.region + m["bucket"] = s.bucket + return m +} + +func (s *S3BackendStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile { + if strings.HasPrefix(key, "/") { + key = key[1:] } - defer getObjectOutput.Body.Close() - return getObjectOutput.Body.Read(p) + f := &S3BackendStorageFile{ + backendStorage: s, + key: key, + tierInfo: tierInfo, + } + return f } -func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) { - panic("implement me") +func (s *S3BackendStorage) CopyFile(f *os.File, attributes map[string]string, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) { + randomUuid, _ := uuid.NewRandom() + key = randomUuid.String() + + glog.V(1).Infof("copying dat file of %s to remote s3.%s as %s", f.Name(), s.id, key) + + size, err = uploadToS3(s.conn, f.Name(), s.bucket, key, attributes, fn) + + return } -func (s3backend S3Backend) Truncate(off int64) error { - panic("implement me") +func (s *S3BackendStorage) DownloadFile(fileName string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) { + + glog.V(1).Infof("download dat file of %s from remote s3.%s as %s", fileName, s.id, key) + + size, err = downloadFromS3(s.conn, fileName, s.bucket, key, fn) + + return } -func (s3backend S3Backend) Close() error { - return nil +func (s *S3BackendStorage) DeleteFile(key string) (err error) { + + glog.V(1).Infof("delete dat file %s from remote", key) + + err = deleteFromS3(s.conn, s.bucket, key) + + return } -func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) { +type S3BackendStorageFile struct { + backendStorage *S3BackendStorage + key string + tierInfo *volume_server_pb.VolumeInfo +} - headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{ - Bucket: &s3backend.bucket, - Key: &s3backend.key, +func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) { + + bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1) + + // glog.V(0).Infof("read %s %s", s3backendStorageFile.key, bytesRange) + + getObjectOutput, getObjectErr := s3backendStorageFile.backendStorage.conn.GetObject(&s3.GetObjectInput{ + Bucket: &s3backendStorageFile.backendStorage.bucket, + Key: &s3backendStorageFile.key, + Range: &bytesRange, }) - if headObjectErr != nil { - return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr) + if getObjectErr != nil { + return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, getObjectErr) + } + defer getObjectOutput.Body.Close() + + glog.V(4).Infof("read %s %s", s3backendStorageFile.key, bytesRange) + glog.V(4).Infof("content range: %s, contentLength: %d", *getObjectOutput.ContentRange, *getObjectOutput.ContentLength) + + for { + if n, err = getObjectOutput.Body.Read(p); err == nil && n < len(p) { + p = p[n:] + } else { + break + } } - datSize = int64(*headObjectOutput.ContentLength) - modTime = *headObjectOutput.LastModified + if err == io.EOF { + err = nil + } return } -func (s3backend S3Backend) String() string { - return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key) +func (s3backendStorageFile S3BackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) { + panic("not implemented") } -func (s3backend *S3Backend) GetName() string { - return "s3" +func (s3backendStorageFile S3BackendStorageFile) Truncate(off int64) error { + panic("not implemented") } -func (s3backend *S3Backend) GetSinkToDirectory() string { - return s3backend.dir +func (s3backendStorageFile S3BackendStorageFile) Close() error { + return nil } -func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error { - glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region")) - glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket")) - glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory")) +func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) { - return s3backend.initialize( - configuration.GetString("aws_access_key_id"), - configuration.GetString("aws_secret_access_key"), - configuration.GetString("region"), - configuration.GetString("bucket"), - configuration.GetString("directory"), - vid, - ) -} - -func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string, - vid needle.VolumeId) (err error) { - s3backend.region = region - s3backend.bucket = bucket - s3backend.dir = dir - s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region) + files := s3backendStorageFile.tierInfo.GetFiles() - s3backend.vid = vid - s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid) - if strings.HasPrefix(s3backend.key, "/") { - s3backend.key = s3backend.key[1:] + if len(files) == 0 { + err = fmt.Errorf("remote file info not found") + return } - return err + datSize = int64(files[0].FileSize) + modTime = time.Unix(int64(files[0].ModifiedTime), 0) + + return +} + +func (s3backendStorageFile S3BackendStorageFile) Name() string { + return s3backendStorageFile.key } |
