-rw-r--r--  docker/Makefile                                |   3
-rw-r--r--  docker/compose/local-filer-backup-compose.yml  |  54
-rw-r--r--  weed/command/filer_backup.go                   |   3
-rw-r--r--  weed/replication/sink/s3sink/s3_sink.go        | 121
-rw-r--r--  weed/s3api/s3_constants/header.go              |   1
5 files changed, 140 insertions(+), 42 deletions(-)
diff --git a/docker/Makefile b/docker/Makefile
index 89a896f77..6d6c88190 100644
--- a/docker/Makefile
+++ b/docker/Makefile
@@ -81,6 +81,9 @@ cluster: build
 2mount: build
 	docker compose -f compose/local-sync-mount-compose.yml -p seaweedfs up
 
+filer_backup: build
+	docker compose -f compose/local-filer-backup-compose.yml -p seaweedfs up
+
 hashicorp_raft: build
 	docker compose -f compose/local-hashicorp-raft-compose.yml -p seaweedfs up
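With this target in place, running `make filer_backup` from the docker/ directory builds the chrislusf/seaweedfs:local image and brings up the backup test stack defined in the compose file below: two SeaweedFS servers, a filer.backup process replicating from one to the other, and a warp load generator.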
diff --git a/docker/compose/local-filer-backup-compose.yml b/docker/compose/local-filer-backup-compose.yml
new file mode 100644
index 000000000..3e4baf5fa
--- /dev/null
+++ b/docker/compose/local-filer-backup-compose.yml
@@ -0,0 +1,54 @@
+version: '3.9'
+
+services:
+  server-left:
+    image: chrislusf/seaweedfs:local
+    command: "-v=0 server -ip=server-left -filer -filer.maxMB 5 -s3 -s3.config=/etc/seaweedfs/s3.json -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1"
+    volumes:
+      - ./s3.json:/etc/seaweedfs/s3.json
+    healthcheck:
+      test: [ "CMD", "curl", "--fail", "-I", "http://localhost:9333/cluster/healthz" ]
+      interval: 3s
+      start_period: 15s
+      timeout: 30s
+  server-right:
+    image: chrislusf/seaweedfs:local
+    command: "-v=0 server -ip=server-right -filer -filer.maxMB 64 -s3 -s3.config=/etc/seaweedfs/s3.json -volume.max=0 -master.volumeSizeLimitMB=1024 -volume.preStopSeconds=1"
+    volumes:
+      - ./s3.json:/etc/seaweedfs/s3.json
+    healthcheck:
+      test: [ "CMD", "curl", "--fail", "-I", "http://localhost:9333/cluster/healthz" ]
+      interval: 3s
+      start_period: 15s
+      timeout: 30s
+  filer-backup:
+    image: chrislusf/seaweedfs:local
+    command: "-v=0 filer.backup -debug -doDeleteFiles=False -filer server-left:8888"
+    volumes:
+      - ./replication.toml:/etc/seaweedfs/replication.toml
+    environment:
+      WEED_SINK_LOCAL_INCREMENTAL_ENABLED: "false"
+      WEED_SINK_S3_ENABLED: "true"
+      WEED_SINK_S3_BUCKET: "backup"
+      WEED_SINK_S3_ENDPOINT: "http://server-right:8333"
+      WEED_SINK_S3_DIRECTORY: "/"
+      WEED_SINK_S3_AWS_ACCESS_KEY_ID: "some_access_key1"
+      WEED_SINK_S3_AWS_SECRET_ACCESS_KEY: "some_secret_key1"
+      WEED_SINK_S3_S3_DISABLE_CONTENT_MD5_VALIDATION: "false"
+      WEED_SINK_S3_UPLOADER_PART_SIZE_MB: "5"
+      WEED_SINK_S3_KEEP_PART_SIZE: "false"
+    depends_on:
+      server-left:
+        condition: service_healthy
+      server-right:
+        condition: service_healthy
+  minio-warp:
+    image: minio/warp
+    command: 'mixed --duration 5s --obj.size=6mb --md5 --objects 10 --concurrent 2'
+    restart: on-failure
+    environment:
+      WARP_HOST: "server-left:8333"
+      WARP_ACCESS_KEY: "some_access_key1"
+      WARP_SECRET_KEY: "some_secret_key1"
+    depends_on:
+      - filer-backup
\ No newline at end of file
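The WEED_SINK_S3_* variables work because SeaweedFS loads replication.toml through viper with environment overrides: any sink.s3.* key can be supplied as an upper-cased, underscore-joined variable under the WEED_ prefix. A minimal sketch of that mapping, assuming viper's standard env handling as used by weed/util.GetViper:

package main

import (
	"fmt"
	"os"
	"strings"

	"github.com/spf13/viper"
)

func main() {
	// Compose sets WEED_SINK_S3_BUCKET=backup; viper resolves it when the
	// sink asks for the "sink.s3.bucket" key from replication.toml.
	os.Setenv("WEED_SINK_S3_BUCKET", "backup")

	v := viper.New()
	v.SetEnvPrefix("weed")
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	v.AutomaticEnv()

	fmt.Println(v.GetString("sink.s3.bucket")) // prints "backup"
}

So WEED_SINK_S3_UPLOADER_PART_SIZE_MB above lands on the sink.s3.uploader_part_size_mb key that Initialize reads in the s3_sink.go changes below.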
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 691b1c0b5..4aeab60f2 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -85,8 +85,7 @@ const (
 func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32, clientEpoch int32) error {
 
 	// find data sink
-	config := util.GetViper()
-	dataSink := findSink(config)
+	dataSink := findSink(util.GetViper())
 	if dataSink == nil {
 		return fmt.Errorf("no data sink configured in replication.toml")
 	}
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index a032b58e8..276ea30d6 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -8,6 +8,9 @@ import (
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3iface"
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+	"encoding/base64"
+	"strconv"
 	"strings"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -19,14 +22,20 @@
 )
 
 type S3Sink struct {
-	conn          s3iface.S3API
-	region        string
-	bucket        string
-	dir           string
-	endpoint      string
-	acl           string
-	filerSource   *source.FilerSource
-	isIncremental bool
+	conn                          s3iface.S3API
+	filerSource                   *source.FilerSource
+	isIncremental                 bool
+	keepPartSize                  bool
+	s3DisableContentMD5Validation bool
+	s3ForcePathStyle              bool
+	uploaderConcurrency           int
+	uploaderMaxUploadParts        int
+	uploaderPartSizeMb            int
+	region                        string
+	bucket                        string
+	dir                           string
+	endpoint                      string
+	acl                           string
 }
 
 func init() {
@@ -46,21 +55,49 @@ func (s3sink *S3Sink) IsIncremental() bool {
 }
 
 func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
 
-	glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
-	glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
-	glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
-	glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
-	glog.V(0).Infof("sink.s3.acl: %v", configuration.GetString(prefix+"acl"))
-	glog.V(0).Infof("sink.s3.is_incremental: %v", configuration.GetString(prefix+"is_incremental"))
+	configuration.SetDefault(prefix+"region", "us-east-2")
+	configuration.SetDefault(prefix+"directory", "/")
+	configuration.SetDefault(prefix+"keep_part_size", true)
+	configuration.SetDefault(prefix+"uploader_max_upload_parts", 1000)
+	configuration.SetDefault(prefix+"uploader_part_size_mb", 8)
+	configuration.SetDefault(prefix+"uploader_concurrency", 8)
+	configuration.SetDefault(prefix+"s3_disable_content_md5_validation", true)
+	configuration.SetDefault(prefix+"s3_force_path_style", true)
+	s3sink.region = configuration.GetString(prefix + "region")
+	s3sink.bucket = configuration.GetString(prefix + "bucket")
+	s3sink.dir = configuration.GetString(prefix + "directory")
+	s3sink.endpoint = configuration.GetString(prefix + "endpoint")
+	s3sink.acl = configuration.GetString(prefix + "acl")
 	s3sink.isIncremental = configuration.GetBool(prefix + "is_incremental")
+	s3sink.keepPartSize = configuration.GetBool(prefix + "keep_part_size")
+	s3sink.s3DisableContentMD5Validation = configuration.GetBool(prefix + "s3_disable_content_md5_validation")
+	s3sink.s3ForcePathStyle = configuration.GetBool(prefix + "s3_force_path_style")
+	s3sink.uploaderMaxUploadParts = configuration.GetInt(prefix + "uploader_max_upload_parts")
+	s3sink.uploaderPartSizeMb = configuration.GetInt(prefix + "uploader_part_size_mb")
+	s3sink.uploaderConcurrency = configuration.GetInt(prefix + "uploader_concurrency")
+
+	glog.V(0).Infof("sink.s3.region: %v", s3sink.region)
+	glog.V(0).Infof("sink.s3.bucket: %v", s3sink.bucket)
+	glog.V(0).Infof("sink.s3.directory: %v", s3sink.dir)
+	glog.V(0).Infof("sink.s3.endpoint: %v", s3sink.endpoint)
+	glog.V(0).Infof("sink.s3.acl: %v", s3sink.acl)
+	glog.V(0).Infof("sink.s3.is_incremental: %v", s3sink.isIncremental)
+	glog.V(0).Infof("sink.s3.s3_disable_content_md5_validation: %v", s3sink.s3DisableContentMD5Validation)
+	glog.V(0).Infof("sink.s3.s3_force_path_style: %v", s3sink.s3ForcePathStyle)
+	glog.V(0).Infof("sink.s3.keep_part_size: %v", s3sink.keepPartSize)
+	if s3sink.uploaderMaxUploadParts > s3manager.MaxUploadParts {
+		glog.Warningf("uploader_max_upload_parts exceeds the limit of %d parts per S3 multipart upload, clamping", s3manager.MaxUploadParts)
+		glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v => %v", s3sink.uploaderMaxUploadParts, s3manager.MaxUploadParts)
+		s3sink.uploaderMaxUploadParts = s3manager.MaxUploadParts
+	} else {
+		glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v", s3sink.uploaderMaxUploadParts)
+	}
+	glog.V(0).Infof("sink.s3.uploader_part_size_mb: %v", s3sink.uploaderPartSizeMb)
+	glog.V(0).Infof("sink.s3.uploader_concurrency: %v", s3sink.uploaderConcurrency)
+
 	return s3sink.initialize(
 		configuration.GetString(prefix+"aws_access_key_id"),
 		configuration.GetString(prefix+"aws_secret_access_key"),
-		configuration.GetString(prefix+"region"),
-		configuration.GetString(prefix+"bucket"),
-		configuration.GetString(prefix+"directory"),
-		configuration.GetString(prefix+"endpoint"),
-		configuration.GetString(prefix+"acl"),
 	)
 }
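The clamp matters because S3 multipart uploads cap out at s3manager.MaxUploadParts (10,000) parts, so part size times part count bounds the largest object the sink can copy. A quick back-of-the-envelope check with the defaults set above (8 MB parts, 1,000 max parts):

package main

import "fmt"

func main() {
	const partSizeMB = 8  // sink.s3.uploader_part_size_mb default
	const maxParts = 1000 // sink.s3.uploader_max_upload_parts default
	fmt.Printf("largest object: %d MB (~%.1f GB)\n",
		partSizeMB*maxParts, float64(partSizeMB*maxParts)/1024)
	// Output: largest object: 8000 MB (~7.8 GB)
}

Anything larger needs a bigger part size; the removed code below grew the part size dynamically, which these options now make explicit and predictable.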
@@ -68,18 +105,12 @@ func (s3sink *S3Sink) SetSourceFiler(s *source.FilerSource) {
 	s3sink.filerSource = s
 }
 
-func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir, endpoint, acl string) error {
-	s3sink.region = region
-	s3sink.bucket = bucket
-	s3sink.dir = dir
-	s3sink.endpoint = endpoint
-	s3sink.acl = acl
-
+func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey string) error {
 	config := &aws.Config{
 		Region:                        aws.String(s3sink.region),
 		Endpoint:                      aws.String(s3sink.endpoint),
-		S3ForcePathStyle:              aws.Bool(true),
-		S3DisableContentMD5Validation: aws.Bool(true),
+		S3DisableContentMD5Validation: aws.Bool(s3sink.s3DisableContentMD5Validation),
+		S3ForcePathStyle:              aws.Bool(s3sink.s3ForcePathStyle),
 	}
 	if awsAccessKeyId != "" && awsSecretAccessKey != "" {
 		config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
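Note that when aws_access_key_id and aws_secret_access_key are left empty, no static credentials are attached to the config, and the AWS SDK falls back to its default credential chain (environment variables, shared credentials file, or an attached IAM role), which is standard aws-sdk-go session behavior.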
@@ -128,19 +159,30 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
 
 	reader := filer.NewFileReader(s3sink.filerSource, entry)
 
-	fileSize := int64(filer.FileSize(entry))
-
-	partSize := int64(8 * 1024 * 1024) // The minimum/default allowed part size is 5MB
-	for partSize*1000 < fileSize {
-		partSize *= 4
-	}
-
 	// Create an uploader with the session and custom options
 	uploader := s3manager.NewUploaderWithClient(s3sink.conn, func(u *s3manager.Uploader) {
-		u.PartSize = partSize
-		u.Concurrency = 8
+		u.PartSize = int64(s3sink.uploaderPartSizeMb * 1024 * 1024)
+		u.Concurrency = s3sink.uploaderConcurrency
+		u.MaxUploadParts = s3sink.uploaderMaxUploadParts
 	})
 
+	if s3sink.keepPartSize {
+		switch chunkCount := len(entry.Chunks); {
+		case chunkCount > 1:
+			if firstChunkSize := int64(entry.Chunks[0].Size); firstChunkSize > s3manager.MinUploadPartSize {
+				uploader.PartSize = firstChunkSize
+			}
+		default:
+			uploader.PartSize = 0
+		}
+	}
+	if entry.Extended == nil {
+		// Guard against writing the mtime metadata into a nil map.
+		entry.Extended = make(map[string][]byte)
+	}
+	if _, ok := entry.Extended[s3_constants.AmzUserMetaMtime]; !ok {
+		entry.Extended[s3_constants.AmzUserMetaMtime] = []byte(strconv.FormatInt(entry.Attributes.Mtime, 10))
+	}
 	// process tagging
 	tags := ""
 	if true {
@@ -153,14 +195,19 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
 	}
 
 	// Upload the file to S3.
-	_, err = uploader.Upload(&s3manager.UploadInput{
+	uploadInput := s3manager.UploadInput{
 		Bucket:  aws.String(s3sink.bucket),
 		Key:     aws.String(key),
 		Body:    reader,
 		Tagging: aws.String(tags),
-	})
+	}
+	if len(entry.Attributes.Md5) > 0 {
+		// The S3 API expects the base64-encoded MD5 digest, not a hex string.
+		uploadInput.ContentMD5 = aws.String(base64.StdEncoding.EncodeToString(entry.Attributes.Md5))
+	}
+	_, err = uploader.Upload(&uploadInput)
 
-	return
+	return err
 }
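The keep_part_size branch tries to mirror the source file's chunking so each uploaded part lines up with a filer chunk, falling back to the SDK default for single-chunk files. The decision rule, pulled out as a standalone sketch (the helper name and shape are illustrative, not from the source):

package main

import "fmt"

const minUploadPartSize = 5 * 1024 * 1024 // mirrors s3manager.MinUploadPartSize

// choosePartSize returns the part size the sink would use for a file with the
// given chunk sizes; `configured` is uploader_part_size_mb in bytes.
func choosePartSize(chunkSizes []int64, configured int64) int64 {
	if len(chunkSizes) <= 1 {
		return 0 // single chunk: zero lets the SDK apply its default part size
	}
	if chunkSizes[0] > minUploadPartSize {
		return chunkSizes[0] // multi-chunk: reuse the source chunk size
	}
	return configured // first chunk below the S3 minimum: keep the configured size
}

func main() {
	fmt.Println(choosePartSize([]int64{6 << 20, 6 << 20}, 8<<20)) // 6291456
	fmt.Println(choosePartSize([]int64{2 << 20}, 8<<20))          // 0
}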
diff --git a/weed/s3api/s3_constants/header.go b/weed/s3api/s3_constants/header.go
index 5037f4691..30a878ccb 100644
--- a/weed/s3api/s3_constants/header.go
+++ b/weed/s3api/s3_constants/header.go
@@ -30,6 +30,7 @@ const (
 	// S3 user-defined metadata
 	AmzUserMetaPrefix    = "X-Amz-Meta-"
 	AmzUserMetaDirective = "X-Amz-Metadata-Directive"
+	AmzUserMetaMtime     = "X-Amz-Meta-Mtime"
 
 	// S3 object tagging
 	AmzObjectTagging = "X-Amz-Tagging"
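The new X-Amz-Meta-Mtime key carries the source entry's modification time onto the backup object as user metadata, so a restore can put it back. A hypothetical check against the stack above (endpoint, bucket, and key are illustrative; the SDK strips the X-Amz-Meta- prefix and canonicalizes the metadata map key to "Mtime"):

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	sess := session.Must(session.NewSession(&aws.Config{
		Endpoint:         aws.String("http://server-right:8333"),
		Region:           aws.String("us-east-2"),
		S3ForcePathStyle: aws.Bool(true),
		Credentials:      credentials.NewStaticCredentials("some_access_key1", "some_secret_key1", ""),
	}))
	head, err := s3.New(sess).HeadObject(&s3.HeadObjectInput{
		Bucket: aws.String("backup"),
		Key:    aws.String("some/object"), // illustrative key
	})
	if err != nil {
		panic(err)
	}
	if v, ok := head.Metadata["Mtime"]; ok {
		sec, _ := strconv.ParseInt(aws.StringValue(v), 10, 64)
		fmt.Println("source mtime:", time.Unix(sec, 0).UTC())
	}
}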