aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-11-18 19:24:34 -0800
committerChris Lu <chris.lu@gmail.com>2019-11-18 19:24:37 -0800
commit9711a6ffaabdac8516317a5539316e9a2bb83faf (patch)
tree153c006cb1316cf4092383085ceffcab0c215ee3
parentba73c053c3cf61ff83e1700c639dcb1538f42b41 (diff)
downloadseaweedfs-9711a6ffaabdac8516317a5539316e9a2bb83faf.tar.xz
seaweedfs-9711a6ffaabdac8516317a5539316e9a2bb83faf.zip
WIP
-rw-r--r--weed/command/scaffold.go8
-rw-r--r--weed/notification/aws_sqs/aws_sqs_pub.go6
-rw-r--r--weed/replication/sink/s3sink/s3_sink.go6
-rw-r--r--weed/replication/sub/notification_aws_sqs.go6
-rw-r--r--weed/storage/backend/backend.go4
-rw-r--r--weed/storage/backend/s3_backend/s3_backend.go120
-rw-r--r--weed/storage/backend/s3_backend/s3_sessions.go54
7 files changed, 195 insertions, 9 deletions
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index ed7df359a..13091764e 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -356,5 +356,13 @@ type = memory # Choose [memory|etcd] type for storing the file id sequence
sequencer_etcd_urls = http://127.0.0.1:2379
+[storage.backend.s3]
+enabled = true
+aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+region = "us-east-2"
+bucket = "your_bucket_name" # an existing bucket
+directory = "/" # destination directory
+
`
)
diff --git a/weed/notification/aws_sqs/aws_sqs_pub.go b/weed/notification/aws_sqs/aws_sqs_pub.go
index c1af7f27a..4c1302abb 100644
--- a/weed/notification/aws_sqs/aws_sqs_pub.go
+++ b/weed/notification/aws_sqs/aws_sqs_pub.go
@@ -38,13 +38,13 @@ func (k *AwsSqsPub) Initialize(configuration util.Configuration) (err error) {
)
}
-func (k *AwsSqsPub) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) {
+func (k *AwsSqsPub) initialize(awsAccessKeyId, awsSecretAccessKey, region, queueName string) (err error) {
config := &aws.Config{
Region: aws.String(region),
}
- if awsAccessKeyId != "" && aswSecretAccessKey != "" {
- config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "")
+ if awsAccessKeyId != "" && awsSecretAccessKey != "" {
+ config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
}
sess, err := session.NewSession(config)
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index d5cad3541..4cff341d0 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -56,7 +56,7 @@ func (s3sink *S3Sink) SetSourceFiler(s *source.FilerSource) {
s3sink.filerSource = s
}
-func (s3sink *S3Sink) initialize(awsAccessKeyId, aswSecretAccessKey, region, bucket, dir string) error {
+func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string) error {
s3sink.region = region
s3sink.bucket = bucket
s3sink.dir = dir
@@ -64,8 +64,8 @@ func (s3sink *S3Sink) initialize(awsAccessKeyId, aswSecretAccessKey, region, buc
config := &aws.Config{
Region: aws.String(s3sink.region),
}
- if awsAccessKeyId != "" && aswSecretAccessKey != "" {
- config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "")
+ if awsAccessKeyId != "" && awsSecretAccessKey != "" {
+ config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
}
sess, err := session.NewSession(config)
diff --git a/weed/replication/sub/notification_aws_sqs.go b/weed/replication/sub/notification_aws_sqs.go
index f0100f4de..bed26c79c 100644
--- a/weed/replication/sub/notification_aws_sqs.go
+++ b/weed/replication/sub/notification_aws_sqs.go
@@ -38,13 +38,13 @@ func (k *AwsSqsInput) Initialize(configuration util.Configuration) error {
)
}
-func (k *AwsSqsInput) initialize(awsAccessKeyId, aswSecretAccessKey, region, queueName string) (err error) {
+func (k *AwsSqsInput) initialize(awsAccessKeyId, awsSecretAccessKey, region, queueName string) (err error) {
config := &aws.Config{
Region: aws.String(region),
}
- if awsAccessKeyId != "" && aswSecretAccessKey != "" {
- config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, aswSecretAccessKey, "")
+ if awsAccessKeyId != "" && awsSecretAccessKey != "" {
+ config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
}
sess, err := session.NewSession(config)
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
index ae0f84216..3c297f20b 100644
--- a/weed/storage/backend/backend.go
+++ b/weed/storage/backend/backend.go
@@ -13,3 +13,7 @@ type DataStorageBackend interface {
GetStat() (datSize int64, modTime time.Time, err error)
String() string
}
+
+var (
+ StorageBackends []DataStorageBackend
+)
diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go
new file mode 100644
index 000000000..0ff7eca21
--- /dev/null
+++ b/weed/storage/backend/s3_backend/s3_backend.go
@@ -0,0 +1,120 @@
+package s3_backend
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/aws/aws-sdk-go/service/s3/s3iface"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ _ backend.DataStorageBackend = &S3Backend{}
+)
+
+func init() {
+ backend.StorageBackends = append(backend.StorageBackends, &S3Backend{})
+}
+
+type S3Backend struct {
+ conn s3iface.S3API
+ region string
+ bucket string
+ dir string
+ vid needle.VolumeId
+ key string
+}
+
+func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
+ bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
+ getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{
+ Bucket: &s3backend.bucket,
+ Key: &s3backend.key,
+ Range: &bytesRange,
+ })
+
+ if getObjectErr != nil {
+ return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr)
+ }
+ defer getObjectOutput.Body.Close()
+
+ return getObjectOutput.Body.Read(p)
+
+}
+
+func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) {
+ panic("implement me")
+}
+
+func (s3backend S3Backend) Truncate(off int64) error {
+ panic("implement me")
+}
+
+func (s3backend S3Backend) Close() error {
+ return nil
+}
+
+func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) {
+
+ headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{
+ Bucket: &s3backend.bucket,
+ Key: &s3backend.key,
+ })
+
+ if headObjectErr != nil {
+ return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr)
+ }
+
+ datSize = int64(*headObjectOutput.ContentLength)
+ modTime = *headObjectOutput.LastModified
+
+ return
+}
+
+func (s3backend S3Backend) String() string {
+ return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key)
+}
+
+func (s3backend *S3Backend) GetName() string {
+ return "s3"
+}
+
+func (s3backend *S3Backend) GetSinkToDirectory() string {
+ return s3backend.dir
+}
+
+func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error {
+ glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
+ glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
+ glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
+
+ return s3backend.initialize(
+ configuration.GetString("aws_access_key_id"),
+ configuration.GetString("aws_secret_access_key"),
+ configuration.GetString("region"),
+ configuration.GetString("bucket"),
+ configuration.GetString("directory"),
+ vid,
+ )
+}
+
+func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string,
+ vid needle.VolumeId) (err error) {
+ s3backend.region = region
+ s3backend.bucket = bucket
+ s3backend.dir = dir
+ s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
+
+ s3backend.vid = vid
+ s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid)
+ if strings.HasPrefix(s3backend.key, "/") {
+ s3backend.key = s3backend.key[1:]
+ }
+
+ return err
+}
diff --git a/weed/storage/backend/s3_backend/s3_sessions.go b/weed/storage/backend/s3_backend/s3_sessions.go
new file mode 100644
index 000000000..ef4f8c137
--- /dev/null
+++ b/weed/storage/backend/s3_backend/s3_sessions.go
@@ -0,0 +1,54 @@
+package s3_backend
+
+import (
+ "fmt"
+ "sync"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/credentials"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/aws/aws-sdk-go/service/s3/s3iface"
+)
+
+var (
+ s3Sessions = make(map[string]s3iface.S3API)
+ sessionsLock sync.RWMutex
+)
+
+func getSession(region string) (s3iface.S3API, bool) {
+ sessionsLock.RLock()
+ defer sessionsLock.RUnlock()
+
+ sess, found := s3Sessions[region]
+ return sess, found
+}
+
+func createSession(awsAccessKeyId, awsSecretAccessKey, region string) (s3iface.S3API, error) {
+
+ sessionsLock.Lock()
+ defer sessionsLock.Unlock()
+
+ if t, found := s3Sessions[region]; found {
+ return t, nil
+ }
+
+ config := &aws.Config{
+ Region: aws.String(region),
+ }
+ if awsAccessKeyId != "" && awsSecretAccessKey != "" {
+ config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
+ }
+
+ sess, err := session.NewSession(config)
+ if err != nil {
+ return nil, fmt.Errorf("create aws session in region %s: %v", region, err)
+ }
+
+ t:= s3.New(sess)
+
+ s3Sessions[region] = t
+
+ return t, nil
+
+} \ No newline at end of file