aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/backend
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/backend')
-rw-r--r--weed/storage/backend/backend.go2
-rw-r--r--weed/storage/backend/disk_file.go4
-rw-r--r--weed/storage/backend/memory_map/memory_map_backend.go4
-rw-r--r--weed/storage/backend/s3_backend/s3_backend.go17
-rw-r--r--weed/storage/backend/s3_backend/s3_upload.go56
5 files changed, 74 insertions, 9 deletions
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
index 3c297f20b..4d72abc87 100644
--- a/weed/storage/backend/backend.go
+++ b/weed/storage/backend/backend.go
@@ -2,6 +2,7 @@ package backend
import (
"io"
+ "os"
"time"
)
@@ -12,6 +13,7 @@ type DataStorageBackend interface {
io.Closer
GetStat() (datSize int64, modTime time.Time, err error)
String() string
+ Instantiate(src *os.File) error
}
var (
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 7f2b39d15..4fc3ed0c4 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -48,3 +48,7 @@ func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
func (df *DiskFile) String() string {
return df.fullFilePath
}
+
+func (df *DiskFile) Instantiate(src *os.File) error {
+ panic("should not implement Instantiate for DiskFile")
+}
diff --git a/weed/storage/backend/memory_map/memory_map_backend.go b/weed/storage/backend/memory_map/memory_map_backend.go
index bac105022..c57252683 100644
--- a/weed/storage/backend/memory_map/memory_map_backend.go
+++ b/weed/storage/backend/memory_map/memory_map_backend.go
@@ -58,3 +58,7 @@ func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err er
func (mmf *MemoryMappedFile) String() string {
return mmf.mm.File.Name()
}
+
+func (mmf *MemoryMappedFile) Instantiate(src *os.File) error {
+ panic("should not implement Instantiate for MemoryMappedFile")
+}
diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go
index 0ff7eca21..69360806f 100644
--- a/weed/storage/backend/s3_backend/s3_backend.go
+++ b/weed/storage/backend/s3_backend/s3_backend.go
@@ -2,6 +2,7 @@ package s3_backend
import (
"fmt"
+ "os"
"strings"
"time"
@@ -25,7 +26,6 @@ type S3Backend struct {
conn s3iface.S3API
region string
bucket string
- dir string
vid needle.VolumeId
key string
}
@@ -84,11 +84,11 @@ func (s3backend *S3Backend) GetName() string {
return "s3"
}
-func (s3backend *S3Backend) GetSinkToDirectory() string {
- return s3backend.dir
+func (s3backend S3Backend) Instantiate(src *os.File) error {
+ panic("implement me")
}
-func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error {
+func (s3backend *S3Backend) Initialize(configuration util.Configuration, prefix string, vid needle.VolumeId) error {
glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
@@ -98,20 +98,19 @@ func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid nee
configuration.GetString("aws_secret_access_key"),
configuration.GetString("region"),
configuration.GetString("bucket"),
- configuration.GetString("directory"),
+ prefix,
vid,
)
}
-func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string,
- vid needle.VolumeId) (err error) {
+func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket string,
+ prefix string, vid needle.VolumeId) (err error) {
s3backend.region = region
s3backend.bucket = bucket
- s3backend.dir = dir
s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
s3backend.vid = vid
- s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid)
+ s3backend.key = fmt.Sprintf("%s_%d.dat", prefix, vid)
if strings.HasPrefix(s3backend.key, "/") {
s3backend.key = s3backend.key[1:]
}
diff --git a/weed/storage/backend/s3_backend/s3_upload.go b/weed/storage/backend/s3_backend/s3_upload.go
new file mode 100644
index 000000000..1fde0a84f
--- /dev/null
+++ b/weed/storage/backend/s3_backend/s3_upload.go
@@ -0,0 +1,56 @@
+package s3_backend
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/s3/s3iface"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+)
+
+func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey string) error {
+
+ //open the file
+ f, err := os.Open(filename)
+ if err != nil {
+ return fmt.Errorf("failed to open file %q, %v", filename, err)
+ }
+ defer f.Close()
+
+ info, err := f.Stat()
+ if err != nil {
+ return fmt.Errorf("failed to stat file %q, %v", filename, err)
+ }
+
+ fileSize := info.Size()
+
+ partSize := int64(64 * 1024 * 1024) // The minimum/default allowed part size is 5MB
+ for partSize*1000 < fileSize {
+ partSize *= 4
+ }
+
+ // Create an uploader with the session and custom options
+ uploader := s3manager.NewUploaderWithClient(sess, func(u *s3manager.Uploader) {
+ u.PartSize = partSize
+ u.Concurrency = 15 // default is 15
+ })
+
+ // Upload the file to S3.
+ result, err := uploader.Upload(&s3manager.UploadInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(destKey),
+ Body: f,
+ ACL: aws.String("private"),
+ ServerSideEncryption: aws.String("AES256"),
+ StorageClass: aws.String("STANDARD_IA"),
+ })
+
+ //in case it fails to upload
+ if err != nil {
+ return fmt.Errorf("failed to upload file, %v", err)
+ }
+ fmt.Printf("file %s uploaded to %s\n", filename, result.Location)
+
+ return nil
+}