aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_governance_permissions_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_governance_permissions_test.go')
-rw-r--r--weed/s3api/s3api_governance_permissions_test.go599
1 files changed, 599 insertions, 0 deletions
diff --git a/weed/s3api/s3api_governance_permissions_test.go b/weed/s3api/s3api_governance_permissions_test.go
new file mode 100644
index 000000000..2b8a35232
--- /dev/null
+++ b/weed/s3api/s3api_governance_permissions_test.go
@@ -0,0 +1,599 @@
+package s3api
+
+import (
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+)
+
+// TestCheckGovernanceBypassPermissionResourceGeneration tests that the function
+// correctly generates resource paths for the permission check
+func TestCheckGovernanceBypassPermissionResourceGeneration(t *testing.T) {
+ tests := []struct {
+ name string
+ bucket string
+ object string
+ expectedPath string
+ description string
+ }{
+ {
+ name: "simple_object",
+ bucket: "test-bucket",
+ object: "test-object.txt",
+ expectedPath: "test-bucket/test-object.txt",
+ description: "Simple bucket and object should be joined with slash",
+ },
+ {
+ name: "object_with_leading_slash",
+ bucket: "test-bucket",
+ object: "/test-object.txt",
+ expectedPath: "test-bucket/test-object.txt",
+ description: "Leading slash should be trimmed from object name",
+ },
+ {
+ name: "nested_object",
+ bucket: "test-bucket",
+ object: "/folder/subfolder/test-object.txt",
+ expectedPath: "test-bucket/folder/subfolder/test-object.txt",
+ description: "Nested object path should be handled correctly",
+ },
+ {
+ name: "empty_object",
+ bucket: "test-bucket",
+ object: "",
+ expectedPath: "test-bucket/",
+ description: "Empty object should result in bucket with trailing slash",
+ },
+ {
+ name: "root_object",
+ bucket: "test-bucket",
+ object: "/",
+ expectedPath: "test-bucket/",
+ description: "Root object should result in bucket with trailing slash",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Test the resource generation logic used in checkGovernanceBypassPermission
+ resource := strings.TrimPrefix(tt.object, "/")
+ actualPath := tt.bucket + "/" + resource
+
+ if actualPath != tt.expectedPath {
+ t.Errorf("Resource path generation failed. Expected: %s, Got: %s. %s",
+ tt.expectedPath, actualPath, tt.description)
+ }
+ })
+ }
+}
+
+// TestCheckGovernanceBypassPermissionActionGeneration tests that the function
+// correctly generates action strings for IAM checking
+func TestCheckGovernanceBypassPermissionActionGeneration(t *testing.T) {
+ tests := []struct {
+ name string
+ bucket string
+ object string
+ expectedBypassAction string
+ expectedAdminAction string
+ description string
+ }{
+ {
+ name: "bypass_action_generation",
+ bucket: "test-bucket",
+ object: "test-object.txt",
+ expectedBypassAction: "BypassGovernanceRetention:test-bucket/test-object.txt",
+ expectedAdminAction: "Admin:test-bucket/test-object.txt",
+ description: "Actions should be properly formatted with resource path",
+ },
+ {
+ name: "leading_slash_handling",
+ bucket: "test-bucket",
+ object: "/test-object.txt",
+ expectedBypassAction: "BypassGovernanceRetention:test-bucket/test-object.txt",
+ expectedAdminAction: "Admin:test-bucket/test-object.txt",
+ description: "Leading slash should be trimmed in action generation",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Test the action generation logic used in checkGovernanceBypassPermission
+ resource := strings.TrimPrefix(tt.object, "/")
+ resourcePath := tt.bucket + "/" + resource
+
+ bypassAction := s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION + ":" + resourcePath
+ adminAction := s3_constants.ACTION_ADMIN + ":" + resourcePath
+
+ if bypassAction != tt.expectedBypassAction {
+ t.Errorf("Bypass action generation failed. Expected: %s, Got: %s. %s",
+ tt.expectedBypassAction, bypassAction, tt.description)
+ }
+
+ if adminAction != tt.expectedAdminAction {
+ t.Errorf("Admin action generation failed. Expected: %s, Got: %s. %s",
+ tt.expectedAdminAction, adminAction, tt.description)
+ }
+ })
+ }
+}
+
+// TestCheckGovernanceBypassPermissionErrorHandling tests error handling scenarios
+func TestCheckGovernanceBypassPermissionErrorHandling(t *testing.T) {
+ // Note: This test demonstrates the expected behavior for different error scenarios
+ // without requiring full IAM setup
+
+ tests := []struct {
+ name string
+ bucket string
+ object string
+ description string
+ }{
+ {
+ name: "empty_bucket",
+ bucket: "",
+ object: "test-object.txt",
+ description: "Empty bucket should be handled gracefully",
+ },
+ {
+ name: "special_characters",
+ bucket: "test-bucket",
+ object: "test object with spaces.txt",
+ description: "Objects with special characters should be handled",
+ },
+ {
+ name: "unicode_characters",
+ bucket: "test-bucket",
+ object: "测试文件.txt",
+ description: "Objects with unicode characters should be handled",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Test that the function doesn't panic with various inputs
+ // This would normally call checkGovernanceBypassPermission
+ // but since we don't have a full S3ApiServer setup, we just test
+ // that the resource generation logic works without panicking
+ resource := strings.TrimPrefix(tt.object, "/")
+ resourcePath := tt.bucket + "/" + resource
+
+ // Verify the resource path is generated
+ if resourcePath == "" {
+ t.Errorf("Resource path should not be empty for test case: %s", tt.description)
+ }
+
+ t.Logf("Generated resource path for %s: %s", tt.description, resourcePath)
+ })
+ }
+}
+
+// TestCheckGovernanceBypassPermissionIntegrationBehavior documents the expected behavior
+// when integrated with a full IAM system
+func TestCheckGovernanceBypassPermissionIntegrationBehavior(t *testing.T) {
+ t.Skip("Documentation test - describes expected behavior with full IAM integration")
+
+ // This test documents the expected behavior when checkGovernanceBypassPermission
+ // is called with a full IAM system:
+ //
+ // 1. Function calls s3a.iam.authRequest() with the bypass action
+ // 2. If authRequest returns errCode != s3err.ErrNone, function returns false
+ // 3. If authRequest succeeds, function checks identity.canDo() with the bypass action
+ // 4. If canDo() returns true, function returns true
+ // 5. If bypass permission fails, function checks admin action with identity.canDo()
+ // 6. If admin action succeeds, function returns true and logs admin access
+ // 7. If all checks fail, function returns false
+ //
+ // The function correctly uses:
+ // - s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION for bypass permission
+ // - s3_constants.ACTION_ADMIN for admin permission
+ // - Proper resource path generation with bucket/object format
+ // - Trimming of leading slashes from object names
+}
+
+// TestGovernanceBypassPermission was removed because it tested the old
+// insecure behavior of trusting the AmzIsAdmin header. The new implementation
+// uses proper IAM authentication instead of relying on client-provided headers.
+
+// Test specifically for users with IAM bypass permission
+func TestGovernanceBypassWithIAMPermission(t *testing.T) {
+ // This test demonstrates the expected behavior for non-admin users with bypass permission
+ // In a real implementation, this would integrate with the full IAM system
+
+ t.Skip("Integration test requires full IAM setup - demonstrates expected behavior")
+
+ // The expected behavior would be:
+ // 1. Non-admin user makes request with bypass header
+ // 2. checkGovernanceBypassPermission calls s3a.iam.authRequest
+ // 3. authRequest validates user identity and checks permissions
+ // 4. If user has s3:BypassGovernanceRetention permission, return true
+ // 5. Otherwise return false
+
+ // For now, the function correctly returns false for non-admin users
+ // when the IAM system doesn't have the user configured with bypass permission
+}
+
+func TestGovernancePermissionIntegration(t *testing.T) {
+ // Note: This test demonstrates the expected integration behavior
+ // In a real implementation, this would require setting up a proper IAM mock
+ // with identities that have the bypass governance permission
+
+ t.Skip("Integration test requires full IAM setup - demonstrates expected behavior")
+
+ // This test would verify:
+ // 1. User with BypassGovernanceRetention permission can bypass governance
+ // 2. User without permission cannot bypass governance
+ // 3. Admin users can always bypass governance
+ // 4. Anonymous users cannot bypass governance
+}
+
+func TestGovernanceBypassHeader(t *testing.T) {
+ tests := []struct {
+ name string
+ headerValue string
+ expectedResult bool
+ description string
+ }{
+ {
+ name: "bypass_header_true",
+ headerValue: "true",
+ expectedResult: true,
+ description: "Header with 'true' value should enable bypass",
+ },
+ {
+ name: "bypass_header_false",
+ headerValue: "false",
+ expectedResult: false,
+ description: "Header with 'false' value should not enable bypass",
+ },
+ {
+ name: "bypass_header_empty",
+ headerValue: "",
+ expectedResult: false,
+ description: "Empty header should not enable bypass",
+ },
+ {
+ name: "bypass_header_invalid",
+ headerValue: "invalid",
+ expectedResult: false,
+ description: "Invalid header value should not enable bypass",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ req := httptest.NewRequest("DELETE", "/bucket/object", nil)
+ if tt.headerValue != "" {
+ req.Header.Set("x-amz-bypass-governance-retention", tt.headerValue)
+ }
+
+ result := req.Header.Get("x-amz-bypass-governance-retention") == "true"
+
+ if result != tt.expectedResult {
+ t.Errorf("bypass header check = %v, want %v. %s", result, tt.expectedResult, tt.description)
+ }
+ })
+ }
+}
+
+func TestGovernanceRetentionModeChecking(t *testing.T) {
+ tests := []struct {
+ name string
+ retentionMode string
+ bypassGovernance bool
+ hasPermission bool
+ expectedError bool
+ expectedErrorType string
+ description string
+ }{
+ {
+ name: "compliance_mode_cannot_bypass",
+ retentionMode: s3_constants.RetentionModeCompliance,
+ bypassGovernance: true,
+ hasPermission: true,
+ expectedError: true,
+ expectedErrorType: "compliance mode",
+ description: "Compliance mode should not be bypassable even with permission",
+ },
+ {
+ name: "governance_mode_without_bypass",
+ retentionMode: s3_constants.RetentionModeGovernance,
+ bypassGovernance: false,
+ hasPermission: false,
+ expectedError: true,
+ expectedErrorType: "governance mode",
+ description: "Governance mode should be blocked without bypass",
+ },
+ {
+ name: "governance_mode_with_bypass_no_permission",
+ retentionMode: s3_constants.RetentionModeGovernance,
+ bypassGovernance: true,
+ hasPermission: false,
+ expectedError: true,
+ expectedErrorType: "permission",
+ description: "Governance mode bypass should fail without permission",
+ },
+ {
+ name: "governance_mode_with_bypass_and_permission",
+ retentionMode: s3_constants.RetentionModeGovernance,
+ bypassGovernance: true,
+ hasPermission: true,
+ expectedError: false,
+ expectedErrorType: "",
+ description: "Governance mode bypass should succeed with permission",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Test validates the logic without actually needing the full implementation
+ // This demonstrates the expected behavior patterns
+
+ var hasError bool
+ var errorType string
+
+ if tt.retentionMode == s3_constants.RetentionModeCompliance {
+ hasError = true
+ errorType = "compliance mode"
+ } else if tt.retentionMode == s3_constants.RetentionModeGovernance {
+ if !tt.bypassGovernance {
+ hasError = true
+ errorType = "governance mode"
+ } else if !tt.hasPermission {
+ hasError = true
+ errorType = "permission"
+ }
+ }
+
+ if hasError != tt.expectedError {
+ t.Errorf("expected error: %v, got error: %v. %s", tt.expectedError, hasError, tt.description)
+ }
+
+ if tt.expectedError && !strings.Contains(errorType, tt.expectedErrorType) {
+ t.Errorf("expected error type containing '%s', got '%s'. %s", tt.expectedErrorType, errorType, tt.description)
+ }
+ })
+ }
+}
+
+func TestGovernancePermissionActionGeneration(t *testing.T) {
+ tests := []struct {
+ name string
+ bucket string
+ object string
+ expectedAction string
+ description string
+ }{
+ {
+ name: "bucket_and_object_action",
+ bucket: "test-bucket",
+ object: "/test-object", // Object has "/" prefix from GetBucketAndObject
+ expectedAction: "BypassGovernanceRetention:test-bucket/test-object",
+ description: "Action should be generated correctly for bucket and object",
+ },
+ {
+ name: "bucket_only_action",
+ bucket: "test-bucket",
+ object: "",
+ expectedAction: "BypassGovernanceRetention:test-bucket",
+ description: "Action should be generated correctly for bucket only",
+ },
+ {
+ name: "nested_object_action",
+ bucket: "test-bucket",
+ object: "/folder/subfolder/object", // Object has "/" prefix from GetBucketAndObject
+ expectedAction: "BypassGovernanceRetention:test-bucket/folder/subfolder/object",
+ description: "Action should be generated correctly for nested objects",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ action := s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION + ":" + tt.bucket + tt.object
+
+ if action != tt.expectedAction {
+ t.Errorf("generated action: %s, expected: %s. %s", action, tt.expectedAction, tt.description)
+ }
+ })
+ }
+}
+
+// TestGovernancePermissionEndToEnd tests the complete object lock permission flow
+func TestGovernancePermissionEndToEnd(t *testing.T) {
+ t.Skip("End-to-end testing requires full S3 API server setup - demonstrates expected behavior")
+
+ // This test demonstrates the end-to-end flow that would be tested in a full integration test
+ // The checkObjectLockPermissions method is called by:
+ // 1. DeleteObjectHandler - when versioning is enabled and object lock is configured
+ // 2. DeleteMultipleObjectsHandler - for each object in versioned buckets
+ // 3. PutObjectHandler - via checkObjectLockPermissionsForPut for versioned buckets
+ // 4. PutObjectRetentionHandler - when setting retention on objects
+ //
+ // Each handler:
+ // - Extracts bypassGovernance from "x-amz-bypass-governance-retention" header
+ // - Calls checkObjectLockPermissions with the appropriate parameters
+ // - Handles the returned errors appropriately (ErrAccessDenied, etc.)
+ //
+ // The method integrates with the IAM system through checkGovernanceBypassPermission
+ // which validates the s3:BypassGovernanceRetention permission
+}
+
+// TestGovernancePermissionHTTPFlow tests the HTTP header processing and method calls
+func TestGovernancePermissionHTTPFlow(t *testing.T) {
+ tests := []struct {
+ name string
+ headerValue string
+ expectedBypassGovernance bool
+ }{
+ {
+ name: "bypass_header_true",
+ headerValue: "true",
+ expectedBypassGovernance: true,
+ },
+ {
+ name: "bypass_header_false",
+ headerValue: "false",
+ expectedBypassGovernance: false,
+ },
+ {
+ name: "bypass_header_missing",
+ headerValue: "",
+ expectedBypassGovernance: false,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Create a mock HTTP request
+ req, _ := http.NewRequest("DELETE", "/bucket/test-object", nil)
+ if tt.headerValue != "" {
+ req.Header.Set("x-amz-bypass-governance-retention", tt.headerValue)
+ }
+
+ // Test the header processing logic used in handlers
+ bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true"
+
+ if bypassGovernance != tt.expectedBypassGovernance {
+ t.Errorf("Expected bypassGovernance to be %v, got %v", tt.expectedBypassGovernance, bypassGovernance)
+ }
+ })
+ }
+}
+
+// TestGovernancePermissionMethodCalls tests that the governance permission methods are called correctly
+func TestGovernancePermissionMethodCalls(t *testing.T) {
+ // Test that demonstrates the method call pattern used in handlers
+
+ // This is the pattern used in DeleteObjectHandler:
+ t.Run("delete_object_handler_pattern", func(t *testing.T) {
+ req, _ := http.NewRequest("DELETE", "/bucket/test-object", nil)
+ req.Header.Set("x-amz-bypass-governance-retention", "true")
+
+ // Extract parameters as done in the handler
+ bucket, object := s3_constants.GetBucketAndObject(req)
+ versionId := req.URL.Query().Get("versionId")
+ bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true"
+
+ // Verify the parameters are extracted correctly
+ // Note: The actual bucket and object extraction depends on the URL structure
+ t.Logf("Extracted bucket: %s, object: %s", bucket, object)
+ if versionId != "" {
+ t.Errorf("Expected versionId to be empty, got %v", versionId)
+ }
+ if !bypassGovernance {
+ t.Errorf("Expected bypassGovernance to be true")
+ }
+ })
+
+ // This is the pattern used in PutObjectHandler:
+ t.Run("put_object_handler_pattern", func(t *testing.T) {
+ req, _ := http.NewRequest("PUT", "/bucket/test-object", nil)
+ req.Header.Set("x-amz-bypass-governance-retention", "true")
+
+ // Extract parameters as done in the handler
+ bucket, object := s3_constants.GetBucketAndObject(req)
+ bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true"
+ versioningEnabled := true // Would be determined by isVersioningEnabled(bucket)
+
+ // Verify the parameters are extracted correctly
+ // Note: The actual bucket and object extraction depends on the URL structure
+ t.Logf("Extracted bucket: %s, object: %s", bucket, object)
+ if !bypassGovernance {
+ t.Errorf("Expected bypassGovernance to be true")
+ }
+ if !versioningEnabled {
+ t.Errorf("Expected versioningEnabled to be true")
+ }
+ })
+}
+
+// TestGovernanceBypassNotPermittedError tests that ErrGovernanceBypassNotPermitted
+// is returned when bypass is requested but the user lacks permission
+func TestGovernanceBypassNotPermittedError(t *testing.T) {
+ // Test the error constant itself
+ if ErrGovernanceBypassNotPermitted == nil {
+ t.Error("ErrGovernanceBypassNotPermitted should be defined")
+ }
+
+ // Verify the error message
+ expectedMessage := "user does not have permission to bypass governance retention"
+ if ErrGovernanceBypassNotPermitted.Error() != expectedMessage {
+ t.Errorf("expected error message '%s', got '%s'",
+ expectedMessage, ErrGovernanceBypassNotPermitted.Error())
+ }
+
+ // Test the scenario where this error should be returned
+ // This documents the expected behavior when:
+ // 1. Object is under governance retention
+ // 2. bypassGovernance is true
+ // 3. checkGovernanceBypassPermission returns false
+ testCases := []struct {
+ name string
+ retentionMode string
+ bypassGovernance bool
+ hasPermission bool
+ expectedError error
+ description string
+ }{
+ {
+ name: "governance_bypass_without_permission",
+ retentionMode: s3_constants.RetentionModeGovernance,
+ bypassGovernance: true,
+ hasPermission: false,
+ expectedError: ErrGovernanceBypassNotPermitted,
+ description: "Should return ErrGovernanceBypassNotPermitted when bypass is requested but user lacks permission",
+ },
+ {
+ name: "governance_bypass_with_permission",
+ retentionMode: s3_constants.RetentionModeGovernance,
+ bypassGovernance: true,
+ hasPermission: true,
+ expectedError: nil,
+ description: "Should succeed when bypass is requested and user has permission",
+ },
+ {
+ name: "governance_no_bypass",
+ retentionMode: s3_constants.RetentionModeGovernance,
+ bypassGovernance: false,
+ hasPermission: false,
+ expectedError: ErrGovernanceModeActive,
+ description: "Should return ErrGovernanceModeActive when bypass is not requested",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // This test documents the expected behavior pattern
+ // The actual checkObjectLockPermissions method implements this logic:
+ // if retention.Mode == s3_constants.RetentionModeGovernance {
+ // if !bypassGovernance {
+ // return ErrGovernanceModeActive
+ // }
+ // if !s3a.checkGovernanceBypassPermission(request, bucket, object) {
+ // return ErrGovernanceBypassNotPermitted
+ // }
+ // }
+
+ var simulatedError error
+ if tc.retentionMode == s3_constants.RetentionModeGovernance {
+ if !tc.bypassGovernance {
+ simulatedError = ErrGovernanceModeActive
+ } else if !tc.hasPermission {
+ simulatedError = ErrGovernanceBypassNotPermitted
+ }
+ }
+
+ if simulatedError != tc.expectedError {
+ t.Errorf("expected error %v, got %v. %s", tc.expectedError, simulatedError, tc.description)
+ }
+
+ // Verify ErrGovernanceBypassNotPermitted is returned in the right case
+ if tc.name == "governance_bypass_without_permission" && simulatedError != ErrGovernanceBypassNotPermitted {
+ t.Errorf("Test case should return ErrGovernanceBypassNotPermitted but got %v", simulatedError)
+ }
+ })
+ }
+}