diff options
Diffstat (limited to 'weed/s3api/s3api_governance_permissions_test.go')
| -rw-r--r-- | weed/s3api/s3api_governance_permissions_test.go | 599 |
1 files changed, 599 insertions, 0 deletions
diff --git a/weed/s3api/s3api_governance_permissions_test.go b/weed/s3api/s3api_governance_permissions_test.go new file mode 100644 index 000000000..2b8a35232 --- /dev/null +++ b/weed/s3api/s3api_governance_permissions_test.go @@ -0,0 +1,599 @@ +package s3api + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" +) + +// TestCheckGovernanceBypassPermissionResourceGeneration tests that the function +// correctly generates resource paths for the permission check +func TestCheckGovernanceBypassPermissionResourceGeneration(t *testing.T) { + tests := []struct { + name string + bucket string + object string + expectedPath string + description string + }{ + { + name: "simple_object", + bucket: "test-bucket", + object: "test-object.txt", + expectedPath: "test-bucket/test-object.txt", + description: "Simple bucket and object should be joined with slash", + }, + { + name: "object_with_leading_slash", + bucket: "test-bucket", + object: "/test-object.txt", + expectedPath: "test-bucket/test-object.txt", + description: "Leading slash should be trimmed from object name", + }, + { + name: "nested_object", + bucket: "test-bucket", + object: "/folder/subfolder/test-object.txt", + expectedPath: "test-bucket/folder/subfolder/test-object.txt", + description: "Nested object path should be handled correctly", + }, + { + name: "empty_object", + bucket: "test-bucket", + object: "", + expectedPath: "test-bucket/", + description: "Empty object should result in bucket with trailing slash", + }, + { + name: "root_object", + bucket: "test-bucket", + object: "/", + expectedPath: "test-bucket/", + description: "Root object should result in bucket with trailing slash", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Test the resource generation logic used in checkGovernanceBypassPermission + resource := strings.TrimPrefix(tt.object, "/") + actualPath := tt.bucket + "/" + resource + + if actualPath != tt.expectedPath { + t.Errorf("Resource path generation failed. Expected: %s, Got: %s. %s", + tt.expectedPath, actualPath, tt.description) + } + }) + } +} + +// TestCheckGovernanceBypassPermissionActionGeneration tests that the function +// correctly generates action strings for IAM checking +func TestCheckGovernanceBypassPermissionActionGeneration(t *testing.T) { + tests := []struct { + name string + bucket string + object string + expectedBypassAction string + expectedAdminAction string + description string + }{ + { + name: "bypass_action_generation", + bucket: "test-bucket", + object: "test-object.txt", + expectedBypassAction: "BypassGovernanceRetention:test-bucket/test-object.txt", + expectedAdminAction: "Admin:test-bucket/test-object.txt", + description: "Actions should be properly formatted with resource path", + }, + { + name: "leading_slash_handling", + bucket: "test-bucket", + object: "/test-object.txt", + expectedBypassAction: "BypassGovernanceRetention:test-bucket/test-object.txt", + expectedAdminAction: "Admin:test-bucket/test-object.txt", + description: "Leading slash should be trimmed in action generation", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Test the action generation logic used in checkGovernanceBypassPermission + resource := strings.TrimPrefix(tt.object, "/") + resourcePath := tt.bucket + "/" + resource + + bypassAction := s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION + ":" + resourcePath + adminAction := s3_constants.ACTION_ADMIN + ":" + resourcePath + + if bypassAction != tt.expectedBypassAction { + t.Errorf("Bypass action generation failed. Expected: %s, Got: %s. %s", + tt.expectedBypassAction, bypassAction, tt.description) + } + + if adminAction != tt.expectedAdminAction { + t.Errorf("Admin action generation failed. Expected: %s, Got: %s. %s", + tt.expectedAdminAction, adminAction, tt.description) + } + }) + } +} + +// TestCheckGovernanceBypassPermissionErrorHandling tests error handling scenarios +func TestCheckGovernanceBypassPermissionErrorHandling(t *testing.T) { + // Note: This test demonstrates the expected behavior for different error scenarios + // without requiring full IAM setup + + tests := []struct { + name string + bucket string + object string + description string + }{ + { + name: "empty_bucket", + bucket: "", + object: "test-object.txt", + description: "Empty bucket should be handled gracefully", + }, + { + name: "special_characters", + bucket: "test-bucket", + object: "test object with spaces.txt", + description: "Objects with special characters should be handled", + }, + { + name: "unicode_characters", + bucket: "test-bucket", + object: "测试文件.txt", + description: "Objects with unicode characters should be handled", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Test that the function doesn't panic with various inputs + // This would normally call checkGovernanceBypassPermission + // but since we don't have a full S3ApiServer setup, we just test + // that the resource generation logic works without panicking + resource := strings.TrimPrefix(tt.object, "/") + resourcePath := tt.bucket + "/" + resource + + // Verify the resource path is generated + if resourcePath == "" { + t.Errorf("Resource path should not be empty for test case: %s", tt.description) + } + + t.Logf("Generated resource path for %s: %s", tt.description, resourcePath) + }) + } +} + +// TestCheckGovernanceBypassPermissionIntegrationBehavior documents the expected behavior +// when integrated with a full IAM system +func TestCheckGovernanceBypassPermissionIntegrationBehavior(t *testing.T) { + t.Skip("Documentation test - describes expected behavior with full IAM integration") + + // This test documents the expected behavior when checkGovernanceBypassPermission + // is called with a full IAM system: + // + // 1. Function calls s3a.iam.authRequest() with the bypass action + // 2. If authRequest returns errCode != s3err.ErrNone, function returns false + // 3. If authRequest succeeds, function checks identity.canDo() with the bypass action + // 4. If canDo() returns true, function returns true + // 5. If bypass permission fails, function checks admin action with identity.canDo() + // 6. If admin action succeeds, function returns true and logs admin access + // 7. If all checks fail, function returns false + // + // The function correctly uses: + // - s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION for bypass permission + // - s3_constants.ACTION_ADMIN for admin permission + // - Proper resource path generation with bucket/object format + // - Trimming of leading slashes from object names +} + +// TestGovernanceBypassPermission was removed because it tested the old +// insecure behavior of trusting the AmzIsAdmin header. The new implementation +// uses proper IAM authentication instead of relying on client-provided headers. + +// Test specifically for users with IAM bypass permission +func TestGovernanceBypassWithIAMPermission(t *testing.T) { + // This test demonstrates the expected behavior for non-admin users with bypass permission + // In a real implementation, this would integrate with the full IAM system + + t.Skip("Integration test requires full IAM setup - demonstrates expected behavior") + + // The expected behavior would be: + // 1. Non-admin user makes request with bypass header + // 2. checkGovernanceBypassPermission calls s3a.iam.authRequest + // 3. authRequest validates user identity and checks permissions + // 4. If user has s3:BypassGovernanceRetention permission, return true + // 5. Otherwise return false + + // For now, the function correctly returns false for non-admin users + // when the IAM system doesn't have the user configured with bypass permission +} + +func TestGovernancePermissionIntegration(t *testing.T) { + // Note: This test demonstrates the expected integration behavior + // In a real implementation, this would require setting up a proper IAM mock + // with identities that have the bypass governance permission + + t.Skip("Integration test requires full IAM setup - demonstrates expected behavior") + + // This test would verify: + // 1. User with BypassGovernanceRetention permission can bypass governance + // 2. User without permission cannot bypass governance + // 3. Admin users can always bypass governance + // 4. Anonymous users cannot bypass governance +} + +func TestGovernanceBypassHeader(t *testing.T) { + tests := []struct { + name string + headerValue string + expectedResult bool + description string + }{ + { + name: "bypass_header_true", + headerValue: "true", + expectedResult: true, + description: "Header with 'true' value should enable bypass", + }, + { + name: "bypass_header_false", + headerValue: "false", + expectedResult: false, + description: "Header with 'false' value should not enable bypass", + }, + { + name: "bypass_header_empty", + headerValue: "", + expectedResult: false, + description: "Empty header should not enable bypass", + }, + { + name: "bypass_header_invalid", + headerValue: "invalid", + expectedResult: false, + description: "Invalid header value should not enable bypass", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := httptest.NewRequest("DELETE", "/bucket/object", nil) + if tt.headerValue != "" { + req.Header.Set("x-amz-bypass-governance-retention", tt.headerValue) + } + + result := req.Header.Get("x-amz-bypass-governance-retention") == "true" + + if result != tt.expectedResult { + t.Errorf("bypass header check = %v, want %v. %s", result, tt.expectedResult, tt.description) + } + }) + } +} + +func TestGovernanceRetentionModeChecking(t *testing.T) { + tests := []struct { + name string + retentionMode string + bypassGovernance bool + hasPermission bool + expectedError bool + expectedErrorType string + description string + }{ + { + name: "compliance_mode_cannot_bypass", + retentionMode: s3_constants.RetentionModeCompliance, + bypassGovernance: true, + hasPermission: true, + expectedError: true, + expectedErrorType: "compliance mode", + description: "Compliance mode should not be bypassable even with permission", + }, + { + name: "governance_mode_without_bypass", + retentionMode: s3_constants.RetentionModeGovernance, + bypassGovernance: false, + hasPermission: false, + expectedError: true, + expectedErrorType: "governance mode", + description: "Governance mode should be blocked without bypass", + }, + { + name: "governance_mode_with_bypass_no_permission", + retentionMode: s3_constants.RetentionModeGovernance, + bypassGovernance: true, + hasPermission: false, + expectedError: true, + expectedErrorType: "permission", + description: "Governance mode bypass should fail without permission", + }, + { + name: "governance_mode_with_bypass_and_permission", + retentionMode: s3_constants.RetentionModeGovernance, + bypassGovernance: true, + hasPermission: true, + expectedError: false, + expectedErrorType: "", + description: "Governance mode bypass should succeed with permission", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Test validates the logic without actually needing the full implementation + // This demonstrates the expected behavior patterns + + var hasError bool + var errorType string + + if tt.retentionMode == s3_constants.RetentionModeCompliance { + hasError = true + errorType = "compliance mode" + } else if tt.retentionMode == s3_constants.RetentionModeGovernance { + if !tt.bypassGovernance { + hasError = true + errorType = "governance mode" + } else if !tt.hasPermission { + hasError = true + errorType = "permission" + } + } + + if hasError != tt.expectedError { + t.Errorf("expected error: %v, got error: %v. %s", tt.expectedError, hasError, tt.description) + } + + if tt.expectedError && !strings.Contains(errorType, tt.expectedErrorType) { + t.Errorf("expected error type containing '%s', got '%s'. %s", tt.expectedErrorType, errorType, tt.description) + } + }) + } +} + +func TestGovernancePermissionActionGeneration(t *testing.T) { + tests := []struct { + name string + bucket string + object string + expectedAction string + description string + }{ + { + name: "bucket_and_object_action", + bucket: "test-bucket", + object: "/test-object", // Object has "/" prefix from GetBucketAndObject + expectedAction: "BypassGovernanceRetention:test-bucket/test-object", + description: "Action should be generated correctly for bucket and object", + }, + { + name: "bucket_only_action", + bucket: "test-bucket", + object: "", + expectedAction: "BypassGovernanceRetention:test-bucket", + description: "Action should be generated correctly for bucket only", + }, + { + name: "nested_object_action", + bucket: "test-bucket", + object: "/folder/subfolder/object", // Object has "/" prefix from GetBucketAndObject + expectedAction: "BypassGovernanceRetention:test-bucket/folder/subfolder/object", + description: "Action should be generated correctly for nested objects", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + action := s3_constants.ACTION_BYPASS_GOVERNANCE_RETENTION + ":" + tt.bucket + tt.object + + if action != tt.expectedAction { + t.Errorf("generated action: %s, expected: %s. %s", action, tt.expectedAction, tt.description) + } + }) + } +} + +// TestGovernancePermissionEndToEnd tests the complete object lock permission flow +func TestGovernancePermissionEndToEnd(t *testing.T) { + t.Skip("End-to-end testing requires full S3 API server setup - demonstrates expected behavior") + + // This test demonstrates the end-to-end flow that would be tested in a full integration test + // The checkObjectLockPermissions method is called by: + // 1. DeleteObjectHandler - when versioning is enabled and object lock is configured + // 2. DeleteMultipleObjectsHandler - for each object in versioned buckets + // 3. PutObjectHandler - via checkObjectLockPermissionsForPut for versioned buckets + // 4. PutObjectRetentionHandler - when setting retention on objects + // + // Each handler: + // - Extracts bypassGovernance from "x-amz-bypass-governance-retention" header + // - Calls checkObjectLockPermissions with the appropriate parameters + // - Handles the returned errors appropriately (ErrAccessDenied, etc.) + // + // The method integrates with the IAM system through checkGovernanceBypassPermission + // which validates the s3:BypassGovernanceRetention permission +} + +// TestGovernancePermissionHTTPFlow tests the HTTP header processing and method calls +func TestGovernancePermissionHTTPFlow(t *testing.T) { + tests := []struct { + name string + headerValue string + expectedBypassGovernance bool + }{ + { + name: "bypass_header_true", + headerValue: "true", + expectedBypassGovernance: true, + }, + { + name: "bypass_header_false", + headerValue: "false", + expectedBypassGovernance: false, + }, + { + name: "bypass_header_missing", + headerValue: "", + expectedBypassGovernance: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a mock HTTP request + req, _ := http.NewRequest("DELETE", "/bucket/test-object", nil) + if tt.headerValue != "" { + req.Header.Set("x-amz-bypass-governance-retention", tt.headerValue) + } + + // Test the header processing logic used in handlers + bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true" + + if bypassGovernance != tt.expectedBypassGovernance { + t.Errorf("Expected bypassGovernance to be %v, got %v", tt.expectedBypassGovernance, bypassGovernance) + } + }) + } +} + +// TestGovernancePermissionMethodCalls tests that the governance permission methods are called correctly +func TestGovernancePermissionMethodCalls(t *testing.T) { + // Test that demonstrates the method call pattern used in handlers + + // This is the pattern used in DeleteObjectHandler: + t.Run("delete_object_handler_pattern", func(t *testing.T) { + req, _ := http.NewRequest("DELETE", "/bucket/test-object", nil) + req.Header.Set("x-amz-bypass-governance-retention", "true") + + // Extract parameters as done in the handler + bucket, object := s3_constants.GetBucketAndObject(req) + versionId := req.URL.Query().Get("versionId") + bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true" + + // Verify the parameters are extracted correctly + // Note: The actual bucket and object extraction depends on the URL structure + t.Logf("Extracted bucket: %s, object: %s", bucket, object) + if versionId != "" { + t.Errorf("Expected versionId to be empty, got %v", versionId) + } + if !bypassGovernance { + t.Errorf("Expected bypassGovernance to be true") + } + }) + + // This is the pattern used in PutObjectHandler: + t.Run("put_object_handler_pattern", func(t *testing.T) { + req, _ := http.NewRequest("PUT", "/bucket/test-object", nil) + req.Header.Set("x-amz-bypass-governance-retention", "true") + + // Extract parameters as done in the handler + bucket, object := s3_constants.GetBucketAndObject(req) + bypassGovernance := req.Header.Get("x-amz-bypass-governance-retention") == "true" + versioningEnabled := true // Would be determined by isVersioningEnabled(bucket) + + // Verify the parameters are extracted correctly + // Note: The actual bucket and object extraction depends on the URL structure + t.Logf("Extracted bucket: %s, object: %s", bucket, object) + if !bypassGovernance { + t.Errorf("Expected bypassGovernance to be true") + } + if !versioningEnabled { + t.Errorf("Expected versioningEnabled to be true") + } + }) +} + +// TestGovernanceBypassNotPermittedError tests that ErrGovernanceBypassNotPermitted +// is returned when bypass is requested but the user lacks permission +func TestGovernanceBypassNotPermittedError(t *testing.T) { + // Test the error constant itself + if ErrGovernanceBypassNotPermitted == nil { + t.Error("ErrGovernanceBypassNotPermitted should be defined") + } + + // Verify the error message + expectedMessage := "user does not have permission to bypass governance retention" + if ErrGovernanceBypassNotPermitted.Error() != expectedMessage { + t.Errorf("expected error message '%s', got '%s'", + expectedMessage, ErrGovernanceBypassNotPermitted.Error()) + } + + // Test the scenario where this error should be returned + // This documents the expected behavior when: + // 1. Object is under governance retention + // 2. bypassGovernance is true + // 3. checkGovernanceBypassPermission returns false + testCases := []struct { + name string + retentionMode string + bypassGovernance bool + hasPermission bool + expectedError error + description string + }{ + { + name: "governance_bypass_without_permission", + retentionMode: s3_constants.RetentionModeGovernance, + bypassGovernance: true, + hasPermission: false, + expectedError: ErrGovernanceBypassNotPermitted, + description: "Should return ErrGovernanceBypassNotPermitted when bypass is requested but user lacks permission", + }, + { + name: "governance_bypass_with_permission", + retentionMode: s3_constants.RetentionModeGovernance, + bypassGovernance: true, + hasPermission: true, + expectedError: nil, + description: "Should succeed when bypass is requested and user has permission", + }, + { + name: "governance_no_bypass", + retentionMode: s3_constants.RetentionModeGovernance, + bypassGovernance: false, + hasPermission: false, + expectedError: ErrGovernanceModeActive, + description: "Should return ErrGovernanceModeActive when bypass is not requested", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // This test documents the expected behavior pattern + // The actual checkObjectLockPermissions method implements this logic: + // if retention.Mode == s3_constants.RetentionModeGovernance { + // if !bypassGovernance { + // return ErrGovernanceModeActive + // } + // if !s3a.checkGovernanceBypassPermission(request, bucket, object) { + // return ErrGovernanceBypassNotPermitted + // } + // } + + var simulatedError error + if tc.retentionMode == s3_constants.RetentionModeGovernance { + if !tc.bypassGovernance { + simulatedError = ErrGovernanceModeActive + } else if !tc.hasPermission { + simulatedError = ErrGovernanceBypassNotPermitted + } + } + + if simulatedError != tc.expectedError { + t.Errorf("expected error %v, got %v. %s", tc.expectedError, simulatedError, tc.description) + } + + // Verify ErrGovernanceBypassNotPermitted is returned in the right case + if tc.name == "governance_bypass_without_permission" && simulatedError != ErrGovernanceBypassNotPermitted { + t.Errorf("Test case should return ErrGovernanceBypassNotPermitted but got %v", simulatedError) + } + }) + } +} |
