aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3_sse_metadata_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3_sse_metadata_test.go')
-rw-r--r--weed/s3api/s3_sse_metadata_test.go328
1 files changed, 328 insertions, 0 deletions
diff --git a/weed/s3api/s3_sse_metadata_test.go b/weed/s3api/s3_sse_metadata_test.go
new file mode 100644
index 000000000..c0c1360af
--- /dev/null
+++ b/weed/s3api/s3_sse_metadata_test.go
@@ -0,0 +1,328 @@
+package s3api
+
+import (
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+)
+
+// TestSSECIsEncrypted tests detection of SSE-C encryption from metadata
+func TestSSECIsEncrypted(t *testing.T) {
+ testCases := []struct {
+ name string
+ metadata map[string][]byte
+ expected bool
+ }{
+ {
+ name: "Empty metadata",
+ metadata: CreateTestMetadata(),
+ expected: false,
+ },
+ {
+ name: "Valid SSE-C metadata",
+ metadata: CreateTestMetadataWithSSEC(GenerateTestSSECKey(1)),
+ expected: true,
+ },
+ {
+ name: "SSE-C algorithm only",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryptionCustomerAlgorithm: []byte("AES256"),
+ },
+ expected: true,
+ },
+ {
+ name: "SSE-C key MD5 only",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryptionCustomerKeyMD5: []byte("somemd5"),
+ },
+ expected: true,
+ },
+ {
+ name: "Other encryption type (SSE-KMS)",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
+ },
+ expected: false,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ result := IsSSECEncrypted(tc.metadata)
+ if result != tc.expected {
+ t.Errorf("Expected %v, got %v", tc.expected, result)
+ }
+ })
+ }
+}
+
+// TestSSEKMSIsEncrypted tests detection of SSE-KMS encryption from metadata
+func TestSSEKMSIsEncrypted(t *testing.T) {
+ testCases := []struct {
+ name string
+ metadata map[string][]byte
+ expected bool
+ }{
+ {
+ name: "Empty metadata",
+ metadata: CreateTestMetadata(),
+ expected: false,
+ },
+ {
+ name: "Valid SSE-KMS metadata",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
+ s3_constants.AmzEncryptedDataKey: []byte("encrypted-key"),
+ },
+ expected: true,
+ },
+ {
+ name: "SSE-KMS algorithm only",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
+ },
+ expected: true,
+ },
+ {
+ name: "SSE-KMS encrypted data key only",
+ metadata: map[string][]byte{
+ s3_constants.AmzEncryptedDataKey: []byte("encrypted-key"),
+ },
+ expected: false, // Only encrypted data key without algorithm header should not be considered SSE-KMS
+ },
+ {
+ name: "Other encryption type (SSE-C)",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryptionCustomerAlgorithm: []byte("AES256"),
+ },
+ expected: false,
+ },
+ {
+ name: "SSE-S3 (AES256)",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryption: []byte("AES256"),
+ },
+ expected: false,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ result := IsSSEKMSEncrypted(tc.metadata)
+ if result != tc.expected {
+ t.Errorf("Expected %v, got %v", tc.expected, result)
+ }
+ })
+ }
+}
+
+// TestSSETypeDiscrimination tests that SSE types don't interfere with each other
+func TestSSETypeDiscrimination(t *testing.T) {
+ // Test SSE-C headers don't trigger SSE-KMS detection
+ t.Run("SSE-C headers don't trigger SSE-KMS", func(t *testing.T) {
+ req := CreateTestHTTPRequest("PUT", "/bucket/object", nil)
+ keyPair := GenerateTestSSECKey(1)
+ SetupTestSSECHeaders(req, keyPair)
+
+ // Should detect SSE-C, not SSE-KMS
+ if !IsSSECRequest(req) {
+ t.Error("Should detect SSE-C request")
+ }
+ if IsSSEKMSRequest(req) {
+ t.Error("Should not detect SSE-KMS request for SSE-C headers")
+ }
+ })
+
+ // Test SSE-KMS headers don't trigger SSE-C detection
+ t.Run("SSE-KMS headers don't trigger SSE-C", func(t *testing.T) {
+ req := CreateTestHTTPRequest("PUT", "/bucket/object", nil)
+ SetupTestSSEKMSHeaders(req, "test-key-id")
+
+ // Should detect SSE-KMS, not SSE-C
+ if IsSSECRequest(req) {
+ t.Error("Should not detect SSE-C request for SSE-KMS headers")
+ }
+ if !IsSSEKMSRequest(req) {
+ t.Error("Should detect SSE-KMS request")
+ }
+ })
+
+ // Test metadata discrimination
+ t.Run("Metadata type discrimination", func(t *testing.T) {
+ ssecMetadata := CreateTestMetadataWithSSEC(GenerateTestSSECKey(1))
+
+ // Should detect as SSE-C, not SSE-KMS
+ if !IsSSECEncrypted(ssecMetadata) {
+ t.Error("Should detect SSE-C encrypted metadata")
+ }
+ if IsSSEKMSEncrypted(ssecMetadata) {
+ t.Error("Should not detect SSE-KMS for SSE-C metadata")
+ }
+ })
+}
+
+// TestSSECParseCorruptedMetadata tests handling of corrupted SSE-C metadata
+func TestSSECParseCorruptedMetadata(t *testing.T) {
+ testCases := []struct {
+ name string
+ metadata map[string][]byte
+ expectError bool
+ errorMessage string
+ }{
+ {
+ name: "Missing algorithm",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryptionCustomerKeyMD5: []byte("valid-md5"),
+ },
+ expectError: false, // Detection should still work with partial metadata
+ },
+ {
+ name: "Invalid key MD5 format",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryptionCustomerAlgorithm: []byte("AES256"),
+ s3_constants.AmzServerSideEncryptionCustomerKeyMD5: []byte("invalid-base64!"),
+ },
+ expectError: false, // Detection should work, validation happens later
+ },
+ {
+ name: "Empty values",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryptionCustomerAlgorithm: []byte(""),
+ s3_constants.AmzServerSideEncryptionCustomerKeyMD5: []byte(""),
+ },
+ expectError: false,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Test that detection doesn't panic on corrupted metadata
+ result := IsSSECEncrypted(tc.metadata)
+ // The detection should be robust and not crash
+ t.Logf("Detection result for %s: %v", tc.name, result)
+ })
+ }
+}
+
+// TestSSEKMSParseCorruptedMetadata tests handling of corrupted SSE-KMS metadata
+func TestSSEKMSParseCorruptedMetadata(t *testing.T) {
+ testCases := []struct {
+ name string
+ metadata map[string][]byte
+ }{
+ {
+ name: "Invalid encrypted data key",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
+ s3_constants.AmzEncryptedDataKey: []byte("invalid-base64!"),
+ },
+ },
+ {
+ name: "Invalid encryption context",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
+ s3_constants.AmzEncryptionContextMeta: []byte("invalid-json"),
+ },
+ },
+ {
+ name: "Empty values",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryption: []byte(""),
+ s3_constants.AmzEncryptedDataKey: []byte(""),
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Test that detection doesn't panic on corrupted metadata
+ result := IsSSEKMSEncrypted(tc.metadata)
+ t.Logf("Detection result for %s: %v", tc.name, result)
+ })
+ }
+}
+
+// TestSSEMetadataDeserialization tests SSE-KMS metadata deserialization with various inputs
+func TestSSEMetadataDeserialization(t *testing.T) {
+ testCases := []struct {
+ name string
+ data []byte
+ expectError bool
+ }{
+ {
+ name: "Empty data",
+ data: []byte{},
+ expectError: true,
+ },
+ {
+ name: "Invalid JSON",
+ data: []byte("invalid-json"),
+ expectError: true,
+ },
+ {
+ name: "Valid JSON but wrong structure",
+ data: []byte(`{"wrong": "structure"}`),
+ expectError: false, // Our deserialization might be lenient
+ },
+ {
+ name: "Null data",
+ data: nil,
+ expectError: true,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ _, err := DeserializeSSEKMSMetadata(tc.data)
+ if tc.expectError && err == nil {
+ t.Error("Expected error but got none")
+ }
+ if !tc.expectError && err != nil {
+ t.Errorf("Expected no error but got: %v", err)
+ }
+ })
+ }
+}
+
+// TestGeneralSSEDetection tests the general SSE detection that works across types
+func TestGeneralSSEDetection(t *testing.T) {
+ testCases := []struct {
+ name string
+ metadata map[string][]byte
+ expected bool
+ }{
+ {
+ name: "No encryption",
+ metadata: CreateTestMetadata(),
+ expected: false,
+ },
+ {
+ name: "SSE-C encrypted",
+ metadata: CreateTestMetadataWithSSEC(GenerateTestSSECKey(1)),
+ expected: true,
+ },
+ {
+ name: "SSE-KMS encrypted",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryption: []byte("aws:kms"),
+ },
+ expected: true,
+ },
+ {
+ name: "SSE-S3 encrypted",
+ metadata: map[string][]byte{
+ s3_constants.AmzServerSideEncryption: []byte("AES256"),
+ },
+ expected: true,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ result := IsAnySSEEncrypted(tc.metadata)
+ if result != tc.expected {
+ t.Errorf("Expected %v, got %v", tc.expected, result)
+ }
+ })
+ }
+}