Diffstat (limited to 'weed/storage/backend')
-rw-r--r--  weed/storage/backend/backend.go                          122
-rw-r--r--  weed/storage/backend/disk_file.go                          4
-rw-r--r--  weed/storage/backend/memory_map/memory_map_backend.go      4
-rw-r--r--  weed/storage/backend/s3_backend/s3_backend.go            197
-rw-r--r--  weed/storage/backend/s3_backend/s3_download.go            98
-rw-r--r--  weed/storage/backend/s3_backend/s3_sessions.go             8
-rw-r--r--  weed/storage/backend/s3_backend/s3_upload.go             114
7 files changed, 470 insertions(+), 77 deletions(-)
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
index 3c297f20b..6941ca5a1 100644
--- a/weed/storage/backend/backend.go
+++ b/weed/storage/backend/backend.go
@@ -2,18 +2,134 @@ package backend
import (
"io"
+ "os"
+ "strings"
"time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/spf13/viper"
)
-type DataStorageBackend interface {
+type BackendStorageFile interface {
io.ReaderAt
io.WriterAt
Truncate(off int64) error
io.Closer
GetStat() (datSize int64, modTime time.Time, err error)
- String() string
+ Name() string
+}
+
+type BackendStorage interface {
+ ToProperties() map[string]string
+ NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) BackendStorageFile
+ CopyFile(f *os.File, attributes map[string]string, fn func(progressed int64, percentage float32) error) (key string, size int64, err error)
+ DownloadFile(fileName string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error)
+ DeleteFile(key string) (err error)
+}
+
+type StringProperties interface {
+ GetString(key string) string
+}
+type StorageType string
+type BackendStorageFactory interface {
+ StorageType() StorageType
+ BuildStorage(configuration StringProperties, configPrefix string, id string) (BackendStorage, error)
}
var (
- StorageBackends []DataStorageBackend
+ BackendStorageFactories = make(map[StorageType]BackendStorageFactory)
+ BackendStorages = make(map[string]BackendStorage)
)
+
+// used by master to load remote storage configurations
+func LoadConfiguration(config *viper.Viper) {
+
+ StorageBackendPrefix := "storage.backend"
+
+ for backendTypeName := range config.GetStringMap(StorageBackendPrefix) {
+ backendStorageFactory, found := BackendStorageFactories[StorageType(backendTypeName)]
+ if !found {
+ glog.Fatalf("backend storage type %s not found", backendTypeName)
+ }
+ for backendStorageId := range config.GetStringMap(StorageBackendPrefix + "." + backendTypeName) {
+ if !config.GetBool(StorageBackendPrefix + "." + backendTypeName + "." + backendStorageId + ".enabled") {
+ continue
+ }
+ backendStorage, buildErr := backendStorageFactory.BuildStorage(config,
+ StorageBackendPrefix+"."+backendTypeName+"."+backendStorageId+".", backendStorageId)
+ if buildErr != nil {
+				glog.Fatalf("failed to create backend storage %s.%s: %v", backendTypeName, backendStorageId, buildErr)
+ }
+ BackendStorages[backendTypeName+"."+backendStorageId] = backendStorage
+ if backendStorageId == "default" {
+ BackendStorages[backendTypeName] = backendStorage
+ }
+ }
+ }
+
+}
+
+// used by volume server to receive remote storage configurations from master
+func LoadFromPbStorageBackends(storageBackends []*master_pb.StorageBackend) {
+
+ for _, storageBackend := range storageBackends {
+ backendStorageFactory, found := BackendStorageFactories[StorageType(storageBackend.Type)]
+ if !found {
+ glog.Warningf("storage type %s not found", storageBackend.Type)
+ continue
+ }
+ backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), "", storageBackend.Id)
+ if buildErr != nil {
+			glog.Fatalf("failed to create backend storage %s.%s: %v", storageBackend.Type, storageBackend.Id, buildErr)
+ }
+ BackendStorages[storageBackend.Type+"."+storageBackend.Id] = backendStorage
+ if storageBackend.Id == "default" {
+ BackendStorages[storageBackend.Type] = backendStorage
+ }
+ }
+}
+
+type Properties struct {
+ m map[string]string
+}
+
+func newProperties(m map[string]string) *Properties {
+ return &Properties{m: m}
+}
+
+func (p *Properties) GetString(key string) string {
+ if v, found := p.m[key]; found {
+ return v
+ }
+ return ""
+}
+
+func ToPbStorageBackends() (backends []*master_pb.StorageBackend) {
+ for sName, s := range BackendStorages {
+ sType, sId := BackendNameToTypeId(sName)
+ if sType == "" {
+ continue
+ }
+ backends = append(backends, &master_pb.StorageBackend{
+ Type: sType,
+ Id: sId,
+ Properties: s.ToProperties(),
+ })
+ }
+ return
+}
+
+func BackendNameToTypeId(backendName string) (backendType, backendId string) {
+ parts := strings.Split(backendName, ".")
+ if len(parts) == 1 {
+ return backendName, "default"
+ }
+ if len(parts) != 2 {
+ return
+ }
+
+ backendType, backendId = parts[0], parts[1]
+ return
+}
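
This commit splits the old single DataStorageBackend interface into two levels: a BackendStorage (one per configured remote, registered under "type.id", with id "default" aliased to the bare type name) that copies, downloads, and deletes whole volume files, and a BackendStorageFile (one per remote object) that exposes the random-access surface volumes need. Factories self-register by storage type in an init function. Below is a minimal sketch of a third-party backend plugging into this registry; the "null" storage type, NullBackendFactory, and nullStorage are hypothetical names for illustration, not part of the commit:

```go
// A hypothetical no-op backend showing the factory registration contract.
package null_backend

import (
	"os"

	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage/backend"
)

func init() {
	// LoadConfiguration looks factories up by the type key under
	// "storage.backend", so this backend would be configured as
	// storage.backend.null.<id>
	backend.BackendStorageFactories["null"] = &NullBackendFactory{}
}

type NullBackendFactory struct{}

func (f *NullBackendFactory) StorageType() backend.StorageType {
	return backend.StorageType("null")
}

func (f *NullBackendFactory) BuildStorage(conf backend.StringProperties, prefix string, id string) (backend.BackendStorage, error) {
	// configuration keys are read relative to the prefix,
	// e.g. "storage.backend.null.default." + "some_option"
	_ = conf.GetString(prefix + "some_option")
	return &nullStorage{id: id}, nil
}

type nullStorage struct{ id string }

func (s *nullStorage) ToProperties() map[string]string { return map[string]string{} }

func (s *nullStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile {
	return nil // a real backend returns its BackendStorageFile implementation here
}

func (s *nullStorage) CopyFile(f *os.File, attributes map[string]string, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
	return "", 0, nil // upload f somewhere, reporting progress through fn
}

func (s *nullStorage) DownloadFile(fileName string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) {
	return 0, nil
}

func (s *nullStorage) DeleteFile(key string) (err error) {
	return nil
}
```

Instances built this way land in BackendStorages under "null.<id>" (and plain "null" when the id is "default"), which is exactly the naming that BackendNameToTypeId reverses.
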
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 7f2b39d15..c4b3caffb 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -6,7 +6,7 @@ import (
)
var (
- _ DataStorageBackend = &DiskFile{}
+ _ BackendStorageFile = &DiskFile{}
)
type DiskFile struct {
@@ -45,6 +45,6 @@ func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
return 0, time.Time{}, err
}
-func (df *DiskFile) String() string {
+func (df *DiskFile) Name() string {
return df.fullFilePath
}
diff --git a/weed/storage/backend/memory_map/memory_map_backend.go b/weed/storage/backend/memory_map/memory_map_backend.go
index bac105022..03e7308d0 100644
--- a/weed/storage/backend/memory_map/memory_map_backend.go
+++ b/weed/storage/backend/memory_map/memory_map_backend.go
@@ -8,7 +8,7 @@ import (
)
var (
- _ backend.DataStorageBackend = &MemoryMappedFile{}
+ _ backend.BackendStorageFile = &MemoryMappedFile{}
)
type MemoryMappedFile struct {
@@ -55,6 +55,6 @@ func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err er
return 0, time.Time{}, err
}
-func (mmf *MemoryMappedFile) String() string {
+func (mmf *MemoryMappedFile) Name() string {
return mmf.mm.File.Name()
}
diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go
index 0ff7eca21..8d71861c2 100644
--- a/weed/storage/backend/s3_backend/s3_backend.go
+++ b/weed/storage/backend/s3_backend/s3_backend.go
@@ -2,119 +2,176 @@ package s3_backend
import (
"fmt"
+ "io"
+ "os"
"strings"
"time"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
+ "github.com/google/uuid"
+
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- _ backend.DataStorageBackend = &S3Backend{}
)
func init() {
- backend.StorageBackends = append(backend.StorageBackends, &S3Backend{})
+ backend.BackendStorageFactories["s3"] = &S3BackendFactory{}
}
-type S3Backend struct {
- conn s3iface.S3API
- region string
- bucket string
- dir string
- vid needle.VolumeId
- key string
+type S3BackendFactory struct {
}
-func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
- bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
- getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{
- Bucket: &s3backend.bucket,
- Key: &s3backend.key,
- Range: &bytesRange,
- })
+func (factory *S3BackendFactory) StorageType() backend.StorageType {
+ return backend.StorageType("s3")
+}
+func (factory *S3BackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) {
+ return newS3BackendStorage(configuration, configPrefix, id)
+}
- if getObjectErr != nil {
- return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr)
+type S3BackendStorage struct {
+ id string
+ aws_access_key_id string
+ aws_secret_access_key string
+ region string
+ bucket string
+ conn s3iface.S3API
+}
+
+func newS3BackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *S3BackendStorage, err error) {
+ s = &S3BackendStorage{}
+ s.id = id
+ s.aws_access_key_id = configuration.GetString(configPrefix + "aws_access_key_id")
+ s.aws_secret_access_key = configuration.GetString(configPrefix + "aws_secret_access_key")
+ s.region = configuration.GetString(configPrefix + "region")
+ s.bucket = configuration.GetString(configPrefix + "bucket")
+	s.conn, err = createSession(s.aws_access_key_id, s.aws_secret_access_key, s.region)
+	if err != nil {
+		return
+	}
+
+	glog.V(0).Infof("created backend storage s3.%s for region %s bucket %s", s.id, s.region, s.bucket)
+	return
+}
+
+func (s *S3BackendStorage) ToProperties() map[string]string {
+ m := make(map[string]string)
+ m["aws_access_key_id"] = s.aws_access_key_id
+ m["aws_secret_access_key"] = s.aws_secret_access_key
+ m["region"] = s.region
+ m["bucket"] = s.bucket
+ return m
+}
+
+func (s *S3BackendStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile {
+ if strings.HasPrefix(key, "/") {
+ key = key[1:]
}
- defer getObjectOutput.Body.Close()
- return getObjectOutput.Body.Read(p)
+ f := &S3BackendStorageFile{
+ backendStorage: s,
+ key: key,
+ tierInfo: tierInfo,
+ }
+ return f
}
-func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) {
- panic("implement me")
+func (s *S3BackendStorage) CopyFile(f *os.File, attributes map[string]string, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
+ randomUuid, _ := uuid.NewRandom()
+ key = randomUuid.String()
+
+	glog.V(1).Infof("copying dat file %s to remote s3.%s as %s", f.Name(), s.id, key)
+
+ size, err = uploadToS3(s.conn, f.Name(), s.bucket, key, attributes, fn)
+
+ return
}
-func (s3backend S3Backend) Truncate(off int64) error {
- panic("implement me")
+func (s *S3BackendStorage) DownloadFile(fileName string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) {
+
+	glog.V(1).Infof("downloading dat file %s from remote s3.%s key %s", fileName, s.id, key)
+
+ size, err = downloadFromS3(s.conn, fileName, s.bucket, key, fn)
+
+ return
}
-func (s3backend S3Backend) Close() error {
- return nil
+func (s *S3BackendStorage) DeleteFile(key string) (err error) {
+
+ glog.V(1).Infof("delete dat file %s from remote", key)
+
+ err = deleteFromS3(s.conn, s.bucket, key)
+
+ return
}
-func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) {
+type S3BackendStorageFile struct {
+ backendStorage *S3BackendStorage
+ key string
+ tierInfo *volume_server_pb.VolumeInfo
+}
- headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{
- Bucket: &s3backend.bucket,
- Key: &s3backend.key,
+func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
+
+ bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
+
+ getObjectOutput, getObjectErr := s3backendStorageFile.backendStorage.conn.GetObject(&s3.GetObjectInput{
+ Bucket: &s3backendStorageFile.backendStorage.bucket,
+ Key: &s3backendStorageFile.key,
+ Range: &bytesRange,
})
- if headObjectErr != nil {
- return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr)
+ if getObjectErr != nil {
+ return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, getObjectErr)
+ }
+ defer getObjectOutput.Body.Close()
+
+ glog.V(4).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
+ glog.V(4).Infof("content range: %s, contentLength: %d", *getObjectOutput.ContentRange, *getObjectOutput.ContentLength)
+
+	// a single Read on the response body may return fewer bytes than requested,
+	// so keep reading until the buffer is full or the body ends, accumulating
+	// the total count in n
+	for {
+		if m, readErr := getObjectOutput.Body.Read(p); readErr == nil && m < len(p) {
+			p = p[m:]
+			n += m
+		} else {
+			n += m
+			err = readErr
+			break
+		}
}
- datSize = int64(*headObjectOutput.ContentLength)
- modTime = *headObjectOutput.LastModified
+ if err == io.EOF {
+ err = nil
+ }
return
}
-func (s3backend S3Backend) String() string {
- return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key)
+func (s3backendStorageFile S3BackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
+ panic("not implemented")
}
-func (s3backend *S3Backend) GetName() string {
- return "s3"
+func (s3backendStorageFile S3BackendStorageFile) Truncate(off int64) error {
+ panic("not implemented")
}
-func (s3backend *S3Backend) GetSinkToDirectory() string {
- return s3backend.dir
+func (s3backendStorageFile S3BackendStorageFile) Close() error {
+ return nil
}
-func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error {
- glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
- glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
- glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
+func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
- return s3backend.initialize(
- configuration.GetString("aws_access_key_id"),
- configuration.GetString("aws_secret_access_key"),
- configuration.GetString("region"),
- configuration.GetString("bucket"),
- configuration.GetString("directory"),
- vid,
- )
-}
-
-func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string,
- vid needle.VolumeId) (err error) {
- s3backend.region = region
- s3backend.bucket = bucket
- s3backend.dir = dir
- s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
+ files := s3backendStorageFile.tierInfo.GetFiles()
- s3backend.vid = vid
- s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid)
- if strings.HasPrefix(s3backend.key, "/") {
- s3backend.key = s3backend.key[1:]
+ if len(files) == 0 {
+ err = fmt.Errorf("remote file info not found")
+ return
}
- return err
+ datSize = int64(files[0].FileSize)
+ modTime = time.Unix(int64(files[0].ModifiedTime), 0)
+
+ return
+}
+
+func (s3backendStorageFile S3BackendStorageFile) Name() string {
+ return s3backendStorageFile.key
}
diff --git a/weed/storage/backend/s3_backend/s3_download.go b/weed/storage/backend/s3_backend/s3_download.go
new file mode 100644
index 000000000..dbc28446a
--- /dev/null
+++ b/weed/storage/backend/s3_backend/s3_download.go
@@ -0,0 +1,98 @@
+package s3_backend
+
+import (
+ "fmt"
+ "os"
+ "sync/atomic"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/aws/aws-sdk-go/service/s3/s3iface"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func downloadFromS3(sess s3iface.S3API, destFileName string, sourceBucket string, sourceKey string,
+ fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
+
+ fileSize, err = getFileSize(sess, sourceBucket, sourceKey)
+ if err != nil {
+ return
+ }
+
+	// create or truncate the destination file
+ f, err := os.OpenFile(destFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if err != nil {
+ return 0, fmt.Errorf("failed to open file %q, %v", destFileName, err)
+ }
+ defer f.Close()
+
+ // Create a downloader with the session and custom options
+ downloader := s3manager.NewDownloaderWithClient(sess, func(u *s3manager.Downloader) {
+ u.PartSize = int64(64 * 1024 * 1024)
+ u.Concurrency = 5
+ })
+
+ fileWriter := &s3DownloadProgressedWriter{
+ fp: f,
+ size: fileSize,
+ written: 0,
+ fn: fn,
+ }
+
+ // Download the file from S3.
+ fileSize, err = downloader.Download(fileWriter, &s3.GetObjectInput{
+ Bucket: aws.String(sourceBucket),
+ Key: aws.String(sourceKey),
+ })
+ if err != nil {
+ return fileSize, fmt.Errorf("failed to download file %s: %v", destFileName, err)
+ }
+
+ glog.V(1).Infof("downloaded file %s\n", destFileName)
+
+ return
+}
+
+// adapted from https://github.com/aws/aws-sdk-go/pull/1868
+// and https://petersouter.xyz/s3-download-progress-bar-in-golang/
+type s3DownloadProgressedWriter struct {
+ fp *os.File
+ size int64
+ written int64
+ fn func(progressed int64, percentage float32) error
+}
+
+func (w *s3DownloadProgressedWriter) WriteAt(p []byte, off int64) (int, error) {
+ n, err := w.fp.WriteAt(p, off)
+ if err != nil {
+ return n, err
+ }
+
+	// accumulate the total bytes written so far and report progress
+ atomic.AddInt64(&w.written, int64(n))
+
+ if w.fn != nil {
+ written := w.written
+ if err := w.fn(written, float32(written*100)/float32(w.size)); err != nil {
+ return n, err
+ }
+ }
+
+ return n, err
+}
+
+func getFileSize(svc s3iface.S3API, bucket string, key string) (filesize int64, err error) {
+ params := &s3.HeadObjectInput{
+ Bucket: aws.String(bucket),
+ Key: aws.String(key),
+ }
+
+ resp, err := svc.HeadObject(params)
+ if err != nil {
+ return 0, err
+ }
+
+ return *resp.ContentLength, nil
+}
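
downloadFromS3 threads the caller's callback through s3DownloadProgressedWriter.WriteAt, so returning a non-nil error from the callback aborts the transfer mid-flight. A hypothetical caller (exampleDownload, the path, bucket, and key are illustrative), assuming an S3 client established by createSession in s3_sessions.go:

```go
package s3_backend

import (
	"fmt"

	"github.com/aws/aws-sdk-go/service/s3/s3iface"
)

// exampleDownload is a hypothetical illustration, not part of the commit.
func exampleDownload(sess s3iface.S3API) error {
	_, err := downloadFromS3(sess, "/data/123.dat", "my-bucket", "some-volume-key",
		func(progressed int64, percentage float32) error {
			fmt.Printf("downloaded %d bytes (%.1f%%)\n", progressed, percentage)
			return nil // a non-nil return here cancels the download
		})
	return err
}
```
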
diff --git a/weed/storage/backend/s3_backend/s3_sessions.go b/weed/storage/backend/s3_backend/s3_sessions.go
index cd7b7ad47..5fdbcb66b 100644
--- a/weed/storage/backend/s3_backend/s3_sessions.go
+++ b/weed/storage/backend/s3_backend/s3_sessions.go
@@ -52,3 +52,11 @@ func createSession(awsAccessKeyId, awsSecretAccessKey, region string) (s3iface.S
return t, nil
}
+
+func deleteFromS3(sess s3iface.S3API, sourceBucket string, sourceKey string) (err error) {
+ _, err = sess.DeleteObject(&s3.DeleteObjectInput{
+ Bucket: aws.String(sourceBucket),
+ Key: aws.String(sourceKey),
+ })
+ return err
+}
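
deleteFromS3 wraps a single DeleteObject call; S3 generally treats deleting a missing key as success, so callers mostly see transport or permission errors. A hypothetical wrapper sketch:

```go
package s3_backend

import (
	"github.com/aws/aws-sdk-go/service/s3/s3iface"

	"github.com/chrislusf/seaweedfs/weed/glog"
)

// exampleDelete is a hypothetical illustration, not part of the commit.
func exampleDelete(sess s3iface.S3API, bucket, key string) {
	if err := deleteFromS3(sess, bucket, key); err != nil {
		glog.Errorf("delete %s/%s: %v", bucket, key, err)
	}
}
```
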
diff --git a/weed/storage/backend/s3_backend/s3_upload.go b/weed/storage/backend/s3_backend/s3_upload.go
new file mode 100644
index 000000000..500a85590
--- /dev/null
+++ b/weed/storage/backend/s3_backend/s3_upload.go
@@ -0,0 +1,114 @@
+package s3_backend
+
+import (
+ "fmt"
+ "os"
+ "sync/atomic"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/s3/s3iface"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey string,
+ attributes map[string]string,
+ fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
+
+	// open the source file for reading
+ f, err := os.Open(filename)
+ if err != nil {
+ return 0, fmt.Errorf("failed to open file %q, %v", filename, err)
+ }
+ defer f.Close()
+
+ info, err := f.Stat()
+ if err != nil {
+ return 0, fmt.Errorf("failed to stat file %q, %v", filename, err)
+ }
+
+ fileSize = info.Size()
+
+	// S3 multipart uploads are limited to 10,000 parts (5MB minimum part size);
+	// start at 64MB and quadruple until the file fits in fewer than 1000 parts
+	partSize := int64(64 * 1024 * 1024)
+	for partSize*1000 < fileSize {
+		partSize *= 4
+	}
+
+ // Create an uploader with the session and custom options
+ uploader := s3manager.NewUploaderWithClient(sess, func(u *s3manager.Uploader) {
+ u.PartSize = partSize
+ u.Concurrency = 5
+ })
+
+	fileReader := &s3UploadProgressedReader{
+		fp:   f,
+		size: fileSize,
+		// start the counter at -size: the SDK reads the body more than once
+		// (presumably hashing before sending), so progress still ends near 100%
+		read: -fileSize,
+		fn:   fn,
+	}
+
+	// encode the attributes as S3 object tags ("k1=v1&k2=v2")
+ tags := ""
+ for k, v := range attributes {
+ if len(tags) > 0 {
+ tags = tags + "&"
+ }
+ tags = tags + k + "=" + v
+ }
+
+ // Upload the file to S3.
+ var result *s3manager.UploadOutput
+ result, err = uploader.Upload(&s3manager.UploadInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(destKey),
+ Body: fileReader,
+ ACL: aws.String("private"),
+ ServerSideEncryption: aws.String("AES256"),
+ StorageClass: aws.String("STANDARD_IA"),
+ Tagging: aws.String(tags),
+ })
+
+	// surface upload failures to the caller
+ if err != nil {
+ return 0, fmt.Errorf("failed to upload file %s: %v", filename, err)
+ }
+ glog.V(1).Infof("file %s uploaded to %s\n", filename, result.Location)
+
+ return
+}
+
+// adapted from https://github.com/aws/aws-sdk-go/pull/1868
+type s3UploadProgressedReader struct {
+ fp *os.File
+ size int64
+ read int64
+ fn func(progressed int64, percentage float32) error
+}
+
+func (r *s3UploadProgressedReader) Read(p []byte) (int, error) {
+ return r.fp.Read(p)
+}
+
+func (r *s3UploadProgressedReader) ReadAt(p []byte, off int64) (int, error) {
+ n, err := r.fp.ReadAt(p, off)
+ if err != nil {
+ return n, err
+ }
+
+	// accumulate the total bytes read so far and report progress
+ atomic.AddInt64(&r.read, int64(n))
+
+ if r.fn != nil {
+ read := r.read
+ if err := r.fn(read, float32(read*100)/float32(r.size)); err != nil {
+ return n, err
+ }
+ }
+
+ return n, err
+}
+
+func (r *s3UploadProgressedReader) Seek(offset int64, whence int) (int64, error) {
+ return r.fp.Seek(offset, whence)
+}
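
The part-size loop in uploadToS3 decides how large each multipart chunk gets. A standalone sketch (choosePartSize is a hypothetical helper name) showing the resulting sizes for two file sizes:

```go
package main

import "fmt"

// choosePartSize mirrors the sizing loop in uploadToS3: start at 64MB and
// quadruple until the file fits in fewer than 1000 parts, staying well under
// S3's 10,000-part ceiling.
func choosePartSize(fileSize int64) int64 {
	partSize := int64(64 * 1024 * 1024)
	for partSize*1000 < fileSize {
		partSize *= 4
	}
	return partSize
}

func main() {
	const gb = int64(1) << 30
	fmt.Println(choosePartSize(30*gb) / (1 << 20))  // 64  (MB per part)
	fmt.Println(choosePartSize(100*gb) / (1 << 20)) // 256 (MB per part)
}
```
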