Diffstat (limited to 'weed/storage/backend')
-rw-r--r--   weed/storage/backend/backend.go                          122
-rw-r--r--   weed/storage/backend/disk_file.go                          4
-rw-r--r--   weed/storage/backend/memory_map/memory_map_backend.go      4
-rw-r--r--   weed/storage/backend/s3_backend/s3_backend.go            197
-rw-r--r--   weed/storage/backend/s3_backend/s3_download.go            98
-rw-r--r--   weed/storage/backend/s3_backend/s3_sessions.go             8
-rw-r--r--   weed/storage/backend/s3_backend/s3_upload.go             114
7 files changed, 470 insertions, 77 deletions
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
index 3c297f20b..6941ca5a1 100644
--- a/weed/storage/backend/backend.go
+++ b/weed/storage/backend/backend.go
@@ -2,18 +2,134 @@ package backend
 
 import (
     "io"
+    "os"
+    "strings"
     "time"
+
+    "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+    "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+    "github.com/spf13/viper"
 )
 
-type DataStorageBackend interface {
+type BackendStorageFile interface {
     io.ReaderAt
     io.WriterAt
     Truncate(off int64) error
     io.Closer
     GetStat() (datSize int64, modTime time.Time, err error)
-    String() string
+    Name() string
+}
+
+type BackendStorage interface {
+    ToProperties() map[string]string
+    NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) BackendStorageFile
+    CopyFile(f *os.File, attributes map[string]string, fn func(progressed int64, percentage float32) error) (key string, size int64, err error)
+    DownloadFile(fileName string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error)
+    DeleteFile(key string) (err error)
+}
+
+type StringProperties interface {
+    GetString(key string) string
+}
+
+type StorageType string
+
+type BackendStorageFactory interface {
+    StorageType() StorageType
+    BuildStorage(configuration StringProperties, configPrefix string, id string) (BackendStorage, error)
 }
 
 var (
-    StorageBackends []DataStorageBackend
+    BackendStorageFactories = make(map[StorageType]BackendStorageFactory)
+    BackendStorages         = make(map[string]BackendStorage)
 )
+
+// used by master to load remote storage configurations
+func LoadConfiguration(config *viper.Viper) {
+
+    StorageBackendPrefix := "storage.backend"
+
+    for backendTypeName := range config.GetStringMap(StorageBackendPrefix) {
+        backendStorageFactory, found := BackendStorageFactories[StorageType(backendTypeName)]
+        if !found {
+            glog.Fatalf("backend storage type %s not found", backendTypeName)
+        }
+        for backendStorageId := range config.GetStringMap(StorageBackendPrefix + "." + backendTypeName) {
+            if !config.GetBool(StorageBackendPrefix + "." + backendTypeName + "." + backendStorageId + ".enabled") {
+                continue
+            }
+            backendStorage, buildErr := backendStorageFactory.BuildStorage(config,
+                StorageBackendPrefix+"."+backendTypeName+"."+backendStorageId+".", backendStorageId)
+            if buildErr != nil {
+                glog.Fatalf("fail to create backend storage %s.%s", backendTypeName, backendStorageId)
+            }
+            BackendStorages[backendTypeName+"."+backendStorageId] = backendStorage
+            if backendStorageId == "default" {
+                BackendStorages[backendTypeName] = backendStorage
+            }
+        }
+    }
+
+}
+
+// used by volume server to receive remote storage configurations from master
+func LoadFromPbStorageBackends(storageBackends []*master_pb.StorageBackend) {
+
+    for _, storageBackend := range storageBackends {
+        backendStorageFactory, found := BackendStorageFactories[StorageType(storageBackend.Type)]
+        if !found {
+            glog.Warningf("storage type %s not found", storageBackend.Type)
+            continue
+        }
+        backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), "", storageBackend.Id)
+        if buildErr != nil {
+            glog.Fatalf("fail to create backend storage %s.%s", storageBackend.Type, storageBackend.Id)
+        }
+        BackendStorages[storageBackend.Type+"."+storageBackend.Id] = backendStorage
+        if storageBackend.Id == "default" {
+            BackendStorages[storageBackend.Type] = backendStorage
+        }
+    }
+}
+
+type Properties struct {
+    m map[string]string
+}
+
+func newProperties(m map[string]string) *Properties {
+    return &Properties{m: m}
+}
+
+func (p *Properties) GetString(key string) string {
+    if v, found := p.m[key]; found {
+        return v
+    }
+    return ""
+}
+
+func ToPbStorageBackends() (backends []*master_pb.StorageBackend) {
+    for sName, s := range BackendStorages {
+        sType, sId := BackendNameToTypeId(sName)
+        if sType == "" {
+            continue
+        }
+        backends = append(backends, &master_pb.StorageBackend{
+            Type:       sType,
+            Id:         sId,
+            Properties: s.ToProperties(),
+        })
+    }
+    return
+}
+
+func BackendNameToTypeId(backendName string) (backendType, backendId string) {
+    parts := strings.Split(backendName, ".")
+    if len(parts) == 1 {
+        return backendName, "default"
+    }
+    if len(parts) != 2 {
+        return
+    }
+
+    backendType, backendId = parts[0], parts[1]
+    return
+}
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 7f2b39d15..c4b3caffb 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -6,7 +6,7 @@ import (
 )
 
 var (
-    _ DataStorageBackend = &DiskFile{}
+    _ BackendStorageFile = &DiskFile{}
 )
 
 type DiskFile struct {
@@ -45,6 +45,6 @@ func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
     return 0, time.Time{}, err
 }
 
-func (df *DiskFile) String() string {
+func (df *DiskFile) Name() string {
     return df.fullFilePath
 }
diff --git a/weed/storage/backend/memory_map/memory_map_backend.go b/weed/storage/backend/memory_map/memory_map_backend.go
index bac105022..03e7308d0 100644
--- a/weed/storage/backend/memory_map/memory_map_backend.go
+++ b/weed/storage/backend/memory_map/memory_map_backend.go
@@ -8,7 +8,7 @@ import (
 )
 
 var (
-    _ backend.DataStorageBackend = &MemoryMappedFile{}
+    _ backend.BackendStorageFile = &MemoryMappedFile{}
 )
 
 type MemoryMappedFile struct {
@@ -55,6 +55,6 @@ func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err er
     return 0, time.Time{}, err
 }
 
-func (mmf *MemoryMappedFile) String() string {
+func (mmf *MemoryMappedFile) Name() string {
     return mmf.mm.File.Name()
 }
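The new registry above is what both LoadConfiguration and LoadFromPbStorageBackends populate: factories are keyed by storage type, and each configured instance lands in BackendStorages under "<type>.<id>". A minimal sketch of how a master-side caller could drive this, assuming an illustrative TOML fragment with placeholder values (the keys mirror the storage.backend.<type>.<id>.<property> layout that LoadConfiguration walks; nothing here is taken verbatim from this change):

package main

import (
    "bytes"

    "github.com/spf13/viper"

    "github.com/chrislusf/seaweedfs/weed/storage/backend"
    // importing the s3 backend registers its factory via init()
    _ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend"
)

func main() {
    // Hypothetical config fragment; values are placeholders.
    conf := []byte(`
[storage.backend.s3.default]
enabled = true
aws_access_key_id = ""
aws_secret_access_key = ""
region = "us-east-2"
bucket = "your_bucket_name"
`)

    v := viper.New()
    v.SetConfigType("toml")
    if err := v.ReadConfig(bytes.NewBuffer(conf)); err != nil {
        panic(err)
    }

    // Fills backend.BackendStorages["s3.default"]; because the id is
    // "default", the shorthand entry backend.BackendStorages["s3"] is
    // registered as well.
    backend.LoadConfiguration(v)
}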
diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go
index 0ff7eca21..8d71861c2 100644
--- a/weed/storage/backend/s3_backend/s3_backend.go
+++ b/weed/storage/backend/s3_backend/s3_backend.go
@@ -2,119 +2,176 @@ package s3_backend
 
 import (
     "fmt"
+    "io"
+    "os"
     "strings"
     "time"
 
     "github.com/aws/aws-sdk-go/service/s3"
     "github.com/aws/aws-sdk-go/service/s3/s3iface"
+    "github.com/google/uuid"
 
+    "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
     "github.com/chrislusf/seaweedfs/weed/storage/backend"
-    "github.com/chrislusf/seaweedfs/weed/storage/needle"
-    "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
-    _ backend.DataStorageBackend = &S3Backend{}
 )
 
 func init() {
-    backend.StorageBackends = append(backend.StorageBackends, &S3Backend{})
+    backend.BackendStorageFactories["s3"] = &S3BackendFactory{}
 }
 
-type S3Backend struct {
-    conn   s3iface.S3API
-    region string
-    bucket string
-    dir    string
-    vid    needle.VolumeId
-    key    string
+type S3BackendFactory struct {
 }
 
-func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
-    bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
-    getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{
-        Bucket: &s3backend.bucket,
-        Key:    &s3backend.key,
-        Range:  &bytesRange,
-    })
+func (factory *S3BackendFactory) StorageType() backend.StorageType {
+    return backend.StorageType("s3")
+}
+func (factory *S3BackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) {
+    return newS3BackendStorage(configuration, configPrefix, id)
+}
 
-    if getObjectErr != nil {
-        return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr)
+type S3BackendStorage struct {
+    id                    string
+    aws_access_key_id     string
+    aws_secret_access_key string
+    region                string
+    bucket                string
+    conn                  s3iface.S3API
+}
+
+func newS3BackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *S3BackendStorage, err error) {
+    s = &S3BackendStorage{}
+    s.id = id
+    s.aws_access_key_id = configuration.GetString(configPrefix + "aws_access_key_id")
+    s.aws_secret_access_key = configuration.GetString(configPrefix + "aws_secret_access_key")
+    s.region = configuration.GetString(configPrefix + "region")
+    s.bucket = configuration.GetString(configPrefix + "bucket")
+    s.conn, err = createSession(s.aws_access_key_id, s.aws_secret_access_key, s.region)
+
+    glog.V(0).Infof("created backend storage s3.%s for region %s bucket %s", s.id, s.region, s.bucket)
+    return
+}
+
+func (s *S3BackendStorage) ToProperties() map[string]string {
+    m := make(map[string]string)
+    m["aws_access_key_id"] = s.aws_access_key_id
+    m["aws_secret_access_key"] = s.aws_secret_access_key
+    m["region"] = s.region
+    m["bucket"] = s.bucket
+    return m
+}
+
+func (s *S3BackendStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile {
+    if strings.HasPrefix(key, "/") {
+        key = key[1:]
     }
-    defer getObjectOutput.Body.Close()
 
-    return getObjectOutput.Body.Read(p)
+    f := &S3BackendStorageFile{
+        backendStorage: s,
+        key:            key,
+        tierInfo:       tierInfo,
+    }
+
+    return f
 }
 
-func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) {
-    panic("implement me")
+func (s *S3BackendStorage) CopyFile(f *os.File, attributes map[string]string, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
+    randomUuid, _ := uuid.NewRandom()
+    key = randomUuid.String()
+
+    glog.V(1).Infof("copying dat file of %s to remote s3.%s as %s", f.Name(), s.id, key)
+
+    size, err = uploadToS3(s.conn, f.Name(), s.bucket, key, attributes, fn)
+
+    return
 }
 
-func (s3backend S3Backend) Truncate(off int64) error {
-    panic("implement me")
+func (s *S3BackendStorage) DownloadFile(fileName string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) {
+
+    glog.V(1).Infof("download dat file of %s from remote s3.%s as %s", fileName, s.id, key)
+
+    size, err = downloadFromS3(s.conn, fileName, s.bucket, key, fn)
+
+    return
 }
 
-func (s3backend S3Backend) Close() error {
-    return nil
+func (s *S3BackendStorage) DeleteFile(key string) (err error) {
+
+    glog.V(1).Infof("delete dat file %s from remote", key)
+
+    err = deleteFromS3(s.conn, s.bucket, key)
+
+    return
 }
 
-func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) {
+type S3BackendStorageFile struct {
+    backendStorage *S3BackendStorage
+    key            string
+    tierInfo       *volume_server_pb.VolumeInfo
+}
 
-    headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{
-        Bucket: &s3backend.bucket,
-        Key:    &s3backend.key,
+func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
+
+    bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
+
+    // glog.V(0).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
+
+    getObjectOutput, getObjectErr := s3backendStorageFile.backendStorage.conn.GetObject(&s3.GetObjectInput{
+        Bucket: &s3backendStorageFile.backendStorage.bucket,
+        Key:    &s3backendStorageFile.key,
+        Range:  &bytesRange,
     })
 
-    if headObjectErr != nil {
-        return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr)
+    if getObjectErr != nil {
+        return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, getObjectErr)
+    }
+    defer getObjectOutput.Body.Close()
+
+    glog.V(4).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
+    glog.V(4).Infof("content range: %s, contentLength: %d", *getObjectOutput.ContentRange, *getObjectOutput.ContentLength)
+
+    // keep reading until p is filled, since Body.Read may return short reads
+    for {
+        if n, err = getObjectOutput.Body.Read(p); err == nil && n < len(p) {
+            p = p[n:]
+        } else {
+            break
+        }
     }
 
-    datSize = int64(*headObjectOutput.ContentLength)
-    modTime = *headObjectOutput.LastModified
+    if err == io.EOF {
+        err = nil
+    }
 
     return
 }
 
-func (s3backend S3Backend) String() string {
-    return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key)
+func (s3backendStorageFile S3BackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
+    panic("not implemented")
 }
 
-func (s3backend *S3Backend) GetName() string {
-    return "s3"
+func (s3backendStorageFile S3BackendStorageFile) Truncate(off int64) error {
+    panic("not implemented")
 }
 
-func (s3backend *S3Backend) GetSinkToDirectory() string {
-    return s3backend.dir
+func (s3backendStorageFile S3BackendStorageFile) Close() error {
+    return nil
 }
 
-func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error {
-    glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
-    glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
-    glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
+func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
 
-    return s3backend.initialize(
-        configuration.GetString("aws_access_key_id"),
-        configuration.GetString("aws_secret_access_key"),
-        configuration.GetString("region"),
-        configuration.GetString("bucket"),
-        configuration.GetString("directory"),
-        vid,
-    )
-}
-
-func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string,
-    vid needle.VolumeId) (err error) {
-    s3backend.region = region
-    s3backend.bucket = bucket
-    s3backend.dir = dir
-    s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
+    files := s3backendStorageFile.tierInfo.GetFiles()
 
-    s3backend.vid = vid
-    s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid)
-    if strings.HasPrefix(s3backend.key, "/") {
-        s3backend.key = s3backend.key[1:]
+    if len(files) == 0 {
+        err = fmt.Errorf("remote file info not found")
+        return
     }
 
-    return err
+    datSize = int64(files[0].FileSize)
+    modTime = time.Unix(int64(files[0].ModifiedTime), 0)
+
+    return
+}
+
+func (s3backendStorageFile S3BackendStorageFile) Name() string {
+    return s3backendStorageFile.key
 }
diff --git a/weed/storage/backend/s3_backend/s3_download.go b/weed/storage/backend/s3_backend/s3_download.go
new file mode 100644
index 000000000..dbc28446a
--- /dev/null
+++ b/weed/storage/backend/s3_backend/s3_download.go
@@ -0,0 +1,98 @@
+package s3_backend
+
+import (
+    "fmt"
+    "os"
+    "sync/atomic"
+
+    "github.com/aws/aws-sdk-go/aws"
+    "github.com/aws/aws-sdk-go/service/s3"
+    "github.com/aws/aws-sdk-go/service/s3/s3iface"
+    "github.com/aws/aws-sdk-go/service/s3/s3manager"
+
+    "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func downloadFromS3(sess s3iface.S3API, destFileName string, sourceBucket string, sourceKey string,
+    fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
+
+    fileSize, err = getFileSize(sess, sourceBucket, sourceKey)
+    if err != nil {
+        return
+    }
+
+    // open the destination file
+    f, err := os.OpenFile(destFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+    if err != nil {
+        return 0, fmt.Errorf("failed to open file %q, %v", destFileName, err)
+    }
+    defer f.Close()
+
+    // Create a downloader with the session and custom options
+    downloader := s3manager.NewDownloaderWithClient(sess, func(u *s3manager.Downloader) {
+        u.PartSize = int64(64 * 1024 * 1024)
+        u.Concurrency = 5
+    })
+
+    fileWriter := &s3DownloadProgressedWriter{
+        fp:      f,
+        size:    fileSize,
+        written: 0,
+        fn:      fn,
+    }
+
+    // Download the file from S3.
+    fileSize, err = downloader.Download(fileWriter, &s3.GetObjectInput{
+        Bucket: aws.String(sourceBucket),
+        Key:    aws.String(sourceKey),
+    })
+    if err != nil {
+        return fileSize, fmt.Errorf("failed to download file %s: %v", destFileName, err)
+    }
+
+    glog.V(1).Infof("downloaded file %s\n", destFileName)
+
+    return
+}
+
+// adapted from https://github.com/aws/aws-sdk-go/pull/1868
+// and https://petersouter.xyz/s3-download-progress-bar-in-golang/
+type s3DownloadProgressedWriter struct {
+    fp      *os.File
+    size    int64
+    written int64
+    fn      func(progressed int64, percentage float32) error
+}
+
+func (w *s3DownloadProgressedWriter) WriteAt(p []byte, off int64) (int, error) {
+    n, err := w.fp.WriteAt(p, off)
+    if err != nil {
+        return n, err
+    }
+
+    // track the cumulative number of bytes written and report progress
+    atomic.AddInt64(&w.written, int64(n))
+
+    if w.fn != nil {
+        written := w.written
+        if err := w.fn(written, float32(written*100)/float32(w.size)); err != nil {
+            return n, err
+        }
+    }
+
+    return n, err
+}
+
+func getFileSize(svc s3iface.S3API, bucket string, key string) (filesize int64, error error) {
+    params := &s3.HeadObjectInput{
+        Bucket: aws.String(bucket),
+        Key:    aws.String(key),
+    }
+
+    resp, err := svc.HeadObject(params)
+    if err != nil {
+        return 0, err
+    }
+
+    return *resp.ContentLength, nil
+}
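CopyFile, DownloadFile, and the s3manager progress adapters all share one callback shape, fn(progressed int64, percentage float32) error, invoked on every chunk; a non-nil return is propagated out of WriteAt/ReadAt, which fails the in-flight transfer. A sketch of a caller, assuming a backend was already registered under "s3.default"; the destination path and object key are hypothetical:

package main

import (
    "fmt"

    "github.com/chrislusf/seaweedfs/weed/storage/backend"
)

// printProgress has the callback shape shared by CopyFile and DownloadFile.
// Returning a non-nil error from it aborts the transfer, because the
// progressed writer/reader propagates that error out of WriteAt/ReadAt.
func printProgress(progressed int64, percentage float32) error {
    fmt.Printf("\rtransferred %d bytes (%.1f%%)", progressed, percentage)
    return nil
}

func main() {
    // Assumes LoadConfiguration (or LoadFromPbStorageBackends) already
    // populated the registry with an enabled s3.default backend.
    s, found := backend.BackendStorages["s3.default"]
    if !found {
        fmt.Println("backend s3.default not configured")
        return
    }

    // Hypothetical destination path and object key, for illustration only.
    size, err := s.DownloadFile("/data/1.dat", "example-object-key", printProgress)
    if err != nil {
        fmt.Println("download:", err)
        return
    }
    fmt.Printf("\ndownloaded %d bytes\n", size)
}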
diff --git a/weed/storage/backend/s3_backend/s3_sessions.go b/weed/storage/backend/s3_backend/s3_sessions.go
index cd7b7ad47..5fdbcb66b 100644
--- a/weed/storage/backend/s3_backend/s3_sessions.go
+++ b/weed/storage/backend/s3_backend/s3_sessions.go
@@ -52,3 +52,11 @@ func createSession(awsAccessKeyId, awsSecretAccessKey, region string) (s3iface.S
 
     return t, nil
 }
+
+func deleteFromS3(sess s3iface.S3API, sourceBucket string, sourceKey string) (err error) {
+    _, err = sess.DeleteObject(&s3.DeleteObjectInput{
+        Bucket: aws.String(sourceBucket),
+        Key:    aws.String(sourceKey),
+    })
+    return err
+}
diff --git a/weed/storage/backend/s3_backend/s3_upload.go b/weed/storage/backend/s3_backend/s3_upload.go
new file mode 100644
index 000000000..500a85590
--- /dev/null
+++ b/weed/storage/backend/s3_backend/s3_upload.go
@@ -0,0 +1,114 @@
+package s3_backend
+
+import (
+    "fmt"
+    "os"
+    "sync/atomic"
+
+    "github.com/aws/aws-sdk-go/aws"
+    "github.com/aws/aws-sdk-go/service/s3/s3iface"
+    "github.com/aws/aws-sdk-go/service/s3/s3manager"
+
+    "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey string,
+    attributes map[string]string,
+    fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
+
+    // open the source file
+    f, err := os.Open(filename)
+    if err != nil {
+        return 0, fmt.Errorf("failed to open file %q, %v", filename, err)
+    }
+    defer f.Close()
+
+    info, err := f.Stat()
+    if err != nil {
+        return 0, fmt.Errorf("failed to stat file %q, %v", filename, err)
+    }
+
+    fileSize = info.Size()
+
+    partSize := int64(64 * 1024 * 1024) // The minimum/default allowed part size is 5MB
+    for partSize*1000 < fileSize {
+        partSize *= 4
+    }
+
+    // Create an uploader with the session and custom options
+    uploader := s3manager.NewUploaderWithClient(sess, func(u *s3manager.Uploader) {
+        u.PartSize = partSize
+        u.Concurrency = 5
+    })
+
+    fileReader := &s3UploadProgressedReader{
+        fp:   f,
+        size: fileSize,
+        read: -fileSize,
+        fn:   fn,
+    }
+
+    // build the object tagging string from the attributes
+    tags := ""
+    for k, v := range attributes {
+        if len(tags) > 0 {
+            tags = tags + "&"
+        }
+        tags = tags + k + "=" + v
+    }
+
+    // Upload the file to S3.
+    var result *s3manager.UploadOutput
+    result, err = uploader.Upload(&s3manager.UploadInput{
+        Bucket:               aws.String(destBucket),
+        Key:                  aws.String(destKey),
+        Body:                 fileReader,
+        ACL:                  aws.String("private"),
+        ServerSideEncryption: aws.String("AES256"),
+        StorageClass:         aws.String("STANDARD_IA"),
+        Tagging:              aws.String(tags),
+    })
+
+    // in case it fails to upload
+    if err != nil {
+        return 0, fmt.Errorf("failed to upload file %s: %v", filename, err)
+    }
+    glog.V(1).Infof("file %s uploaded to %s\n", filename, result.Location)
+
+    return
+}
+
+// adapted from https://github.com/aws/aws-sdk-go/pull/1868
+type s3UploadProgressedReader struct {
+    fp   *os.File
+    size int64
+    read int64
+    fn   func(progressed int64, percentage float32) error
+}
+
+func (r *s3UploadProgressedReader) Read(p []byte) (int, error) {
+    return r.fp.Read(p)
+}
+
+func (r *s3UploadProgressedReader) ReadAt(p []byte, off int64) (int, error) {
+    n, err := r.fp.ReadAt(p, off)
+    if err != nil {
+        return n, err
+    }
+
+    // track the cumulative number of bytes read and report progress
+    atomic.AddInt64(&r.read, int64(n))
+
+    if r.fn != nil {
+        read := r.read
+        if err := r.fn(read, float32(read*100)/float32(r.size)); err != nil {
+            return n, err
+        }
+    }
+
+    return n, err
+}
+
+func (r *s3UploadProgressedReader) Seek(offset int64, whence int) (int64, error) {
+    return r.fp.Seek(offset, whence)
+}
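Two details in s3_upload.go are easy to miss. The part-size loop grows the 64MB default by powers of four until 1,000 parts of that size cover the file, which keeps a wide margin under S3's 10,000-part multipart limit. And the progress reader starts its counter at -fileSize, presumably because the SDK reads each seekable part a second time while computing the request signature, so the doubled byte count still reports as 100% at the end. The sizing arithmetic, as a standalone sketch:

package main

import "fmt"

// partSizeFor mirrors the sizing loop in uploadToS3: start at the 64MB
// default and grow by 4x until 1,000 parts of that size cover the file.
func partSizeFor(fileSize int64) int64 {
    partSize := int64(64 * 1024 * 1024)
    for partSize*1000 < fileSize {
        partSize *= 4
    }
    return partSize
}

func main() {
    for _, fileSize := range []int64{
        30 << 30,  // 30 GiB volume -> 64 MiB parts
        100 << 30, // 100 GiB       -> 256 MiB parts
        1 << 40,   // 1 TiB         -> 4 GiB parts (1,000 x 1 GiB falls just short)
    } {
        fmt.Printf("%13d bytes -> %5d MiB parts\n", fileSize, partSizeFor(fileSize)>>20)
    }
}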
