Diffstat (limited to 'weed/s3api/s3api_bucket_config.go')
-rw-r--r--   weed/s3api/s3api_bucket_config.go   495
1 file changed, 421 insertions(+), 74 deletions(-)
diff --git a/weed/s3api/s3api_bucket_config.go b/weed/s3api/s3api_bucket_config.go
index e1e7403d8..61cddc45a 100644
--- a/weed/s3api/s3api_bucket_config.go
+++ b/weed/s3api/s3api_bucket_config.go
@@ -14,6 +14,7 @@ import (
     "google.golang.org/protobuf/proto"
 
     "github.com/seaweedfs/seaweedfs/weed/glog"
+    "github.com/seaweedfs/seaweedfs/weed/kms"
     "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
     "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
     "github.com/seaweedfs/seaweedfs/weed/s3api/cors"
@@ -31,26 +32,213 @@ type BucketConfig struct {
     IsPublicRead bool // Cached flag to avoid JSON parsing on every request
     CORS *cors.CORSConfiguration
     ObjectLockConfig *ObjectLockConfiguration // Cached parsed Object Lock configuration
+    KMSKeyCache *BucketKMSCache // Per-bucket KMS key cache for SSE-KMS operations
     LastModified time.Time
     Entry *filer_pb.Entry
 }
 
+// BucketKMSCache represents per-bucket KMS key caching for SSE-KMS operations
+// This provides better isolation and automatic cleanup compared to global caching
+type BucketKMSCache struct {
+    cache map[string]*BucketKMSCacheEntry // Key: contextHash, Value: cached data key
+    mutex sync.RWMutex
+    bucket string // Bucket name for logging/debugging
+    lastTTL time.Duration // TTL used for cache entries (typically 1 hour)
+}
+
+// BucketKMSCacheEntry represents a single cached KMS data key
+type BucketKMSCacheEntry struct {
+    DataKey interface{} // Could be *kms.GenerateDataKeyResponse or similar
+    ExpiresAt time.Time
+    KeyID string
+    ContextHash string // Hash of encryption context for cache validation
+}
+
+// NewBucketKMSCache creates a new per-bucket KMS key cache
+func NewBucketKMSCache(bucketName string, ttl time.Duration) *BucketKMSCache {
+    return &BucketKMSCache{
+        cache: make(map[string]*BucketKMSCacheEntry),
+        bucket: bucketName,
+        lastTTL: ttl,
+    }
+}
+
+// Get retrieves a cached KMS data key if it exists and hasn't expired
+func (bkc *BucketKMSCache) Get(contextHash string) (*BucketKMSCacheEntry, bool) {
+    if bkc == nil {
+        return nil, false
+    }
+
+    bkc.mutex.RLock()
+    defer bkc.mutex.RUnlock()
+
+    entry, exists := bkc.cache[contextHash]
+    if !exists {
+        return nil, false
+    }
+
+    // Check if entry has expired
+    if time.Now().After(entry.ExpiresAt) {
+        return nil, false
+    }
+
+    return entry, true
+}
+
+// Set stores a KMS data key in the cache
+func (bkc *BucketKMSCache) Set(contextHash, keyID string, dataKey interface{}, ttl time.Duration) {
+    if bkc == nil {
+        return
+    }
+
+    bkc.mutex.Lock()
+    defer bkc.mutex.Unlock()
+
+    bkc.cache[contextHash] = &BucketKMSCacheEntry{
+        DataKey: dataKey,
+        ExpiresAt: time.Now().Add(ttl),
+        KeyID: keyID,
+        ContextHash: contextHash,
+    }
+    bkc.lastTTL = ttl
+}
+
+// CleanupExpired removes expired entries from the cache
+func (bkc *BucketKMSCache) CleanupExpired() int {
+    if bkc == nil {
+        return 0
+    }
+
+    bkc.mutex.Lock()
+    defer bkc.mutex.Unlock()
+
+    now := time.Now()
+    expiredCount := 0
+
+    for key, entry := range bkc.cache {
+        if now.After(entry.ExpiresAt) {
+            // Clear sensitive data before removing from cache
+            bkc.clearSensitiveData(entry)
+            delete(bkc.cache, key)
+            expiredCount++
+        }
+    }
+
+    return expiredCount
+}
+
+// Size returns the current number of cached entries
+func (bkc *BucketKMSCache) Size() int {
+    if bkc == nil {
+        return 0
+    }
+
+    bkc.mutex.RLock()
+    defer bkc.mutex.RUnlock()
+
+    return len(bkc.cache)
+}
+
+// clearSensitiveData securely clears sensitive data from a cache entry
+func (bkc *BucketKMSCache) clearSensitiveData(entry *BucketKMSCacheEntry) {
+    if dataKeyResp, ok := entry.DataKey.(*kms.GenerateDataKeyResponse); ok {
+        // Zero out the plaintext data key to prevent it from lingering in memory
+        if dataKeyResp.Plaintext != nil {
+            for i := range dataKeyResp.Plaintext {
+                dataKeyResp.Plaintext[i] = 0
+            }
+            dataKeyResp.Plaintext = nil
+        }
+    }
+}
+
+// Clear clears all cached KMS entries, securely zeroing sensitive data first
+func (bkc *BucketKMSCache) Clear() {
+    if bkc == nil {
+        return
+    }
+
+    bkc.mutex.Lock()
+    defer bkc.mutex.Unlock()
+
+    // Clear sensitive data from all entries before deletion
+    for _, entry := range bkc.cache {
+        bkc.clearSensitiveData(entry)
+    }
+
+    // Clear the cache map
+    bkc.cache = make(map[string]*BucketKMSCacheEntry)
+}
+
 // BucketConfigCache provides caching for bucket configurations
 // Cache entries are automatically updated/invalidated through metadata subscription events,
 // so TTL serves as a safety fallback rather than the primary consistency mechanism
 type BucketConfigCache struct {
-    cache map[string]*BucketConfig
-    mutex sync.RWMutex
-    ttl time.Duration // Safety fallback TTL; real-time consistency maintained via events
+    cache map[string]*BucketConfig
+    negativeCache map[string]time.Time // Cache for non-existent buckets
+    mutex sync.RWMutex
+    ttl time.Duration // Safety fallback TTL; real-time consistency maintained via events
+    negativeTTL time.Duration // TTL for negative cache entries
+}
+
+// BucketMetadata represents the complete metadata for a bucket
+type BucketMetadata struct {
+    Tags map[string]string `json:"tags,omitempty"`
+    CORS *cors.CORSConfiguration `json:"cors,omitempty"`
+    Encryption *s3_pb.EncryptionConfiguration `json:"encryption,omitempty"`
+    // Future extensions can be added here:
+    // Versioning *s3_pb.VersioningConfiguration `json:"versioning,omitempty"`
+    // Lifecycle *s3_pb.LifecycleConfiguration `json:"lifecycle,omitempty"`
+    // Notification *s3_pb.NotificationConfiguration `json:"notification,omitempty"`
+    // Replication *s3_pb.ReplicationConfiguration `json:"replication,omitempty"`
+    // Analytics *s3_pb.AnalyticsConfiguration `json:"analytics,omitempty"`
+    // Logging *s3_pb.LoggingConfiguration `json:"logging,omitempty"`
+    // Website *s3_pb.WebsiteConfiguration `json:"website,omitempty"`
+    // RequestPayer *s3_pb.RequestPayerConfiguration `json:"requestPayer,omitempty"`
+    // PublicAccess *s3_pb.PublicAccessConfiguration `json:"publicAccess,omitempty"`
+}
+
+// NewBucketMetadata creates a new BucketMetadata with default values
+func NewBucketMetadata() *BucketMetadata {
+    return &BucketMetadata{
+        Tags: make(map[string]string),
+    }
+}
+
+// IsEmpty returns true if the metadata has no configuration set
+func (bm *BucketMetadata) IsEmpty() bool {
+    return len(bm.Tags) == 0 && bm.CORS == nil && bm.Encryption == nil
+}
+
+// HasEncryption returns true if bucket has encryption configuration
+func (bm *BucketMetadata) HasEncryption() bool {
+    return bm.Encryption != nil
+}
+
+// HasCORS returns true if bucket has CORS configuration
+func (bm *BucketMetadata) HasCORS() bool {
+    return bm.CORS != nil
+}
+
+// HasTags returns true if bucket has tags
+func (bm *BucketMetadata) HasTags() bool {
+    return len(bm.Tags) > 0
 }
 
 // NewBucketConfigCache creates a new bucket configuration cache
 // TTL can be set to a longer duration since cache consistency is maintained
 // through real-time metadata subscription events rather than TTL expiration
 func NewBucketConfigCache(ttl time.Duration) *BucketConfigCache {
+    negativeTTL := ttl / 4 // Negative cache TTL is shorter than positive cache
+    if negativeTTL < 30*time.Second {
+        negativeTTL = 30 * time.Second // Minimum 30 seconds for negative cache
+    }
+
     return &BucketConfigCache{
-        cache: make(map[string]*BucketConfig),
-        ttl: ttl,
+        cache: make(map[string]*BucketConfig),
+        negativeCache: make(map[string]time.Time),
+        ttl: ttl,
+        negativeTTL: negativeTTL,
     }
 }
 
@@ -95,11 +283,49 @@ func (bcc *BucketConfigCache) Clear() {
     defer bcc.mutex.Unlock()
 
     bcc.cache = make(map[string]*BucketConfig)
+    bcc.negativeCache = make(map[string]time.Time)
+}
+
+// IsNegativelyCached checks if a bucket is in the negative cache (doesn't exist)
+func (bcc *BucketConfigCache) IsNegativelyCached(bucket string) bool {
+    bcc.mutex.RLock()
+    defer bcc.mutex.RUnlock()
+
+    if cachedTime, exists := bcc.negativeCache[bucket]; exists {
+        // Check if the negative cache entry is still valid
+        if time.Since(cachedTime) < bcc.negativeTTL {
+            return true
+        }
+        // Entry expired, remove it
+        delete(bcc.negativeCache, bucket)
+    }
+    return false
+}
+
+// SetNegativeCache marks a bucket as non-existent in the negative cache
+func (bcc *BucketConfigCache) SetNegativeCache(bucket string) {
+    bcc.mutex.Lock()
+    defer bcc.mutex.Unlock()
+
+    bcc.negativeCache[bucket] = time.Now()
+}
+
+// RemoveNegativeCache removes a bucket from the negative cache
+func (bcc *BucketConfigCache) RemoveNegativeCache(bucket string) {
+    bcc.mutex.Lock()
+    defer bcc.mutex.Unlock()
+
+    delete(bcc.negativeCache, bucket)
 }
 
 // getBucketConfig retrieves bucket configuration with caching
 func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.ErrorCode) {
-    // Try cache first
+    // Check negative cache first
+    if s3a.bucketConfigCache.IsNegativelyCached(bucket) {
+        return nil, s3err.ErrNoSuchBucket
+    }
+
+    // Try positive cache
     if config, found := s3a.bucketConfigCache.Get(bucket); found {
         return config, s3err.ErrNone
     }
@@ -108,7 +334,8 @@ func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.Err
     entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
     if err != nil {
         if errors.Is(err, filer_pb.ErrNotFound) {
-            // Bucket doesn't exist
+            // Bucket doesn't exist - set negative cache
+            s3a.bucketConfigCache.SetNegativeCache(bucket)
             return nil, s3err.ErrNoSuchBucket
         }
         glog.Errorf("getBucketConfig: failed to get bucket entry for %s: %v", bucket, err)
@@ -307,13 +534,13 @@ func (s3a *S3ApiServer) setBucketOwnership(bucket, ownership string) s3err.Error
 
 // loadCORSFromBucketContent loads CORS configuration from bucket directory content
 func (s3a *S3ApiServer) loadCORSFromBucketContent(bucket string) (*cors.CORSConfiguration, error) {
-    _, corsConfig, err := s3a.getBucketMetadata(bucket)
+    metadata, err := s3a.GetBucketMetadata(bucket)
     if err != nil {
         return nil, err
     }
 
     // Note: corsConfig can be nil if no CORS configuration is set, which is valid
-    return corsConfig, nil
+    return metadata.CORS, nil
 }
 
 // getCORSConfiguration retrieves CORS configuration with caching
@@ -328,19 +555,10 @@ func (s3a *S3ApiServer) getCORSConfiguration(bucket string) (*cors.CORSConfigura
 
 // updateCORSConfiguration updates the CORS configuration for a bucket
 func (s3a *S3ApiServer) updateCORSConfiguration(bucket string, corsConfig *cors.CORSConfiguration) s3err.ErrorCode {
-    // Get existing metadata
-    existingTags, _, err := s3a.getBucketMetadata(bucket)
+    // Update using structured API
+    err := s3a.UpdateBucketCORS(bucket, corsConfig)
     if err != nil {
-        glog.Errorf("updateCORSConfiguration: failed to get bucket metadata for bucket %s: %v", bucket, err)
-        return s3err.ErrInternalError
-    }
-
-    // Update CORS configuration
-    updatedCorsConfig := corsConfig
-
-    // Store updated metadata
-    if err := s3a.setBucketMetadata(bucket, existingTags, updatedCorsConfig); err != nil {
-        glog.Errorf("updateCORSConfiguration: failed to persist CORS config to bucket content for bucket %s: %v", bucket, err)
+        glog.Errorf("updateCORSConfiguration: failed to update CORS config for bucket %s: %v", bucket, err)
         return s3err.ErrInternalError
     }
 
@@ -350,19 +568,10 @@ func (s3a *S3ApiServer) updateCORSConfiguration(bucket string, corsConfig *cors.
 
 // removeCORSConfiguration removes the CORS configuration for a bucket
 func (s3a *S3ApiServer) removeCORSConfiguration(bucket string) s3err.ErrorCode {
-    // Get existing metadata
-    existingTags, _, err := s3a.getBucketMetadata(bucket)
+    // Update using structured API
+    err := s3a.ClearBucketCORS(bucket)
    if err != nil {
-        glog.Errorf("removeCORSConfiguration: failed to get bucket metadata for bucket %s: %v", bucket, err)
-        return s3err.ErrInternalError
-    }
-
-    // Remove CORS configuration
-    var nilCorsConfig *cors.CORSConfiguration = nil
-
-    // Store updated metadata
-    if err := s3a.setBucketMetadata(bucket, existingTags, nilCorsConfig); err != nil {
-        glog.Errorf("removeCORSConfiguration: failed to remove CORS config from bucket content for bucket %s: %v", bucket, err)
+        glog.Errorf("removeCORSConfiguration: failed to remove CORS config for bucket %s: %v", bucket, err)
         return s3err.ErrInternalError
     }
 
@@ -466,49 +675,120 @@ func parseAndCachePublicReadStatus(acl []byte) bool {
     return false
 }
 
-// getBucketMetadata retrieves bucket metadata from bucket directory content using protobuf
-func (s3a *S3ApiServer) getBucketMetadata(bucket string) (map[string]string, *cors.CORSConfiguration, error) {
+// getBucketMetadata retrieves bucket metadata as a structured object with caching
+func (s3a *S3ApiServer) getBucketMetadata(bucket string) (*BucketMetadata, error) {
+    if s3a.bucketConfigCache != nil {
+        // Check negative cache first
+        if s3a.bucketConfigCache.IsNegativelyCached(bucket) {
+            return nil, fmt.Errorf("bucket directory not found %s", bucket)
+        }
+
+        // Try to get from positive cache
+        if config, found := s3a.bucketConfigCache.Get(bucket); found {
+            // Extract metadata from cached config
+            if metadata, err := s3a.extractMetadataFromConfig(config); err == nil {
+                return metadata, nil
+            }
+            // If extraction fails, fall through to direct load
+        }
+    }
+
+    // Load directly from filer
+    return s3a.loadBucketMetadataFromFiler(bucket)
+}
+
+// extractMetadataFromConfig extracts BucketMetadata from cached BucketConfig
+func (s3a *S3ApiServer) extractMetadataFromConfig(config *BucketConfig) (*BucketMetadata, error) {
+    if config == nil || config.Entry == nil {
+        return NewBucketMetadata(), nil
+    }
+
+    // Parse metadata from entry content if available
+    if len(config.Entry.Content) > 0 {
+        var protoMetadata s3_pb.BucketMetadata
+        if err := proto.Unmarshal(config.Entry.Content, &protoMetadata); err != nil {
+            glog.Errorf("extractMetadataFromConfig: failed to unmarshal protobuf metadata for bucket %s: %v", config.Name, err)
+            return nil, err
+        }
+        // Convert protobuf to structured metadata
+        metadata := &BucketMetadata{
+            Tags: protoMetadata.Tags,
+            CORS: corsConfigFromProto(protoMetadata.Cors),
+            Encryption: protoMetadata.Encryption,
+        }
+        return metadata, nil
+    }
+
+    // Fallback: create metadata from cached CORS config
+    metadata := NewBucketMetadata()
+    if config.CORS != nil {
+        metadata.CORS = config.CORS
+    }
+
+    return metadata, nil
+}
+
+// loadBucketMetadataFromFiler loads bucket metadata directly from the filer
+func (s3a *S3ApiServer) loadBucketMetadataFromFiler(bucket string) (*BucketMetadata, error) {
     // Validate bucket name to prevent path traversal attacks
     if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
         strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
-        return nil, nil, fmt.Errorf("invalid bucket name: %s", bucket)
+        return nil, fmt.Errorf("invalid bucket name: %s", bucket)
     }
 
     // Clean the bucket name further to prevent any potential path traversal
     bucket = filepath.Clean(bucket)
     if bucket == "." || bucket == ".." {
-        return nil, nil, fmt.Errorf("invalid bucket name: %s", bucket)
+        return nil, fmt.Errorf("invalid bucket name: %s", bucket)
     }
 
     // Get bucket directory entry to access its content
     entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
     if err != nil {
-        return nil, nil, fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
+        // Check if this is a "not found" error
+        if errors.Is(err, filer_pb.ErrNotFound) {
+            // Set negative cache for non-existent bucket
+            if s3a.bucketConfigCache != nil {
+                s3a.bucketConfigCache.SetNegativeCache(bucket)
+            }
+        }
+        return nil, fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
     }
     if entry == nil {
-        return nil, nil, fmt.Errorf("bucket directory not found %s", bucket)
+        // Set negative cache for non-existent bucket
+        if s3a.bucketConfigCache != nil {
+            s3a.bucketConfigCache.SetNegativeCache(bucket)
+        }
+        return nil, fmt.Errorf("bucket directory not found %s", bucket)
    }
 
     // If no content, return empty metadata
     if len(entry.Content) == 0 {
-        return make(map[string]string), nil, nil
+        return NewBucketMetadata(), nil
     }
 
     // Unmarshal metadata from protobuf
     var protoMetadata s3_pb.BucketMetadata
     if err := proto.Unmarshal(entry.Content, &protoMetadata); err != nil {
         glog.Errorf("getBucketMetadata: failed to unmarshal protobuf metadata for bucket %s: %v", bucket, err)
-        return make(map[string]string), nil, nil // Return empty metadata on error, don't fail
+        return nil, fmt.Errorf("failed to unmarshal bucket metadata for %s: %w", bucket, err)
     }
 
     // Convert protobuf CORS to standard CORS
     corsConfig := corsConfigFromProto(protoMetadata.Cors)
 
-    return protoMetadata.Tags, corsConfig, nil
+    // Create and return structured metadata
+    metadata := &BucketMetadata{
+        Tags: protoMetadata.Tags,
+        CORS: corsConfig,
+        Encryption: protoMetadata.Encryption,
+    }
+
+    return metadata, nil
 }
 
-// setBucketMetadata stores bucket metadata in bucket directory content using protobuf
-func (s3a *S3ApiServer) setBucketMetadata(bucket string, tags map[string]string, corsConfig *cors.CORSConfiguration) error {
+// setBucketMetadata stores bucket metadata from a structured object
+func (s3a *S3ApiServer) setBucketMetadata(bucket string, metadata *BucketMetadata) error {
     // Validate bucket name to prevent path traversal attacks
     if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
         strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
@@ -521,10 +801,16 @@ func (s3a *S3ApiServer) setBucketMetadata(bucket string, tags map[string]string,
         return fmt.Errorf("invalid bucket name: %s", bucket)
     }
 
+    // Default to empty metadata if nil
+    if metadata == nil {
+        metadata = NewBucketMetadata()
+    }
+
     // Create protobuf metadata
     protoMetadata := &s3_pb.BucketMetadata{
-        Tags: tags,
-        Cors: corsConfigToProto(corsConfig),
+        Tags: metadata.Tags,
+        Cors: corsConfigToProto(metadata.CORS),
+        Encryption: metadata.Encryption,
     }
 
     // Marshal metadata to protobuf
@@ -555,46 +841,107 @@ func (s3a *S3ApiServer) setBucketMetadata(bucket string, tags map[string]string,
         _, err = client.UpdateEntry(context.Background(), request)
         return err
     })
+
+    // Invalidate cache after successful update
+    if err == nil && s3a.bucketConfigCache != nil {
+        s3a.bucketConfigCache.Remove(bucket)
+        s3a.bucketConfigCache.RemoveNegativeCache(bucket) // Remove from negative cache too
+    }
+
     return err
 }
 
-// getBucketTags retrieves bucket tags from bucket directory content
-func (s3a *S3ApiServer) getBucketTags(bucket string) (map[string]string, error) {
-    tags, _, err := s3a.getBucketMetadata(bucket)
+// New structured API functions using BucketMetadata
+
+// GetBucketMetadata retrieves complete bucket metadata as a structured object
+func (s3a *S3ApiServer) GetBucketMetadata(bucket string) (*BucketMetadata, error) {
+    return s3a.getBucketMetadata(bucket)
+}
+
+// SetBucketMetadata stores complete bucket metadata from a structured object
+func (s3a *S3ApiServer) SetBucketMetadata(bucket string, metadata *BucketMetadata) error {
+    return s3a.setBucketMetadata(bucket, metadata)
+}
+
+// UpdateBucketMetadata updates specific parts of bucket metadata while preserving others
+//
+// DISTRIBUTED SYSTEM DESIGN NOTE:
+// This function implements a read-modify-write pattern with "last write wins" semantics.
+// In the rare case of concurrent updates to different parts of bucket metadata
+// (e.g., simultaneous tag and CORS updates), the last write may overwrite previous changes.
+//
+// This is an acceptable trade-off because:
+// 1. Bucket metadata updates are infrequent in typical S3 usage
+// 2. Traditional locking doesn't work in distributed systems across multiple nodes
+// 3. The complexity of distributed consensus (e.g., Raft) for metadata updates would
+//    be disproportionate to the low frequency of bucket configuration changes
+// 4. Most bucket operations (tags, CORS, encryption) are typically configured once
+//    during setup rather than being frequently modified
+//
+// If stronger consistency is required, consider implementing optimistic concurrency
+// control with version numbers or ETags at the storage layer.
+func (s3a *S3ApiServer) UpdateBucketMetadata(bucket string, update func(*BucketMetadata) error) error {
+    // Get current metadata
+    metadata, err := s3a.GetBucketMetadata(bucket)
     if err != nil {
-        return nil, err
+        return fmt.Errorf("failed to get current bucket metadata: %w", err)
     }
 
-    if len(tags) == 0 {
-        return nil, fmt.Errorf("no tags configuration found")
+    // Apply update function
+    if err := update(metadata); err != nil {
+        return fmt.Errorf("failed to apply metadata update: %w", err)
    }
 
-    return tags, nil
+    // Store updated metadata (last write wins)
+    return s3a.SetBucketMetadata(bucket, metadata)
 }
 
-// setBucketTags stores bucket tags in bucket directory content
-func (s3a *S3ApiServer) setBucketTags(bucket string, tags map[string]string) error {
-    // Get existing metadata
-    _, existingCorsConfig, err := s3a.getBucketMetadata(bucket)
-    if err != nil {
-        return err
-    }
+// Helper functions for specific metadata operations using structured API
 
-    // Store updated metadata with new tags
-    err = s3a.setBucketMetadata(bucket, tags, existingCorsConfig)
-    return err
+// UpdateBucketTags sets bucket tags using the structured API
+func (s3a *S3ApiServer) UpdateBucketTags(bucket string, tags map[string]string) error {
+    return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
+        metadata.Tags = tags
+        return nil
+    })
 }
 
-// deleteBucketTags removes bucket tags from bucket directory content
-func (s3a *S3ApiServer) deleteBucketTags(bucket string) error {
-    // Get existing metadata
-    _, existingCorsConfig, err := s3a.getBucketMetadata(bucket)
-    if err != nil {
-        return err
-    }
+// UpdateBucketCORS sets bucket CORS configuration using the structured API
+func (s3a *S3ApiServer) UpdateBucketCORS(bucket string, corsConfig *cors.CORSConfiguration) error {
+    return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
+        metadata.CORS = corsConfig
+        return nil
+    })
+}
 
-    // Store updated metadata with empty tags
-    emptyTags := make(map[string]string)
-    err = s3a.setBucketMetadata(bucket, emptyTags, existingCorsConfig)
-    return err
+// UpdateBucketEncryption sets bucket encryption configuration using the structured API
+func (s3a *S3ApiServer) UpdateBucketEncryption(bucket string, encryptionConfig *s3_pb.EncryptionConfiguration) error {
+    return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
+        metadata.Encryption = encryptionConfig
+        return nil
+    })
+}
+
+// ClearBucketTags removes all bucket tags using the structured API
+func (s3a *S3ApiServer) ClearBucketTags(bucket string) error {
+    return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
+        metadata.Tags = make(map[string]string)
+        return nil
+    })
+}
+
+// ClearBucketCORS removes bucket CORS configuration using the structured API
+func (s3a *S3ApiServer) ClearBucketCORS(bucket string) error {
+    return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
+        metadata.CORS = nil
+        return nil
+    })
+}
+
+// ClearBucketEncryption removes bucket encryption configuration using the structured API
+func (s3a *S3ApiServer) ClearBucketEncryption(bucket string) error {
+    return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
+        metadata.Encryption = nil
+        return nil
+    })
 }
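Editor's note: the sketch below is not part of the change; it is a minimal usage illustration of the APIs introduced above. It assumes it lives in the s3api package next to the new types, that s3a is an already-initialized *S3ApiServer, and that the bucket name, tags, and cache keys are placeholders. It only calls functions defined in this diff (UpdateBucketMetadata, UpdateBucketTags, NewBucketKMSCache, Set, Get, CleanupExpired).

// Sketch only: exercises the structured bucket-metadata API and the
// per-bucket KMS data-key cache added in this change. Assumes package
// s3api (glog and time are already imported there) and an initialized
// s3a *S3ApiServer; "example-bucket" and the cache keys are hypothetical.
func exampleBucketConfigUsage(s3a *S3ApiServer) error {
    const bucket = "example-bucket" // hypothetical bucket name

    // Read-modify-write through the closure-based API (last write wins).
    if err := s3a.UpdateBucketMetadata(bucket, func(m *BucketMetadata) error {
        if m.Tags == nil {
            m.Tags = make(map[string]string)
        }
        m.Tags["team"] = "storage" // illustrative tag
        return nil
    }); err != nil {
        return err
    }

    // Convenience wrappers delegate to UpdateBucketMetadata.
    if err := s3a.UpdateBucketTags(bucket, map[string]string{"env": "dev"}); err != nil {
        return err
    }

    // Per-bucket KMS cache: entries are keyed by an encryption-context hash
    // and expire after the supplied TTL; CleanupExpired drops stale entries
    // (zeroing plaintext keys when an entry holds a GenerateDataKeyResponse).
    kmsCache := NewBucketKMSCache(bucket, time.Hour)
    kmsCache.Set("ctx-hash", "kms-key-id", []byte("data-key-bytes"), time.Hour)
    if entry, ok := kmsCache.Get("ctx-hash"); ok {
        glog.V(3).Infof("cached KMS key %s for bucket %s", entry.KeyID, bucket)
    }
    _ = kmsCache.CleanupExpired()

    return nil
}

The closure style keeps the read-modify-write in one place; as the design note in the diff states, concurrent updates to different parts of the metadata resolve as last write wins.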
