diff options
Diffstat (limited to 'weed/storage/backend')
| -rw-r--r-- | weed/storage/backend/backend.go | 2 | ||||
| -rw-r--r-- | weed/storage/backend/disk_file.go | 4 | ||||
| -rw-r--r-- | weed/storage/backend/memory_map/memory_map_backend.go | 4 | ||||
| -rw-r--r-- | weed/storage/backend/s3_backend/s3_backend.go | 17 | ||||
| -rw-r--r-- | weed/storage/backend/s3_backend/s3_upload.go | 56 |
5 files changed, 74 insertions, 9 deletions
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go index 3c297f20b..4d72abc87 100644 --- a/weed/storage/backend/backend.go +++ b/weed/storage/backend/backend.go @@ -2,6 +2,7 @@ package backend import ( "io" + "os" "time" ) @@ -12,6 +13,7 @@ type DataStorageBackend interface { io.Closer GetStat() (datSize int64, modTime time.Time, err error) String() string + Instantiate(src *os.File) error } var ( diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go index 7f2b39d15..4fc3ed0c4 100644 --- a/weed/storage/backend/disk_file.go +++ b/weed/storage/backend/disk_file.go @@ -48,3 +48,7 @@ func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) { func (df *DiskFile) String() string { return df.fullFilePath } + +func (df *DiskFile) Instantiate(src *os.File) error { + panic("should not implement Instantiate for DiskFile") +} diff --git a/weed/storage/backend/memory_map/memory_map_backend.go b/weed/storage/backend/memory_map/memory_map_backend.go index bac105022..c57252683 100644 --- a/weed/storage/backend/memory_map/memory_map_backend.go +++ b/weed/storage/backend/memory_map/memory_map_backend.go @@ -58,3 +58,7 @@ func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err er func (mmf *MemoryMappedFile) String() string { return mmf.mm.File.Name() } + +func (mmf *MemoryMappedFile) Instantiate(src *os.File) error { + panic("should not implement Instantiate for MemoryMappedFile") +} diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go index 0ff7eca21..69360806f 100644 --- a/weed/storage/backend/s3_backend/s3_backend.go +++ b/weed/storage/backend/s3_backend/s3_backend.go @@ -2,6 +2,7 @@ package s3_backend import ( "fmt" + "os" "strings" "time" @@ -25,7 +26,6 @@ type S3Backend struct { conn s3iface.S3API region string bucket string - dir string vid needle.VolumeId key string } @@ -84,11 +84,11 @@ func (s3backend *S3Backend) GetName() string { return "s3" } -func (s3backend *S3Backend) GetSinkToDirectory() string { - return s3backend.dir +func (s3backend S3Backend) Instantiate(src *os.File) error { + panic("implement me") } -func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error { +func (s3backend *S3Backend) Initialize(configuration util.Configuration, prefix string, vid needle.VolumeId) error { glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region")) glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket")) glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory")) @@ -98,20 +98,19 @@ func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid nee configuration.GetString("aws_secret_access_key"), configuration.GetString("region"), configuration.GetString("bucket"), - configuration.GetString("directory"), + prefix, vid, ) } -func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string, - vid needle.VolumeId) (err error) { +func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket string, + prefix string, vid needle.VolumeId) (err error) { s3backend.region = region s3backend.bucket = bucket - s3backend.dir = dir s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region) s3backend.vid = vid - s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid) + s3backend.key = fmt.Sprintf("%s_%d.dat", prefix, vid) if strings.HasPrefix(s3backend.key, "/") { s3backend.key = s3backend.key[1:] } diff --git a/weed/storage/backend/s3_backend/s3_upload.go b/weed/storage/backend/s3_backend/s3_upload.go new file mode 100644 index 000000000..1fde0a84f --- /dev/null +++ b/weed/storage/backend/s3_backend/s3_upload.go @@ -0,0 +1,56 @@ +package s3_backend + +import ( + "fmt" + "os" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/aws/aws-sdk-go/service/s3/s3manager" +) + +func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey string) error { + + //open the file + f, err := os.Open(filename) + if err != nil { + return fmt.Errorf("failed to open file %q, %v", filename, err) + } + defer f.Close() + + info, err := f.Stat() + if err != nil { + return fmt.Errorf("failed to stat file %q, %v", filename, err) + } + + fileSize := info.Size() + + partSize := int64(64 * 1024 * 1024) // The minimum/default allowed part size is 5MB + for partSize*1000 < fileSize { + partSize *= 4 + } + + // Create an uploader with the session and custom options + uploader := s3manager.NewUploaderWithClient(sess, func(u *s3manager.Uploader) { + u.PartSize = partSize + u.Concurrency = 15 // default is 15 + }) + + // Upload the file to S3. + result, err := uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(destBucket), + Key: aws.String(destKey), + Body: f, + ACL: aws.String("private"), + ServerSideEncryption: aws.String("AES256"), + StorageClass: aws.String("STANDARD_IA"), + }) + + //in case it fails to upload + if err != nil { + return fmt.Errorf("failed to upload file, %v", err) + } + fmt.Printf("file %s uploaded to %s\n", filename, result.Location) + + return nil +} |
