| author | Chris Lu <chris.lu@gmail.com> | 2021-08-08 01:21:42 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-08-08 01:21:42 -0700 |
| commit | 13e45e16054d16e8d8161a8ddb02fde3cd4cde8f (patch) | |
| tree | 29d7b15932e10b0adafe2b27b3618e330805f75c /weed/remote_storage | |
| parent | 8f5170c1389f2d0bac75ca2f95a676a05283317b (diff) | |
| download | seaweedfs-13e45e16054d16e8d8161a8ddb02fde3cd4cde8f.tar.xz seaweedfs-13e45e16054d16e8d8161a8ddb02fde3cd4cde8f.zip | |
filer.remote.sync can work now
Diffstat (limited to 'weed/remote_storage')
| -rw-r--r-- | weed/remote_storage/remote_storage.go | 6 |
| -rw-r--r-- | weed/remote_storage/s3/s3_storage_client.go | 94 |
2 files changed, 93 insertions, 7 deletions
```diff
diff --git a/weed/remote_storage/remote_storage.go b/weed/remote_storage/remote_storage.go
index 06c089d7a..608d158ad 100644
--- a/weed/remote_storage/remote_storage.go
+++ b/weed/remote_storage/remote_storage.go
@@ -3,6 +3,7 @@ package remote_storage
 import (
 	"fmt"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"io"
 	"strings"
 	"sync"
 )
@@ -30,7 +31,10 @@ type VisitFunc func(dir string, name string, isDirectory bool, remoteEntry *file
 
 type RemoteStorageClient interface {
 	Traverse(loc *filer_pb.RemoteStorageLocation, visitFn VisitFunc) error
-	ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data[]byte, err error)
+	ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error)
+	WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error)
+	UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error)
+	DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error)
 }
 
 type RemoteStorageClientMaker interface {
diff --git a/weed/remote_storage/s3/s3_storage_client.go b/weed/remote_storage/s3/s3_storage_client.go
index 2263054f3..316751227 100644
--- a/weed/remote_storage/s3/s3_storage_client.go
+++ b/weed/remote_storage/s3/s3_storage_client.go
@@ -8,9 +8,11 @@ import (
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3iface"
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	"github.com/chrislusf/seaweedfs/weed/filer"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/remote_storage"
 	"github.com/chrislusf/seaweedfs/weed/util"
+	"io"
 )
 
 func init() {
@@ -45,7 +47,9 @@ type s3RemoteStorageClient struct {
 	conn s3iface.S3API
 }
 
-func (s s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
+var _ = remote_storage.RemoteStorageClient(&s3RemoteStorageClient{})
+
+func (s *s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
 
 	pathKey := remote.Path[1:]
 
@@ -91,19 +95,19 @@ func (s s3RemoteStorageClient) Traverse(remote *filer_pb.RemoteStorageLocation,
 	}
 	return
 }
-func (s s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data[]byte, err error) {
+func (s *s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
 	downloader := s3manager.NewDownloaderWithClient(s.conn, func(u *s3manager.Downloader) {
 		u.PartSize = int64(4 * 1024 * 1024)
 		u.Concurrency = 1
 	})
-
+
 	dataSlice := make([]byte, int(size))
 	writerAt := aws.NewWriteAtBuffer(dataSlice)
 
 	_, err = downloader.Download(writerAt, &s3.GetObjectInput{
-		Bucket: aws.String(loc.Bucket),
-		Key:    aws.String(loc.Path[1:]),
-		Range:  aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)),
+		Bucket: aws.String(loc.Bucket),
+		Key:    aws.String(loc.Path[1:]),
+		Range:  aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+size-1)),
 	})
 	if err != nil {
 		return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
@@ -111,3 +115,81 @@ func (s s3RemoteStorageClient) ReadFile(loc *filer_pb.RemoteStorageLocation, off
 
 	return writerAt.Bytes(), nil
 }
+
+func (s *s3RemoteStorageClient) WriteFile(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (err error) {
+
+	fileSize := int64(filer.FileSize(entry))
+
+	partSize := int64(8 * 1024 * 1024) // The minimum/default allowed part size is 5MB
+	for partSize*1000 < fileSize {
+		partSize *= 4
+	}
+
+	// Create an uploader with the session and custom options
+	uploader := s3manager.NewUploaderWithClient(s.conn, func(u *s3manager.Uploader) {
+		u.PartSize = partSize
+		u.Concurrency = 5
+	})
+
+	// process tagging
+	tags := ""
+	for k, v := range entry.Extended {
+		if len(tags) > 0 {
+			tags = tags + "&"
+		}
+		tags = tags + k + "=" + string(v)
+	}
+
+	// Upload the file to S3.
+	_, err = uploader.Upload(&s3manager.UploadInput{
+		Bucket:               aws.String(loc.Bucket),
+		Key:                  aws.String(loc.Path[1:]),
+		Body:                 reader,
+		ACL:                  aws.String("private"),
+		ServerSideEncryption: aws.String("AES256"),
+		StorageClass:         aws.String("STANDARD_IA"),
+		Tagging:              aws.String(tags),
+	})
+
+	//in case it fails to upload
+	if err != nil {
+		return fmt.Errorf("upload to s3 %s/%s%s: %v", loc.Name, loc.Bucket, loc.Path, err)
+	}
+
+	return nil
+}
+
+func toTagging(attributes map[string][]byte) *s3.Tagging {
+	tagging := &s3.Tagging{}
+	for k, v := range attributes {
+		tagging.TagSet = append(tagging.TagSet, &s3.Tag{
+			Key:   aws.String(k),
+			Value: aws.String(string(v)),
+		})
+	}
+	return tagging
+}
+
+func (s *s3RemoteStorageClient) UpdateFileMetadata(loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error) {
+	tagging := toTagging(entry.Extended)
+	if len(tagging.TagSet) > 0 {
+		_, err = s.conn.PutObjectTagging(&s3.PutObjectTaggingInput{
+			Bucket:  aws.String(loc.Bucket),
+			Key:     aws.String(loc.Path[1:]),
+			Tagging: toTagging(entry.Extended),
+		})
+	} else {
+		_, err = s.conn.DeleteObjectTagging(&s3.DeleteObjectTaggingInput{
+			Bucket: aws.String(loc.Bucket),
+			Key:    aws.String(loc.Path[1:]),
+		})
+	}
+	return
+}
+func (s *s3RemoteStorageClient) DeleteFile(loc *filer_pb.RemoteStorageLocation) (err error) {
+	_, err = s.conn.DeleteObject(&s3.DeleteObjectInput{
+		Bucket: aws.String(loc.Bucket),
+		Key:    aws.String(loc.Path[1:]),
+	})
+	return
+}
```
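This commit extends `RemoteStorageClient` with write, metadata-update, and delete operations and implements them for S3. As a rough illustration that is not part of the commit, the sketch below shows how a caller might drive the new methods against any implementation of the interface. The helper names `mirrorEntry` and `removeEntry` and the way the client, location, and content are obtained are hypothetical; only the method signatures are taken from `weed/remote_storage/remote_storage.go` at this commit.

```go
// Hypothetical illustration of the extended RemoteStorageClient interface.
package example

import (
	"bytes"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"github.com/chrislusf/seaweedfs/weed/remote_storage"
)

// mirrorEntry pushes an entry's content to the remote location, then syncs
// the entry's extended attributes via the new interface methods.
func mirrorEntry(client remote_storage.RemoteStorageClient, loc *filer_pb.RemoteStorageLocation, entry *filer_pb.Entry, content []byte) error {
	// WriteFile streams the data to the remote bucket/path.
	if err := client.WriteFile(loc, entry, bytes.NewReader(content)); err != nil {
		return err
	}
	// UpdateFileMetadata maps entry.Extended key/value pairs to remote tags
	// (the S3 implementation uses PutObjectTagging / DeleteObjectTagging).
	return client.UpdateFileMetadata(loc, entry)
}

// removeEntry deletes the remote copy when the local entry goes away.
func removeEntry(client remote_storage.RemoteStorageClient, loc *filer_pb.RemoteStorageLocation) error {
	return client.DeleteFile(loc)
}
```

In the actual `filer.remote.sync` flow the concrete client would presumably be obtained through the `RemoteStorageClientMaker` machinery visible in the interface file's context; that wiring is outside this sketch.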
