aboutsummaryrefslogtreecommitdiff
path: root/weed/remote_storage
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-08-08 01:21:42 -0700
committerChris Lu <chris.lu@gmail.com>2021-08-08 01:21:42 -0700
commit13e45e16054d16e8d8161a8ddb02fde3cd4cde8f (patch)
tree29d7b15932e10b0adafe2b27b3618e330805f75c /weed/remote_storage
parent8f5170c1389f2d0bac75ca2f95a676a05283317b (diff)
downloadseaweedfs-13e45e16054d16e8d8161a8ddb02fde3cd4cde8f.tar.xz
seaweedfs-13e45e16054d16e8d8161a8ddb02fde3cd4cde8f.zip
filer.remote.sync can work now
Diffstat (limited to 'weed/remote_storage')
-rw-r--r--weed/remote_storage/remote_storage.go6
-rw-r--r--weed/remote_storage/s3/s3_storage_client.go94
2 files changed, 93 insertions, 7 deletions
diff --git a/weed/remote_storage/remote_storage.go b/weed/remote_storage/remote_storage.go
index 06c089d7a..608d158ad 100644
--- a/weed/remote_storage/remote_storage.go
+++ b/weed/remote_storage/remote_storage.go
@@ -3,6 +3,7 @@ package remote_storage
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "io"
"strings"
"sync"
)
@@ -30,7 +31,10 @@ type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *file
type RemoteStorageClient interface {
Traverse(loc *filer_pb.RemoteStorageLocation, visitFn VisitFunc) error
- ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data[]byte, err error)
+ ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error)
+ WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error)
+ UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error)
+ DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error)
}
type RemoteStorageClientMaker interface {
diff --git a/weed/remote_storage/s3/s3_storage_client.go b/weed/remote_storage/s3/s3_storage_client.go
index 2263054f3..316751227 100644
--- a/weed/remote_storage/s3/s3_storage_client.go
+++ b/weed/remote_storage/s3/s3_storage_client.go
@@ -8,9 +8,11 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/util"
+ "io"
)
func init() {
@@ -45,7 +47,9 @@ type s3RemoteStorageClient struct {
conn s3iface.S3API
}
-func (s s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
+var _ = remote_storage.RemoteStorageClient(&s3RemoteStorageClient{})
+
+func (s *s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
pathKey := remote.Path[1:]
@@ -91,19 +95,19 @@ func (s s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation,
}
return
}
-func (s s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data[]byte, err error) {
+func (s *s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) {
u.PartSize = int64(4 * 1024 * 1024)
u.Concurrency = 1
})
-
+
dataSlice := make([]byte, int(size))
writerAt := aws.NewWriteAtBuffer(dataSlice)
_, err = downloader.Download(writerAt, &s3.GetObjectInput{
- Bucket: aws.String(loc.Bucket),
- Key: aws.String(loc.Path[1:]),
- Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)),
+ Bucket: aws.String(loc.Bucket),
+ Key: aws.String(loc.Path[1:]),
+ Range: aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)),
})
if err != nil {
return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
@@ -111,3 +115,81 @@ func (s s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, off
return writerAt.Bytes(), nil
}
+
+func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error) {
+
+ fileSize := int64(filer.FileSize(entry))
+
+ partSize := int64(8 * 1024 * 1024) // The minimum/default allowed part size is 5MB
+ for partSize*1000 < fileSize {
+ partSize *= 4
+ }
+
+ // Create an uploader with the session and custom options
+ uploader := s3manager.NewUploaderWithClient(s.conn, func(u *s3manager.Uploader) {
+ u.PartSize = partSize
+ u.Concurrency = 5
+ })
+
+ // process tagging
+ tags := ""
+ for k, v := range entry.Extended {
+ if len(tags) > 0 {
+ tags = tags + "&"
+ }
+ tags = tags + k + "=" + string(v)
+ }
+
+ // Upload the file to S3.
+ _, err = uploader.Upload(&s3manager.UploadInput{
+ Bucket: aws.String(loc.Bucket),
+ Key: aws.String(loc.Path[1:]),
+ Body: reader,
+ ACL: aws.String("private"),
+ ServerSideEncryption: aws.String("AES256"),
+ StorageClass: aws.String("STANDARD_IA"),
+ Tagging: aws.String(tags),
+ })
+
+ //in case it fails to upload
+ if err != nil {
+ return fmt.Errorf("upload to s3 %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err)
+ }
+
+ return nil
+}
+
+func toTagging(attributes map[string][]byte) *s3.Tagging {
+ tagging := &s3.Tagging{}
+ for k, v := range attributes {
+ tagging.TagSet = append(tagging.TagSet, &s3.Tag{
+ Key: aws.String(k),
+ Value: aws.String(string(v)),
+ })
+ }
+ return tagging
+}
+
+func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
+ tagging := toTagging(entry.Extended)
+ if len(tagging.TagSet) > 0 {
+ _, err = s.conn.PutObjectTagging(&s3.PutObjectTaggingInput{
+ Bucket: aws.String(loc.Bucket),
+ Key: aws.String(loc.Path[1:]),
+ Tagging: toTagging(entry.Extended),
+ })
+ } else {
+ _, err = s.conn.DeleteObjectTagging(&s3.DeleteObjectTaggingInput{
+ Bucket: aws.String(loc.Bucket),
+ Key: aws.String(loc.Path[1:]),
+ })
+ }
+ return
+}
+func (s *s3RemoteStorageClient) DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error) {
+ _, err = s.conn.DeleteObject(&s3.DeleteObjectInput{
+ Bucket: aws.String(loc.Bucket),
+ Key: aws.String(loc.Path[1:]),
+ })
+ return
+}