diff options
Diffstat (limited to 'weed/s3api/bucket_metadata.go')
| -rw-r--r-- | weed/s3api/bucket_metadata.go | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/weed/s3api/bucket_metadata.go b/weed/s3api/bucket_metadata.go new file mode 100644 index 000000000..1b9b09981 --- /dev/null +++ b/weed/s3api/bucket_metadata.go @@ -0,0 +1,213 @@ +package s3api + +import ( + "bytes" + "encoding/json" + "github.com/aws/aws-sdk-go/private/protocol/json/jsonutil" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + + //"github.com/seaweedfs/seaweedfs/weed/s3api" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" + "github.com/seaweedfs/seaweedfs/weed/util" + "math" + "sync" +) + +var loadBucketMetadataFromFiler = func(r *BucketRegistry, bucketName string) (*BucketMetaData, error) { + entry, err := filer_pb.GetEntry(r.s3a, util.NewFullPath(r.s3a.option.BucketsPath, bucketName)) + if err != nil { + return nil, err + } + + return buildBucketMetadata(entry), nil +} + +type BucketMetaData struct { + _ struct{} `type:"structure"` + + Name string + + //By default, when another AWS account uploads an object to S3 bucket, + //that account (the object writer) owns the object, has access to it, and + //can grant other users access to it through ACLs. You can use Object Ownership + //to change this default behavior so that ACLs are disabled and you, as the + //bucket owner, automatically own every object in your bucket. + ObjectOwnership string + + // Container for the bucket owner's display name and ID. + Owner *s3.Owner `type:"structure"` + + // A list of grants for access controls. + Acl []*s3.Grant `locationName:"AccessControlList" locationNameList:"Grant" type:"list"` +} + +type BucketRegistry struct { + metadataCache map[string]*BucketMetaData + metadataCacheLock sync.RWMutex + + notFound map[string]struct{} + notFoundLock sync.RWMutex + s3a *S3ApiServer +} + +func NewBucketRegistry(s3a *S3ApiServer) *BucketRegistry { + br := &BucketRegistry{ + metadataCache: make(map[string]*BucketMetaData), + notFound: make(map[string]struct{}), + s3a: s3a, + } + err := br.init() + if err != nil { + glog.Fatal("init bucket registry failed", err) + return nil + } + return br +} + +func (r *BucketRegistry) init() error { + err := filer_pb.List(r.s3a, r.s3a.option.BucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error { + r.LoadBucketMetadata(entry) + return nil + }, "", false, math.MaxUint32) + return err +} + +func (r *BucketRegistry) LoadBucketMetadata(entry *filer_pb.Entry) { + bucketMetadata := buildBucketMetadata(entry) + r.metadataCacheLock.Lock() + defer r.metadataCacheLock.Unlock() + r.metadataCache[entry.Name] = bucketMetadata +} + +func buildBucketMetadata(entry *filer_pb.Entry) *BucketMetaData { + entryJson, _ := json.Marshal(entry) + glog.V(3).Infof("build bucket metadata,entry=%s", entryJson) + bucketMetadata := &BucketMetaData{ + Name: entry.Name, + + //Default ownership: OwnershipBucketOwnerEnforced, which means Acl is disabled + ObjectOwnership: s3_constants.OwnershipBucketOwnerEnforced, + + // Default owner: `AccountAdmin` + Owner: &s3.Owner{ + ID: &AccountAdmin.Id, + DisplayName: &AccountAdmin.Name, + }, + } + if entry.Extended != nil { + //ownership control + ownership, ok := entry.Extended[s3_constants.ExtOwnershipKey] + if ok { + ownership := string(ownership) + valid := s3_constants.ValidateOwnership(ownership) + if valid { + bucketMetadata.ObjectOwnership = ownership + } else { + glog.Warningf("Invalid ownership: %s, bucket: %s", ownership, bucketMetadata.Name) + } + } + + //access control policy + acpBytes, ok := entry.Extended[s3_constants.ExtAcpKey] + if ok { + var acp s3.AccessControlPolicy + err := jsonutil.UnmarshalJSON(&acp, bytes.NewReader(acpBytes)) + if err == nil { + //validate owner + if acp.Owner != nil && acp.Owner.ID != nil { + bucketMetadata.Owner = acp.Owner + } else { + glog.Warningf("bucket ownerId is empty! bucket: %s", bucketMetadata.Name) + } + + //acl + bucketMetadata.Acl = acp.Grants + } else { + glog.Warningf("Unmarshal ACP: %s(%v), bucket: %s", string(acpBytes), err, bucketMetadata.Name) + } + } + } + return bucketMetadata +} + +func (r *BucketRegistry) RemoveBucketMetadata(entry *filer_pb.Entry) { + r.removeMetadataCache(entry.Name) + r.unMarkNotFound(entry.Name) +} + +func (r *BucketRegistry) GetBucketMetadata(bucketName string) (*BucketMetaData, s3err.ErrorCode) { + r.metadataCacheLock.RLock() + bucketMetadata, ok := r.metadataCache[bucketName] + r.metadataCacheLock.RUnlock() + if ok { + return bucketMetadata, s3err.ErrNone + } + + r.notFoundLock.RLock() + _, ok = r.notFound[bucketName] + r.notFoundLock.RUnlock() + if ok { + return nil, s3err.ErrNoSuchBucket + } + + bucketMetadata, errCode := r.LoadBucketMetadataFromFiler(bucketName) + if errCode != s3err.ErrNone { + return nil, errCode + } + + r.setMetadataCache(bucketMetadata) + r.unMarkNotFound(bucketName) + return bucketMetadata, s3err.ErrNone +} + +func (r *BucketRegistry) LoadBucketMetadataFromFiler(bucketName string) (*BucketMetaData, s3err.ErrorCode) { + r.notFoundLock.Lock() + defer r.notFoundLock.Unlock() + + //check if already exists + r.metadataCacheLock.RLock() + bucketMetaData, ok := r.metadataCache[bucketName] + r.metadataCacheLock.RUnlock() + if ok { + return bucketMetaData, s3err.ErrNone + } + + //if not exists, load from filer + bucketMetadata, err := loadBucketMetadataFromFiler(r, bucketName) + if err != nil { + if err == filer_pb.ErrNotFound { + // The bucket doesn't actually exist and should no longer loaded from the filer + r.notFound[bucketName] = struct{}{} + return nil, s3err.ErrNoSuchBucket + } + return nil, s3err.ErrInternalError + } + return bucketMetadata, s3err.ErrNone +} + +func (r *BucketRegistry) setMetadataCache(metadata *BucketMetaData) { + r.metadataCacheLock.Lock() + defer r.metadataCacheLock.Unlock() + r.metadataCache[metadata.Name] = metadata +} + +func (r *BucketRegistry) removeMetadataCache(bucket string) { + r.metadataCacheLock.Lock() + defer r.metadataCacheLock.Unlock() + delete(r.metadataCache, bucket) +} + +func (r *BucketRegistry) markNotFound(bucket string) { + r.notFoundLock.Lock() + defer r.notFoundLock.Unlock() + r.notFound[bucket] = struct{}{} +} + +func (r *BucketRegistry) unMarkNotFound(bucket string) { + r.notFoundLock.Lock() + defer r.notFoundLock.Unlock() + delete(r.notFound, bucket) +} |
