author     chrislu <chris.lu@gmail.com>  2024-03-07 10:42:29 -0800
committer  chrislu <chris.lu@gmail.com>  2024-03-07 10:42:29 -0800
commit     8f79bb398780a5b0e746c2be4160e74dcc65b287 (patch)
tree       34f45830dda7740d125f5f0c33763a1cc019e0af /weed
parent     fe03b1b5228d421f2b9e6903a728aa76866166f1 (diff)
parent     b544a69550d6793dc61eafccc9b850d43bee5d32 (diff)
Merge branch 'master' into mq-subscribe
Diffstat (limited to 'weed')
-rw-r--r--  weed/command/filer_backup.go               |   3
-rw-r--r--  weed/replication/sink/s3sink/s3_sink.go    | 121
-rw-r--r--  weed/s3api/s3_constants/header.go          |   1
-rw-r--r--  weed/s3api/s3api_objects_list_handlers.go  |  67
-rw-r--r--  weed/server/filer_grpc_server_admin.go     |   3
-rw-r--r--  weed/shell/command_fs_log.go               |  53
-rw-r--r--  weed/shell/shell_liner.go                  |  19
-rw-r--r--  weed/storage/store.go                      |   7
8 files changed, 187 insertions, 87 deletions
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 691b1c0b5..4aeab60f2 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -85,8 +85,7 @@ const (
func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32, clientEpoch int32) error {
// find data sink
- config := util.GetViper()
- dataSink := findSink(config)
+ dataSink := findSink(util.GetViper())
if dataSink == nil {
return fmt.Errorf("no data sink configured in replication.toml")
}
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index a032b58e8..276ea30d6 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -8,6 +8,8 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+ "strconv"
"strings"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -19,14 +21,20 @@ import (
)
type S3Sink struct {
- conn s3iface.S3API
- region string
- bucket string
- dir string
- endpoint string
- acl string
- filerSource *source.FilerSource
- isIncremental bool
+ conn s3iface.S3API
+ filerSource *source.FilerSource
+ isIncremental bool
+ keepPartSize bool
+ s3DisableContentMD5Validation bool
+ s3ForcePathStyle bool
+ uploaderConcurrency int
+ uploaderMaxUploadParts int
+ uploaderPartSizeMb int
+ region string
+ bucket string
+ dir string
+ endpoint string
+ acl string
}
func init() {
@@ -46,21 +54,49 @@ func (s3sink *S3Sink) IsIncremental() bool {
}
func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
- glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
- glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
- glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
- glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
- glog.V(0).Infof("sink.s3.acl: %v", configuration.GetString(prefix+"acl"))
- glog.V(0).Infof("sink.s3.is_incremental: %v", configuration.GetString(prefix+"is_incremental"))
+ configuration.SetDefault(prefix+"region", "us-east-2")
+ configuration.SetDefault(prefix+"directory", "/")
+ configuration.SetDefault(prefix+"keep_part_size", true)
+ configuration.SetDefault(prefix+"uploader_max_upload_parts", 1000)
+ configuration.SetDefault(prefix+"uploader_part_size_mb", 8)
+ configuration.SetDefault(prefix+"uploader_concurrency", 8)
+ configuration.SetDefault(prefix+"s3_disable_content_md5_validation", true)
+ configuration.SetDefault(prefix+"s3_force_path_style", true)
+ s3sink.region = configuration.GetString(prefix + "region")
+ s3sink.bucket = configuration.GetString(prefix + "bucket")
+ s3sink.dir = configuration.GetString(prefix + "directory")
+ s3sink.endpoint = configuration.GetString(prefix + "endpoint")
+ s3sink.acl = configuration.GetString(prefix + "acl")
s3sink.isIncremental = configuration.GetBool(prefix + "is_incremental")
+ s3sink.keepPartSize = configuration.GetBool(prefix + "keep_part_size")
+ s3sink.s3DisableContentMD5Validation = configuration.GetBool(prefix + "s3_disable_content_md5_validation")
+ s3sink.s3ForcePathStyle = configuration.GetBool(prefix + "s3_force_path_style")
+ s3sink.uploaderMaxUploadParts = configuration.GetInt(prefix + "uploader_max_upload_parts")
+ s3sink.uploaderPartSizeMb = configuration.GetInt(prefix + "uploader_part_size")
+ s3sink.uploaderConcurrency = configuration.GetInt(prefix + "uploader_concurrency")
+
+ glog.V(0).Infof("sink.s3.region: %v", s3sink.region)
+ glog.V(0).Infof("sink.s3.bucket: %v", s3sink.bucket)
+ glog.V(0).Infof("sink.s3.directory: %v", s3sink.dir)
+ glog.V(0).Infof("sink.s3.endpoint: %v", s3sink.endpoint)
+ glog.V(0).Infof("sink.s3.acl: %v", s3sink.acl)
+ glog.V(0).Infof("sink.s3.is_incremental: %v", s3sink.isIncremental)
+ glog.V(0).Infof("sink.s3.s3_disable_content_md5_validation: %v", s3sink.s3DisableContentMD5Validation)
+ glog.V(0).Infof("sink.s3.s3_force_path_style: %v", s3sink.s3ForcePathStyle)
+ glog.V(0).Infof("sink.s3.keep_part_size: %v", s3sink.keepPartSize)
+ if s3sink.uploaderMaxUploadParts > s3manager.MaxUploadParts {
+ s3sink.uploaderMaxUploadParts = s3manager.MaxUploadParts
+ glog.Warningf("uploader_max_upload_parts is greater than the maximum number of parts allowed when uploading multiple parts to Amazon S3")
+ glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v => %v", s3sink.uploaderMaxUploadParts, s3manager.MaxUploadParts)
+ } else {
+ glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v", s3sink.uploaderMaxUploadParts)
+ }
+ glog.V(0).Infof("sink.s3.uploader_part_size_mb: %v", s3sink.uploaderPartSizeMb)
+ glog.V(0).Infof("sink.s3.uploader_concurrency: %v", s3sink.uploaderConcurrency)
+
return s3sink.initialize(
configuration.GetString(prefix+"aws_access_key_id"),
configuration.GetString(prefix+"aws_secret_access_key"),
- configuration.GetString(prefix+"region"),
- configuration.GetString(prefix+"bucket"),
- configuration.GetString(prefix+"directory"),
- configuration.GetString(prefix+"endpoint"),
- configuration.GetString(prefix+"acl"),
)
}
@@ -68,18 +104,12 @@ func (s3sink *S3Sink) SetSourceFiler(s *source.FilerSource) {
s3sink.filerSource = s
}
-func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir, endpoint, acl string) error {
- s3sink.region = region
- s3sink.bucket = bucket
- s3sink.dir = dir
- s3sink.endpoint = endpoint
- s3sink.acl = acl
-
+func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey string) error {
config := &aws.Config{
Region: aws.String(s3sink.region),
Endpoint: aws.String(s3sink.endpoint),
- S3ForcePathStyle: aws.Bool(true),
- S3DisableContentMD5Validation: aws.Bool(true),
+ S3DisableContentMD5Validation: aws.Bool(s3sink.s3DisableContentMD5Validation),
+ S3ForcePathStyle: aws.Bool(s3sink.s3ForcePathStyle),
}
if awsAccessKeyId != "" && awsSecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
@@ -128,19 +158,26 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
reader := filer.NewFileReader(s3sink.filerSource, entry)
- fileSize := int64(filer.FileSize(entry))
-
- partSize := int64(8 * 1024 * 1024) // The minimum/default allowed part size is 5MB
- for partSize*1000 < fileSize {
- partSize *= 4
- }
-
// Create an uploader with the session and custom options
uploader := s3manager.NewUploaderWithClient(s3sink.conn, func(u *s3manager.Uploader) {
- u.PartSize = partSize
- u.Concurrency = 8
+ u.PartSize = int64(s3sink.uploaderPartSizeMb * 1024 * 1024)
+ u.Concurrency = s3sink.uploaderConcurrency
+ u.MaxUploadParts = s3sink.uploaderMaxUploadParts
})
+ if s3sink.keepPartSize {
+ switch chunkCount := len(entry.Chunks); {
+ case chunkCount > 1:
+ if firstChunkSize := int64(entry.Chunks[0].Size); firstChunkSize > s3manager.MinUploadPartSize {
+ uploader.PartSize = firstChunkSize
+ }
+ default:
+ uploader.PartSize = 0
+ }
+ }
+ if _, ok := entry.Extended[s3_constants.AmzUserMetaMtime]; !ok {
+ entry.Extended[s3_constants.AmzUserMetaMtime] = []byte(strconv.FormatInt(entry.Attributes.Mtime, 10))
+ }
// process tagging
tags := ""
if true {
@@ -153,14 +190,18 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
}
// Upload the file to S3.
- _, err = uploader.Upload(&s3manager.UploadInput{
+ uploadInput := s3manager.UploadInput{
Bucket: aws.String(s3sink.bucket),
Key: aws.String(key),
Body: reader,
Tagging: aws.String(tags),
- })
+ }
+ if len(entry.Attributes.Md5) > 0 {
+ uploadInput.ContentMD5 = aws.String(fmt.Sprintf("%x", entry.Attributes.Md5))
+ }
+ _, err = uploader.Upload(&uploadInput)
- return
+ return err
}
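
The hard-coded uploader settings are now driven by the sink.s3.* keys logged above (keep_part_size, uploader_part_size_mb, uploader_concurrency, uploader_max_upload_parts, s3_force_path_style, s3_disable_content_md5_validation). A minimal sketch of how the keep_part_size branch chooses the upload part size, using a hypothetical pickPartSize helper with a slice of chunk sizes standing in for entry.Chunks:

    package main

    import (
    	"fmt"

    	"github.com/aws/aws-sdk-go/service/s3/s3manager"
    )

    // pickPartSize restates the keep_part_size branch of CreateEntry above.
    // chunkSizes stands in for the sizes of entry.Chunks; partSizeMb is the
    // configured uploader part size in MB.
    func pickPartSize(chunkSizes []int64, keepPartSize bool, partSizeMb int) int64 {
    	partSize := int64(partSizeMb) * 1024 * 1024
    	if !keepPartSize {
    		return partSize
    	}
    	if len(chunkSizes) > 1 {
    		if chunkSizes[0] > s3manager.MinUploadPartSize { // 5 MiB minimum
    			partSize = chunkSizes[0] // keep the source object's part layout
    		}
    		return partSize
    	}
    	return 0 // zero lets s3manager fall back to its default part size
    }

    func main() {
    	fmt.Println(pickPartSize([]int64{16 << 20, 16 << 20, 4 << 20}, true, 8)) // 16777216
    	fmt.Println(pickPartSize([]int64{4 << 20}, true, 8))                     // 0
    }

Reusing the first chunk's size for multi-chunk entries keeps the replicated object's multipart layout close to the source; a single-chunk entry simply takes the SDK's default part size.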
diff --git a/weed/s3api/s3_constants/header.go b/weed/s3api/s3_constants/header.go
index 5037f4691..30a878ccb 100644
--- a/weed/s3api/s3_constants/header.go
+++ b/weed/s3api/s3_constants/header.go
@@ -30,6 +30,7 @@ const (
// S3 user-defined metadata
AmzUserMetaPrefix = "X-Amz-Meta-"
AmzUserMetaDirective = "X-Amz-Metadata-Directive"
+ AmzUserMetaMtime = "X-Amz-Meta-Mtime"
// S3 object tagging
AmzObjectTagging = "X-Amz-Tagging"
diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go
index 78b77a044..b2ad915b9 100644
--- a/weed/s3api/s3api_objects_list_handlers.go
+++ b/weed/s3api/s3api_objects_list_handlers.go
@@ -47,10 +47,6 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxKeys)
return
}
- if delimiter != "" && delimiter != "/" {
- s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
- return
- }
marker := continuationToken
if continuationToken == "" {
@@ -103,10 +99,6 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxKeys)
return
}
- if delimiter != "" && delimiter != "/" {
- s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
- return
- }
response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
@@ -171,22 +163,51 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
cursor.maxKeys--
}
} else {
- storageClass := "STANDARD"
- if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
- storageClass = string(v)
+ var delimiterFound bool
+ if delimiter != "" {
+ // keys that contain the same string between the prefix and the first occurrence of the delimiter are grouped together as a commonPrefix.
+ // extract the string between the prefix and the delimiter and add it to the commonPrefixes if it's unique.
+ fullPath := fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):]
+ delimitedPath := strings.SplitN(fullPath, delimiter, 2)
+ if len(delimitedPath) == 2 {
+
+ // S3 clients expect the delimited prefix to contain the delimiter.
+ delimitedPrefix := delimitedPath[0] + delimiter
+
+ for i := range commonPrefixes {
+ if commonPrefixes[i].Prefix == delimitedPrefix {
+ delimiterFound = true
+ break
+ }
+ }
+
+ if !delimiterFound {
+ commonPrefixes = append(commonPrefixes, PrefixEntry{
+ Prefix: delimitedPrefix,
+ })
+ cursor.maxKeys--
+ delimiterFound = true
+ }
+ }
+ }
+ if !delimiterFound {
+ storageClass := "STANDARD"
+ if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
+ storageClass = string(v)
+ }
+ contents = append(contents, ListEntry{
+ Key: fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):],
+ LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
+ ETag: "\"" + filer.ETag(entry) + "\"",
+ Size: int64(filer.FileSize(entry)),
+ Owner: CanonicalUser{
+ ID: fmt.Sprintf("%x", entry.Attributes.Uid),
+ DisplayName: entry.Attributes.UserName,
+ },
+ StorageClass: StorageClass(storageClass),
+ })
+ cursor.maxKeys--
}
- contents = append(contents, ListEntry{
- Key: fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):],
- LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
- ETag: "\"" + filer.ETag(entry) + "\"",
- Size: int64(filer.FileSize(entry)),
- Owner: CanonicalUser{
- ID: fmt.Sprintf("%x", entry.Attributes.Uid),
- DisplayName: entry.Attributes.UserName,
- },
- StorageClass: StorageClass(storageClass),
- })
- cursor.maxKeys--
}
})
if doErr != nil {
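
With the ErrNotImplemented checks removed from both handlers, delimiter grouping now happens inside listFilerEntries as shown above. A minimal sketch of that rule, with a hypothetical groupKeys helper and keys assumed to be already stripped of the request prefix (the real handler also decrements cursor.maxKeys once per new common prefix):

    package main

    import (
    	"fmt"
    	"strings"
    )

    // groupKeys applies the same rule as the handler above: a key whose
    // remainder contains the delimiter is folded into a single CommonPrefix
    // that keeps the delimiter; everything else stays in Contents.
    func groupKeys(keys []string, delimiter string) (contents, commonPrefixes []string) {
    	seen := map[string]bool{}
    	for _, key := range keys {
    		if delimiter != "" {
    			if parts := strings.SplitN(key, delimiter, 2); len(parts) == 2 {
    				prefix := parts[0] + delimiter // S3 clients expect the delimiter included
    				if !seen[prefix] {
    					seen[prefix] = true
    					commonPrefixes = append(commonPrefixes, prefix)
    				}
    				continue // grouped keys do not also appear in Contents
    			}
    		}
    		contents = append(contents, key)
    	}
    	return
    }

    func main() {
    	c, p := groupKeys([]string{"a.txt", "logs/2023/x.log", "logs/2024/y.log", "tmp/z"}, "/")
    	fmt.Println(c) // [a.txt]
    	fmt.Println(p) // [logs/ tmp/]
    }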
diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go
index 8a58e287c..b4caaf4e2 100644
--- a/weed/server/filer_grpc_server_admin.go
+++ b/weed/server/filer_grpc_server_admin.go
@@ -84,8 +84,6 @@ func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (res
func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
- clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
-
t := &filer_pb.GetFilerConfigurationResponse{
Masters: fs.option.Masters.GetInstancesAsStrings(),
Collection: fs.option.Collection,
@@ -97,7 +95,6 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
MetricsAddress: fs.metricsAddress,
MetricsIntervalSec: int32(fs.metricsIntervalSec),
Version: util.Version(),
- ClusterId: string(clusterId),
FilerGroup: fs.option.FilerGroup,
}
diff --git a/weed/shell/command_fs_log.go b/weed/shell/command_fs_log.go
new file mode 100644
index 000000000..5567f76e6
--- /dev/null
+++ b/weed/shell/command_fs_log.go
@@ -0,0 +1,53 @@
+package shell
+
+import (
+ "flag"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "io"
+ "time"
+)
+
+func init() {
+ Commands = append(Commands, &commandFsLogPurge{})
+}
+
+type commandFsLogPurge struct {
+}
+
+func (c *commandFsLogPurge) Name() string {
+ return "fs.log.purge"
+}
+
+func (c *commandFsLogPurge) Help() string {
+ return `purge filer logs
+
+ fs.log.purge [-v] [-modifyDayAgo 365]
+`
+}
+
+func (c *commandFsLogPurge) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+ fsLogPurgeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ daysAgo := fsLogPurgeCommand.Uint("daysAgo", 365, "purge logs older than N days")
+ verbose := fsLogPurgeCommand.Bool("v", false, "verbose mode")
+
+ if err = fsLogPurgeCommand.Parse(args); err != nil {
+ return err
+ }
+
+ modificationTimeAgo := time.Now().Add(-time.Hour * 24 * time.Duration(*daysAgo)).Unix()
+ err = filer_pb.ReadDirAllEntries(commandEnv, filer.SystemLogDir, "", func(entry *filer_pb.Entry, isLast bool) error {
+ if entry.Attributes.Mtime > modificationTimeAgo {
+ return nil
+ }
+ if errDel := filer_pb.Remove(commandEnv, filer.SystemLogDir, entry.Name, true, true, true, false, nil); errDel != nil {
+ return errDel
+ }
+ if *verbose {
+ fmt.Fprintf(writer, "delete %s\n", entry.Name)
+ }
+ return nil
+ })
+ return err
+}
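
Usage sketch (not part of the commit): the flag registered above is -daysAgo, so from weed shell an invocation like

    fs.log.purge -daysAgo 30 -v

would remove entries under filer.SystemLogDir whose Mtime is more than 30 days old, printing each deleted name because of -v.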
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
index 28672c17c..20add302a 100644
--- a/weed/shell/shell_liner.go
+++ b/weed/shell/shell_liner.go
@@ -5,7 +5,6 @@ import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
@@ -74,24 +73,6 @@ func RunShell(options ShellOptions) {
fmt.Println()
}
- if commandEnv.option.FilerAddress != "" {
- commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
- resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
- if err != nil {
- return err
- }
- if resp.ClusterId != "" {
- fmt.Printf(`
----
-Free Monitoring Data URL:
-https://cloud.seaweedfs.com/ui/%s
----
-`, resp.ClusterId)
- }
- return nil
- })
- }
-
for {
cmd, err := line.Prompt("> ")
if err != nil {
diff --git a/weed/storage/store.go b/weed/storage/store.go
index d290909f1..782eb5b79 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -251,6 +251,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
maxVolumeCounts := make(map[string]uint32)
var maxFileKey NeedleId
collectionVolumeSize := make(map[string]int64)
+ collectionVolumeDeletedBytes := make(map[string]int64)
collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
for _, location := range s.Locations {
var deleteVids []needle.VolumeId
@@ -283,9 +284,11 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
if _, exist := collectionVolumeSize[v.Collection]; !exist {
collectionVolumeSize[v.Collection] = 0
+ collectionVolumeDeletedBytes[v.Collection] = 0
}
if !shouldDeleteVolume {
collectionVolumeSize[v.Collection] += int64(volumeMessage.Size)
+ collectionVolumeDeletedBytes[v.Collection] += int64(volumeMessage.DeletedByteCount)
} else {
collectionVolumeSize[v.Collection] -= int64(volumeMessage.Size)
if collectionVolumeSize[v.Collection] <= 0 {
@@ -342,6 +345,10 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size))
}
+ for col, deletedBytes := range collectionVolumeDeletedBytes {
+ stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "deleted_bytes").Set(float64(deletedBytes))
+ }
+
for col, types := range collectionVolumeReadOnlyCount {
for t, count := range types {
stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, t).Set(float64(count))