author     LHHDZ <changlin.shi@ly.com>    2022-09-30 03:29:01 +0800
committer  GitHub <noreply@github.com>    2022-09-29 12:29:01 -0700
commit     3de1e1978091e9675c9d54655dcde0c7dea9d222 (patch)
tree       86d5b68876c9e964e9ff3ae6b05cef66aa62abf5 /weed/s3api/bucket_metadata.go
parent     5e9039d728221b69dc30010e73c3a0a4e7c7e7e8 (diff)
download   seaweedfs-3de1e1978091e9675c9d54655dcde0c7dea9d222.tar.xz
           seaweedfs-3de1e1978091e9675c9d54655dcde0c7dea9d222.zip
s3: sync bucket info from filer (#3759)
Diffstat (limited to 'weed/s3api/bucket_metadata.go')
-rw-r--r--  weed/s3api/bucket_metadata.go  213
1 file changed, 213 insertions, 0 deletions
diff --git a/weed/s3api/bucket_metadata.go b/weed/s3api/bucket_metadata.go
new file mode 100644
index 000000000..1b9b09981
--- /dev/null
+++ b/weed/s3api/bucket_metadata.go
@@ -0,0 +1,213 @@
+package s3api
+
+import (
+ "bytes"
+ "encoding/json"
+ "math"
+ "sync"
+
+ "github.com/aws/aws-sdk-go/private/protocol/json/jsonutil"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
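+// loadBucketMetadataFromFiler reads the bucket's entry under option.BucketsPath from the filer and converts it into BucketMetaData.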
+var loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*BucketMetaData, error) {
+ entry, err := filer_pb.GetEntry(r.s3a, util.NewFullPath(r.s3a.option.BucketsPath, bucketName))
+ if err != nil {
+ return nil, err
+ }
+
+ return buildBucketMetadata(entry), nil
+}
+
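+// BucketMetaData holds the per-bucket settings derived from the bucket's filer entry: object ownership, owner, and ACL grants.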
+type BucketMetaData struct {
+ _ struct{} `type:"structure"`
+
+ Name string
+
+ // By default, when another AWS account uploads an object to an S3 bucket,
+ // that account (the object writer) owns the object, has access to it, and
+ // can grant other users access to it through ACLs. You can use Object Ownership
+ // to change this default behavior so that ACLs are disabled and you, as the
+ // bucket owner, automatically own every object in your bucket.
+ ObjectOwnership string
+
+ // Container for the bucket owner's display name and ID.
+ Owner *s3.Owner `type:"structure"`
+
+ // A list of grants for access controls.
+ Acl []*s3.Grant `locationName:"AccessControlList" locationNameList:"Grant" type:"list"`
+}
+
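+// BucketRegistry caches bucket metadata loaded from the filer and remembers bucket names that were looked up but do not exist.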
+type BucketRegistry struct {
+ metadataCache map[string]*BucketMetaData
+ metadataCacheLock sync.RWMutex
+
+ notFound map[string]struct{}
+ notFoundLock sync.RWMutex
+ s3a *S3ApiServer
+}
+
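+// NewBucketRegistry creates a registry and eagerly loads metadata for all buckets already present in the filer.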
+func NewBucketRegistry(s3a *S3ApiServer) *BucketRegistry {
+ br := &BucketRegistry{
+ metadataCache: make(map[string]*BucketMetaData),
+ notFound: make(map[string]struct{}),
+ s3a: s3a,
+ }
+ err := br.init()
+ if err != nil {
+ glog.Fatal("init bucket registry failed", err)
+ return nil
+ }
+ return br
+}
+
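+// init lists every entry under BucketsPath on the filer and caches its metadata.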
+func (r *BucketRegistry) init() error {
+ err := filer_pb.List(r.s3a, r.s3a.option.BucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
+ r.LoadBucketMetadata(entry)
+ return nil
+ }, "", false, math.MaxUint32)
+ return err
+}
+
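+// LoadBucketMetadata builds metadata from the given filer entry and stores it in the cache under the entry's name.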
+func (r *BucketRegistry) LoadBucketMetadata(entry *filer_pb.Entry) {
+ bucketMetadata := buildBucketMetadata(entry)
+ r.metadataCacheLock.Lock()
+ defer r.metadataCacheLock.Unlock()
+ r.metadataCache[entry.Name] = bucketMetadata
+}
+
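+// buildBucketMetadata converts a filer entry into BucketMetaData, starting from the defaults and then applying the ownership and ACL found in the entry's extended attributes.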
+func buildBucketMetadata(entry *filer_pb.Entry) *BucketMetaData {
+ entryJson, _ := json.Marshal(entry)
+ glog.V(3).Infof("build bucket metadata, entry=%s", entryJson)
+ bucketMetadata := &BucketMetaData{
+ Name: entry.Name,
+
+ // Default ownership: OwnershipBucketOwnerEnforced, which means ACLs are disabled
+ ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced,
+
+ // Default owner: `AccountAdmin`
+ Owner: &s3.Owner{
+ ID: &AccountAdmin.Id,
+ DisplayName: &AccountAdmin.Name,
+ },
+ }
+ if entry.Extended != nil {
+ //ownership control
+ ownership, ok := entry.Extended[s3_constants.ExtOwnershipKey]
+ if ok {
+ ownership := string(ownership)
+ valid := s3_constants.ValidateOwnership(ownership)
+ if valid {
+ bucketMetadata.ObjectOwnership = ownership
+ } else {
+ glog.Warningf("Invalid ownership: %s, bucket: %s", ownership, bucketMetadata.Name)
+ }
+ }
+
+ //access control policy
+ acpBytes, ok := entry.Extended[s3_constants.ExtAcpKey]
+ if ok {
+ var acp s3.AccessControlPolicy
+ err := jsonutil.UnmarshalJSON(&acp, bytes.NewReader(acpBytes))
+ if err == nil {
+ //validate owner
+ if acp.Owner != nil && acp.Owner.ID != nil {
+ bucketMetadata.Owner = acp.Owner
+ } else {
+ glog.Warningf("bucket ownerId is empty! bucket: %s", bucketMetadata.Name)
+ }
+
+ //acl
+ bucketMetadata.Acl = acp.Grants
+ } else {
+ glog.Warningf("Unmarshal ACP: %s(%v), bucket: %s", string(acpBytes), err, bucketMetadata.Name)
+ }
+ }
+ }
+ return bucketMetadata
+}
+
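+// RemoveBucketMetadata evicts the bucket from both the metadata cache and the not-found cache.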
+func (r *BucketRegistry) RemoveBucketMetadata(entry *filer_pb.Entry) {
+ r.removeMetadataCache(entry.Name)
+ r.unMarkNotFound(entry.Name)
+}
+
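+// GetBucketMetadata returns the cached metadata for a bucket, short-circuiting buckets known to be missing and loading from the filer on a cache miss.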
+func (r *BucketRegistry) GetBucketMetadata(bucketName string) (*BucketMetaData, s3err.ErrorCode) {
+ r.metadataCacheLock.RLock()
+ bucketMetadata, ok := r.metadataCache[bucketName]
+ r.metadataCacheLock.RUnlock()
+ if ok {
+ return bucketMetadata, s3err.ErrNone
+ }
+
+ r.notFoundLock.RLock()
+ _, ok = r.notFound[bucketName]
+ r.notFoundLock.RUnlock()
+ if ok {
+ return nil, s3err.ErrNoSuchBucket
+ }
+
+ bucketMetadata, errCode := r.LoadBucketMetadataFromFiler(bucketName)
+ if errCode != s3err.ErrNone {
+ return nil, errCode
+ }
+
+ r.setMetadataCache(bucketMetadata)
+ r.unMarkNotFound(bucketName)
+ return bucketMetadata, s3err.ErrNone
+}
+
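+// LoadBucketMetadataFromFiler loads bucket metadata from the filer while holding notFoundLock, re-checking the cache first; filer_pb.ErrNotFound marks the bucket in the not-found cache.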
+func (r *BucketRegistry) LoadBucketMetadataFromFiler(bucketName string) (*BucketMetaData, s3err.ErrorCode) {
+ r.notFoundLock.Lock()
+ defer r.notFoundLock.Unlock()
+
+ // re-check the cache: another goroutine may have loaded this bucket while we waited for notFoundLock
+ r.metadataCacheLock.RLock()
+ bucketMetaData, ok := r.metadataCache[bucketName]
+ r.metadataCacheLock.RUnlock()
+ if ok {
+ return bucketMetaData, s3err.ErrNone
+ }
+
+ // not cached; load from the filer
+ bucketMetadata, err := loadBucketMetadataFromFiler(r, bucketName)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ // The bucket doesn't actually exist and should no longer be loaded from the filer
+ r.notFound[bucketName] = struct{}{}
+ return nil, s3err.ErrNoSuchBucket
+ }
+ return nil, s3err.ErrInternalError
+ }
+ return bucketMetadata, s3err.ErrNone
+}
+
+func (r *BucketRegistry) setMetadataCache(metadata *BucketMetaData) {
+ r.metadataCacheLock.Lock()
+ defer r.metadataCacheLock.Unlock()
+ r.metadataCache[metadata.Name] = metadata
+}
+
+func (r *BucketRegistry) removeMetadataCache(bucket string) {
+ r.metadataCacheLock.Lock()
+ defer r.metadataCacheLock.Unlock()
+ delete(r.metadataCache, bucket)
+}
+
+func (r *BucketRegistry) markNotFound(bucket string) {
+ r.notFoundLock.Lock()
+ defer r.notFoundLock.Unlock()
+ r.notFound[bucket] = struct{}{}
+}
+
+func (r *BucketRegistry) unMarkNotFound(bucket string) {
+ r.notFoundLock.Lock()
+ defer r.notFoundLock.Unlock()
+ delete(r.notFound, bucket)
+}