| author | chrislu <chris.lu@gmail.com> | 2024-03-07 10:42:29 -0800 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-03-07 10:42:29 -0800 |
| commit | 8f79bb398780a5b0e746c2be4160e74dcc65b287 (patch) | |
| tree | 34f45830dda7740d125f5f0c33763a1cc019e0af /weed | |
| parent | fe03b1b5228d421f2b9e6903a728aa76866166f1 (diff) | |
| parent | b544a69550d6793dc61eafccc9b850d43bee5d32 (diff) | |
| download | seaweedfs-8f79bb398780a5b0e746c2be4160e74dcc65b287.tar.xz seaweedfs-8f79bb398780a5b0e746c2be4160e74dcc65b287.zip | |
Merge branch 'master' into mq-subscribe
Diffstat (limited to 'weed')
| -rw-r--r-- | weed/command/filer_backup.go | 3 |
| -rw-r--r-- | weed/replication/sink/s3sink/s3_sink.go | 121 |
| -rw-r--r-- | weed/s3api/s3_constants/header.go | 1 |
| -rw-r--r-- | weed/s3api/s3api_objects_list_handlers.go | 67 |
| -rw-r--r-- | weed/server/filer_grpc_server_admin.go | 3 |
| -rw-r--r-- | weed/shell/command_fs_log.go | 53 |
| -rw-r--r-- | weed/shell/shell_liner.go | 19 |
| -rw-r--r-- | weed/storage/store.go | 7 |
8 files changed, 187 insertions, 87 deletions
diff --git a/weed/command/filer_backup.go b/weed/command/filer_backup.go
index 691b1c0b5..4aeab60f2 100644
--- a/weed/command/filer_backup.go
+++ b/weed/command/filer_backup.go
@@ -85,8 +85,7 @@ const (
 func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, clientId int32, clientEpoch int32) error {
 
 	// find data sink
-	config := util.GetViper()
-	dataSink := findSink(config)
+	dataSink := findSink(util.GetViper())
 	if dataSink == nil {
 		return fmt.Errorf("no data sink configured in replication.toml")
 	}
diff --git a/weed/replication/sink/s3sink/s3_sink.go b/weed/replication/sink/s3sink/s3_sink.go
index a032b58e8..276ea30d6 100644
--- a/weed/replication/sink/s3sink/s3_sink.go
+++ b/weed/replication/sink/s3sink/s3_sink.go
@@ -8,6 +8,8 @@ import (
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3iface"
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+	"strconv"
 	"strings"
 
 	"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -19,14 +21,20 @@ import (
 )
 
 type S3Sink struct {
-	conn          s3iface.S3API
-	region        string
-	bucket        string
-	dir           string
-	endpoint      string
-	acl           string
-	filerSource   *source.FilerSource
-	isIncremental bool
+	conn                          s3iface.S3API
+	filerSource                   *source.FilerSource
+	isIncremental                 bool
+	keepPartSize                  bool
+	s3DisableContentMD5Validation bool
+	s3ForcePathStyle              bool
+	uploaderConcurrency           int
+	uploaderMaxUploadParts        int
+	uploaderPartSizeMb            int
+	region                        string
+	bucket                        string
+	dir                           string
+	endpoint                      string
+	acl                           string
 }
 
 func init() {
@@ -46,21 +54,49 @@ func (s3sink *S3Sink) IsIncremental() bool {
 }
 
 func (s3sink *S3Sink) Initialize(configuration util.Configuration, prefix string) error {
-	glog.V(0).Infof("sink.s3.region: %v", configuration.GetString(prefix+"region"))
-	glog.V(0).Infof("sink.s3.bucket: %v", configuration.GetString(prefix+"bucket"))
-	glog.V(0).Infof("sink.s3.directory: %v", configuration.GetString(prefix+"directory"))
-	glog.V(0).Infof("sink.s3.endpoint: %v", configuration.GetString(prefix+"endpoint"))
-	glog.V(0).Infof("sink.s3.acl: %v", configuration.GetString(prefix+"acl"))
-	glog.V(0).Infof("sink.s3.is_incremental: %v", configuration.GetString(prefix+"is_incremental"))
+	configuration.SetDefault(prefix+"region", "us-east-2")
+	configuration.SetDefault(prefix+"directory", "/")
+	configuration.SetDefault(prefix+"keep_part_size", true)
+	configuration.SetDefault(prefix+"uploader_max_upload_parts", 1000)
+	configuration.SetDefault(prefix+"uploader_part_size_mb", 8)
+	configuration.SetDefault(prefix+"uploader_concurrency", 8)
+	configuration.SetDefault(prefix+"s3_disable_content_md5_validation", true)
+	configuration.SetDefault(prefix+"s3_force_path_style", true)
+	s3sink.region = configuration.GetString(prefix + "region")
+	s3sink.bucket = configuration.GetString(prefix + "bucket")
+	s3sink.dir = configuration.GetString(prefix + "directory")
+	s3sink.endpoint = configuration.GetString(prefix + "endpoint")
+	s3sink.acl = configuration.GetString(prefix + "acl")
 	s3sink.isIncremental = configuration.GetBool(prefix + "is_incremental")
+	s3sink.keepPartSize = configuration.GetBool(prefix + "keep_part_size")
+	s3sink.s3DisableContentMD5Validation = configuration.GetBool(prefix + "s3_disable_content_md5_validation")
+	s3sink.s3ForcePathStyle = configuration.GetBool(prefix + "s3_force_path_style")
+	s3sink.uploaderMaxUploadParts = configuration.GetInt(prefix + "uploader_max_upload_parts")
+	s3sink.uploaderPartSizeMb = configuration.GetInt(prefix + "uploader_part_size")
+	s3sink.uploaderConcurrency = configuration.GetInt(prefix + "uploader_concurrency")
+
+	glog.V(0).Infof("sink.s3.region: %v", s3sink.region)
+	glog.V(0).Infof("sink.s3.bucket: %v", s3sink.bucket)
+	glog.V(0).Infof("sink.s3.directory: %v", s3sink.dir)
+	glog.V(0).Infof("sink.s3.endpoint: %v", s3sink.endpoint)
+	glog.V(0).Infof("sink.s3.acl: %v", s3sink.acl)
+	glog.V(0).Infof("sink.s3.is_incremental: %v", s3sink.isIncremental)
+	glog.V(0).Infof("sink.s3.s3_disable_content_md5_validation: %v", s3sink.s3DisableContentMD5Validation)
+	glog.V(0).Infof("sink.s3.s3_force_path_style: %v", s3sink.s3ForcePathStyle)
+	glog.V(0).Infof("sink.s3.keep_part_size: %v", s3sink.keepPartSize)
+	if s3sink.uploaderMaxUploadParts > s3manager.MaxUploadParts {
+		s3sink.uploaderMaxUploadParts = s3manager.MaxUploadParts
+		glog.Warningf("uploader_max_upload_parts is greater than the maximum number of parts allowed when uploading multiple parts to Amazon S3")
+		glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v => %v", s3sink.uploaderMaxUploadParts, s3manager.MaxUploadParts)
+	} else {
+		glog.V(0).Infof("sink.s3.uploader_max_upload_parts: %v", s3sink.uploaderMaxUploadParts)
+	}
+	glog.V(0).Infof("sink.s3.uploader_part_size_mb: %v", s3sink.uploaderPartSizeMb)
+	glog.V(0).Infof("sink.s3.uploader_concurrency: %v", s3sink.uploaderConcurrency)
+
 	return s3sink.initialize(
 		configuration.GetString(prefix+"aws_access_key_id"),
 		configuration.GetString(prefix+"aws_secret_access_key"),
-		configuration.GetString(prefix+"region"),
-		configuration.GetString(prefix+"bucket"),
-		configuration.GetString(prefix+"directory"),
-		configuration.GetString(prefix+"endpoint"),
-		configuration.GetString(prefix+"acl"),
 	)
 }
 
@@ -68,18 +104,12 @@ func (s3sink *S3Sink) SetSourceFiler(s *source.FilerSource) {
 	s3sink.filerSource = s
 }
 
-func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir, endpoint, acl string) error {
-	s3sink.region = region
-	s3sink.bucket = bucket
-	s3sink.dir = dir
-	s3sink.endpoint = endpoint
-	s3sink.acl = acl
-
+func (s3sink *S3Sink) initialize(awsAccessKeyId, awsSecretAccessKey string) error {
 	config := &aws.Config{
 		Region:                        aws.String(s3sink.region),
 		Endpoint:                      aws.String(s3sink.endpoint),
-		S3ForcePathStyle:              aws.Bool(true),
-		S3DisableContentMD5Validation: aws.Bool(true),
+		S3DisableContentMD5Validation: aws.Bool(s3sink.s3DisableContentMD5Validation),
+		S3ForcePathStyle:              aws.Bool(s3sink.s3ForcePathStyle),
 	}
 	if awsAccessKeyId != "" && awsSecretAccessKey != "" {
 		config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
@@ -128,19 +158,26 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
 
 	reader := filer.NewFileReader(s3sink.filerSource, entry)
 
-	fileSize := int64(filer.FileSize(entry))
-
-	partSize := int64(8 * 1024 * 1024) // The minimum/default allowed part size is 5MB
-	for partSize*1000 < fileSize {
-		partSize *= 4
-	}
-
 	// Create an uploader with the session and custom options
 	uploader := s3manager.NewUploaderWithClient(s3sink.conn, func(u *s3manager.Uploader) {
-		u.PartSize = partSize
-		u.Concurrency = 8
+		u.PartSize = int64(s3sink.uploaderPartSizeMb * 1024 * 1024)
+		u.Concurrency = s3sink.uploaderConcurrency
+		u.MaxUploadParts = s3sink.uploaderMaxUploadParts
 	})
 
+	if s3sink.keepPartSize {
+		switch chunkCount := len(entry.Chunks); {
+		case chunkCount > 1:
+			if firstChunkSize := int64(entry.Chunks[0].Size); firstChunkSize > s3manager.MinUploadPartSize {
+				uploader.PartSize = firstChunkSize
+			}
+		default:
+			uploader.PartSize = 0
+		}
+	}
+
+	if _, ok := entry.Extended[s3_constants.AmzUserMetaMtime]; !ok {
+		entry.Extended[s3_constants.AmzUserMetaMtime] = []byte(strconv.FormatInt(entry.Attributes.Mtime, 10))
+	}
 	// process tagging
 	tags := ""
 	if true {
@@ -153,14 +190,18 @@ func (s3sink *S3Sink) CreateEntry(key string, entry *filer_pb.Entry, signatures
 	}
 
 	// Upload the file to S3.
-	_, err = uploader.Upload(&s3manager.UploadInput{
+	uploadInput := s3manager.UploadInput{
 		Bucket:  aws.String(s3sink.bucket),
 		Key:     aws.String(key),
 		Body:    reader,
 		Tagging: aws.String(tags),
-	})
+	}
+	if len(entry.Attributes.Md5) > 0 {
+		uploadInput.ContentMD5 = aws.String(fmt.Sprintf("%x", entry.Attributes.Md5))
+	}
+	_, err = uploader.Upload(&uploadInput)
 
-	return
+	return err
 
 }
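The s3 sink hunks above amount to exposing aws-sdk-go's `s3manager.Uploader` knobs (`PartSize`, `Concurrency`, `MaxUploadParts`) through the `sink.s3.*` configuration instead of hard-coding them. Below is a minimal standalone sketch of the same tuning against any S3-compatible endpoint; the helper name, endpoint and bucket are illustrative and not part of this commit:

```go
package main

import (
	"bytes"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// newTunedUploader mirrors the sink's tuning: clamp the part count to the
// SDK limit and derive the part size from a configured MB value.
// The function name and values are illustrative, not SeaweedFS code.
func newTunedUploader(svc *s3.S3, partSizeMb, concurrency, maxParts int) *s3manager.Uploader {
	if maxParts > s3manager.MaxUploadParts {
		maxParts = s3manager.MaxUploadParts // the SDK caps a multipart upload at 10000 parts
	}
	return s3manager.NewUploaderWithClient(svc, func(u *s3manager.Uploader) {
		u.PartSize = int64(partSizeMb) * 1024 * 1024 // keep >= s3manager.MinUploadPartSize (5 MB)
		u.Concurrency = concurrency
		u.MaxUploadParts = maxParts
	})
}

func main() {
	// Endpoint and bucket are placeholders for any S3-compatible target.
	sess := session.Must(session.NewSession(&aws.Config{
		Region:           aws.String("us-east-2"),
		Endpoint:         aws.String("http://localhost:8333"),
		S3ForcePathStyle: aws.Bool(true),
	}))
	uploader := newTunedUploader(s3.New(sess), 8, 8, 1000)

	if _, err := uploader.Upload(&s3manager.UploadInput{
		Bucket: aws.String("backup-bucket"),
		Key:    aws.String("hello.txt"),
		Body:   bytes.NewReader([]byte("hello")),
	}); err != nil {
		log.Fatal(err)
	}
}
```

In `replication.toml` the knobs appear under the `sink.s3.` prefix seen in the log lines (`uploader_part_size_mb`, `uploader_concurrency`, `uploader_max_upload_parts`, `keep_part_size`, `s3_force_path_style`, `s3_disable_content_md5_validation`), with the defaults set in the `Initialize` hunk above.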
diff --git a/weed/s3api/s3_constants/header.go b/weed/s3api/s3_constants/header.go
index 5037f4691..30a878ccb 100644
--- a/weed/s3api/s3_constants/header.go
+++ b/weed/s3api/s3_constants/header.go
@@ -30,6 +30,7 @@ const (
 	// S3 user-defined metadata
 	AmzUserMetaPrefix    = "X-Amz-Meta-"
 	AmzUserMetaDirective = "X-Amz-Metadata-Directive"
+	AmzUserMetaMtime     = "X-Amz-Meta-Mtime"
 
 	// S3 object tagging
 	AmzObjectTagging = "X-Amz-Tagging"
diff --git a/weed/s3api/s3api_objects_list_handlers.go b/weed/s3api/s3api_objects_list_handlers.go
index 78b77a044..b2ad915b9 100644
--- a/weed/s3api/s3api_objects_list_handlers.go
+++ b/weed/s3api/s3api_objects_list_handlers.go
@@ -47,10 +47,6 @@ func (s3a *S3ApiServer) ListObjectsV2Handler(w http.ResponseWriter, r *http.Requ
 		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxKeys)
 		return
 	}
-	if delimiter != "" && delimiter != "/" {
-		s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
-		return
-	}
 
 	marker := continuationToken
 	if continuationToken == "" {
@@ -103,10 +99,6 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ
 		s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxKeys)
 		return
 	}
-	if delimiter != "" && delimiter != "/" {
-		s3err.WriteErrorResponse(w, r, s3err.ErrNotImplemented)
-		return
-	}
 
 	response, err := s3a.listFilerEntries(bucket, originalPrefix, maxKeys, marker, delimiter)
 
@@ -171,22 +163,51 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m
 					cursor.maxKeys--
 				}
 			} else {
-				storageClass := "STANDARD"
-				if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
-					storageClass = string(v)
+				var delimiterFound bool
+				if delimiter != "" {
+					// keys that contain the same string between the prefix and the first occurrence of the delimiter are grouped together as a commonPrefix.
+					// extract the string between the prefix and the delimiter and add it to the commonPrefixes if it's unique.
+					fullPath := fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):]
+					delimitedPath := strings.SplitN(fullPath, delimiter, 2)
+					if len(delimitedPath) == 2 {
+
+						// S3 clients expect the delimited prefix to contain the delimiter.
+						delimitedPrefix := delimitedPath[0] + delimiter
+
+						for i := range commonPrefixes {
+							if commonPrefixes[i].Prefix == delimitedPrefix {
+								delimiterFound = true
+								break
+							}
+						}
+
+						if !delimiterFound {
+							commonPrefixes = append(commonPrefixes, PrefixEntry{
+								Prefix: delimitedPrefix,
+							})
+							cursor.maxKeys--
+							delimiterFound = true
+						}
+					}
+				}
+				if !delimiterFound {
+					storageClass := "STANDARD"
+					if v, ok := entry.Extended[s3_constants.AmzStorageClass]; ok {
+						storageClass = string(v)
+					}
+					contents = append(contents, ListEntry{
+						Key:          fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):],
+						LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
+						ETag:         "\"" + filer.ETag(entry) + "\"",
+						Size:         int64(filer.FileSize(entry)),
+						Owner: CanonicalUser{
+							ID:          fmt.Sprintf("%x", entry.Attributes.Uid),
+							DisplayName: entry.Attributes.UserName,
+						},
+						StorageClass: StorageClass(storageClass),
+					})
+					cursor.maxKeys--
 				}
-				contents = append(contents, ListEntry{
-					Key:          fmt.Sprintf("%s/%s", dir, entry.Name)[len(bucketPrefix):],
-					LastModified: time.Unix(entry.Attributes.Mtime, 0).UTC(),
-					ETag:         "\"" + filer.ETag(entry) + "\"",
-					Size:         int64(filer.FileSize(entry)),
-					Owner: CanonicalUser{
-						ID:          fmt.Sprintf("%x", entry.Attributes.Uid),
-						DisplayName: entry.Attributes.UserName,
-					},
-					StorageClass: StorageClass(storageClass),
-				})
-				cursor.maxKeys--
 			}
 		})
 		if doErr != nil {
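The listing change above removes the old "only `/` is implemented" guard and instead groups keys into common prefixes for whatever delimiter the client sends, charging `maxKeys` once per prefix. A simplified, self-contained model of that grouping rule follows — it is not the SeaweedFS implementation, which walks filer entries with a paging cursor; it only shows the string handling:

```go
package main

import (
	"fmt"
	"strings"
)

// groupByDelimiter rolls keys that share the substring between the request
// prefix and the first delimiter after it into one common prefix (kept with
// its trailing delimiter); everything else is returned as a plain key.
func groupByDelimiter(keys []string, prefix, delimiter string) (commonPrefixes, contents []string) {
	seen := map[string]bool{}
	for _, key := range keys {
		if !strings.HasPrefix(key, prefix) {
			continue // a real listing only considers keys under the prefix
		}
		rest := strings.TrimPrefix(key, prefix)
		if parts := strings.SplitN(rest, delimiter, 2); delimiter != "" && len(parts) == 2 {
			p := prefix + parts[0] + delimiter // clients expect the delimiter kept on the prefix
			if !seen[p] {
				seen[p] = true
				commonPrefixes = append(commonPrefixes, p)
			}
			continue
		}
		contents = append(contents, key)
	}
	return
}

func main() {
	keys := []string{"photos/2023/a.jpg", "photos/2023/b.jpg", "photos/2024/c.jpg", "photos/d.jpg"}
	prefixes, contents := groupByDelimiter(keys, "photos/", "/")
	fmt.Println(prefixes) // [photos/2023/ photos/2024/]
	fmt.Println(contents) // [photos/d.jpg]
}
```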
diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go
index 8a58e287c..b4caaf4e2 100644
--- a/weed/server/filer_grpc_server_admin.go
+++ b/weed/server/filer_grpc_server_admin.go
@@ -84,8 +84,6 @@ func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (res
 
 func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
 
-	clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
-
 	t := &filer_pb.GetFilerConfigurationResponse{
 		Masters:            fs.option.Masters.GetInstancesAsStrings(),
 		Collection:         fs.option.Collection,
@@ -97,7 +95,6 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
 		MetricsAddress:     fs.metricsAddress,
 		MetricsIntervalSec: int32(fs.metricsIntervalSec),
 		Version:            util.Version(),
-		ClusterId:          string(clusterId),
 		FilerGroup:         fs.option.FilerGroup,
 	}
 
diff --git a/weed/shell/command_fs_log.go b/weed/shell/command_fs_log.go
new file mode 100644
index 000000000..5567f76e6
--- /dev/null
+++ b/weed/shell/command_fs_log.go
@@ -0,0 +1,53 @@
+package shell
+
+import (
+	"flag"
+	"fmt"
+	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"io"
+	"time"
+)
+
+func init() {
+	Commands = append(Commands, &commandFsLogPurge{})
+}
+
+type commandFsLogPurge struct {
+}
+
+func (c *commandFsLogPurge) Name() string {
+	return "fs.log.purge"
+}
+
+func (c *commandFsLogPurge) Help() string {
+	return `purge filer logs
+
+	fs.log.purge [-v] [-modifyDayAgo 365]
+`
+}
+
+func (c *commandFsLogPurge) Do(args []string, commandEnv *CommandEnv, writer io.Writer) (err error) {
+	fsLogPurgeCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+	daysAgo := fsLogPurgeCommand.Uint("daysAgo", 365, "purge logs older than N days")
+	verbose := fsLogPurgeCommand.Bool("v", false, "verbose mode")
+
+	if err = fsLogPurgeCommand.Parse(args); err != nil {
+		return err
+	}
+
+	modificationTimeAgo := time.Now().Add(-time.Hour * 24 * time.Duration(*daysAgo)).Unix()
+	err = filer_pb.ReadDirAllEntries(commandEnv, filer.SystemLogDir, "", func(entry *filer_pb.Entry, isLast bool) error {
+		if entry.Attributes.Mtime > modificationTimeAgo {
+			return nil
+		}
+		if errDel := filer_pb.Remove(commandEnv, filer.SystemLogDir, entry.Name, true, true, true, false, nil); errDel != nil {
+			return errDel
+		}
+		if *verbose {
+			fmt.Fprintf(writer, "delete %s\n", entry.Name)
+		}
+		return nil
+	})
+	return err
+}
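As registered above, `fs.log.purge` runs inside `weed shell` and walks the filer's system log directory (`filer.SystemLogDir`), deleting entries whose modification time is older than the cutoff; for example `fs.log.purge -daysAgo 30 -v` removes log entries older than 30 days and prints each deleted name (the defaults are 365 days and quiet output).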
diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go
index 28672c17c..20add302a 100644
--- a/weed/shell/shell_liner.go
+++ b/weed/shell/shell_liner.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"github.com/seaweedfs/seaweedfs/weed/cluster"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
-	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
 	"github.com/seaweedfs/seaweedfs/weed/util"
 	"github.com/seaweedfs/seaweedfs/weed/util/grace"
@@ -74,24 +73,6 @@ func RunShell(options ShellOptions) {
 		fmt.Println()
 	}
 
-	if commandEnv.option.FilerAddress != "" {
-		commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
-			resp, err := filerClient.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
-			if err != nil {
-				return err
-			}
-			if resp.ClusterId != "" {
-				fmt.Printf(`
----
-Free Monitoring Data URL:
-https://cloud.seaweedfs.com/ui/%s
----
-`, resp.ClusterId)
-			}
-			return nil
-		})
-	}
-
 	for {
 		cmd, err := line.Prompt("> ")
 		if err != nil {
diff --git a/weed/storage/store.go b/weed/storage/store.go
index d290909f1..782eb5b79 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -251,6 +251,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
 	maxVolumeCounts := make(map[string]uint32)
 	var maxFileKey NeedleId
 	collectionVolumeSize := make(map[string]int64)
+	collectionVolumeDeletedBytes := make(map[string]int64)
 	collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
 	for _, location := range s.Locations {
 		var deleteVids []needle.VolumeId
@@ -283,9 +284,11 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
 
 		if _, exist := collectionVolumeSize[v.Collection]; !exist {
 			collectionVolumeSize[v.Collection] = 0
+			collectionVolumeDeletedBytes[v.Collection] = 0
 		}
 		if !shouldDeleteVolume {
 			collectionVolumeSize[v.Collection] += int64(volumeMessage.Size)
+			collectionVolumeDeletedBytes[v.Collection] += int64(volumeMessage.DeletedByteCount)
 		} else {
 			collectionVolumeSize[v.Collection] -= int64(volumeMessage.Size)
 			if collectionVolumeSize[v.Collection] <= 0 {
@@ -342,6 +345,10 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
 		stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size))
 	}
 
+	for col, deletedBytes := range collectionVolumeDeletedBytes{
+		stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "deleted_bytes").Set(float64(deletedBytes))
+	}
+
 	for col, types := range collectionVolumeReadOnlyCount {
 		for t, count := range types {
 			stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, t).Set(float64(count))
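The `store.go` change above reports reclaimable space as one more label value on the existing per-collection disk-size gauge rather than as a new metric. A cut-down sketch of that labeling with the Prometheus Go client — the metric name here is assumed for illustration; the real `VolumeServerDiskSizeGauge` is defined in `weed/stats`:

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
)

// One series per (collection, type): "deleted_bytes" becomes a sibling of the
// existing "normal" series rather than a separate metric.
var volumeServerDiskSizeGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "volumeServer_total_disk_size", // assumed name for illustration
		Help: "Actual disk size used by volumes, per collection and type.",
	},
	[]string{"collection", "type"},
)

func main() {
	prometheus.MustRegister(volumeServerDiskSizeGauge)
	volumeServerDiskSizeGauge.WithLabelValues("pictures", "normal").Set(1 << 30)         // live bytes
	volumeServerDiskSizeGauge.WithLabelValues("pictures", "deleted_bytes").Set(64 << 20) // reclaimable bytes
}
```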
