diff options
Diffstat (limited to 'weed/mq/kafka/schema/integration_test.go')
| -rw-r--r-- | weed/mq/kafka/schema/integration_test.go | 643 |
1 files changed, 643 insertions, 0 deletions
diff --git a/weed/mq/kafka/schema/integration_test.go b/weed/mq/kafka/schema/integration_test.go new file mode 100644 index 000000000..5677131c1 --- /dev/null +++ b/weed/mq/kafka/schema/integration_test.go @@ -0,0 +1,643 @@ +package schema + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/linkedin/goavro/v2" +) + +// TestFullIntegration_AvroWorkflow tests the complete Avro workflow +func TestFullIntegration_AvroWorkflow(t *testing.T) { + // Create comprehensive mock schema registry + server := createMockSchemaRegistry(t) + defer server.Close() + + // Create manager with realistic configuration + config := ManagerConfig{ + RegistryURL: server.URL, + ValidationMode: ValidationPermissive, + EnableMirroring: false, + CacheTTL: "5m", + } + + manager, err := NewManager(config) + if err != nil { + t.Fatalf("Failed to create manager: %v", err) + } + + // Test 1: Producer workflow - encode schematized message + t.Run("Producer_Workflow", func(t *testing.T) { + // Create realistic user data (with proper Avro union handling) + userData := map[string]interface{}{ + "id": int32(12345), + "name": "Alice Johnson", + "email": map[string]interface{}{"string": "alice@example.com"}, // Avro union + "age": map[string]interface{}{"int": int32(28)}, // Avro union + "preferences": map[string]interface{}{ + "Preferences": map[string]interface{}{ // Avro union with record type + "notifications": true, + "theme": "dark", + }, + }, + } + + // Create Avro message (simulate what a Kafka producer would send) + avroSchema := getUserAvroSchema() + codec, err := goavro.NewCodec(avroSchema) + if err != nil { + t.Fatalf("Failed to create Avro codec: %v", err) + } + + avroBinary, err := codec.BinaryFromNative(nil, userData) + if err != nil { + t.Fatalf("Failed to encode Avro data: %v", err) + } + + // Create Confluent envelope (what Kafka Gateway receives) + confluentMsg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary) + + // Decode message (Produce path processing) + decodedMsg, err := manager.DecodeMessage(confluentMsg) + if err != nil { + t.Fatalf("Failed to decode message: %v", err) + } + + // Verify decoded data + if decodedMsg.SchemaID != 1 { + t.Errorf("Expected schema ID 1, got %d", decodedMsg.SchemaID) + } + + if decodedMsg.SchemaFormat != FormatAvro { + t.Errorf("Expected Avro format, got %v", decodedMsg.SchemaFormat) + } + + // Verify field values + fields := decodedMsg.RecordValue.Fields + if fields["id"].GetInt32Value() != 12345 { + t.Errorf("Expected id=12345, got %v", fields["id"].GetInt32Value()) + } + + if fields["name"].GetStringValue() != "Alice Johnson" { + t.Errorf("Expected name='Alice Johnson', got %v", fields["name"].GetStringValue()) + } + + t.Logf("Successfully processed producer message with %d fields", len(fields)) + }) + + // Test 2: Consumer workflow - reconstruct original message + t.Run("Consumer_Workflow", func(t *testing.T) { + // Create test RecordValue (simulate what's stored in SeaweedMQ) + testData := map[string]interface{}{ + "id": int32(67890), + "name": "Bob Smith", + "email": map[string]interface{}{"string": "bob@example.com"}, + "age": map[string]interface{}{"int": int32(35)}, // Avro union + } + recordValue := MapToRecordValue(testData) + + // Reconstruct message (Fetch path processing) + reconstructedMsg, err := manager.EncodeMessage(recordValue, 1, FormatAvro) + if err != nil { + t.Fatalf("Failed to reconstruct message: %v", err) + } + + // Verify reconstructed message can be parsed + envelope, ok := ParseConfluentEnvelope(reconstructedMsg) + if !ok { + t.Fatal("Failed to parse reconstructed envelope") + } + + if envelope.SchemaID != 1 { + t.Errorf("Expected schema ID 1, got %d", envelope.SchemaID) + } + + // Verify the payload can be decoded by Avro + avroSchema := getUserAvroSchema() + codec, err := goavro.NewCodec(avroSchema) + if err != nil { + t.Fatalf("Failed to create Avro codec: %v", err) + } + + decodedData, _, err := codec.NativeFromBinary(envelope.Payload) + if err != nil { + t.Fatalf("Failed to decode reconstructed Avro data: %v", err) + } + + // Verify data integrity + decodedMap := decodedData.(map[string]interface{}) + if decodedMap["id"] != int32(67890) { + t.Errorf("Expected id=67890, got %v", decodedMap["id"]) + } + + if decodedMap["name"] != "Bob Smith" { + t.Errorf("Expected name='Bob Smith', got %v", decodedMap["name"]) + } + + t.Logf("Successfully reconstructed consumer message: %d bytes", len(reconstructedMsg)) + }) + + // Test 3: Round-trip integrity + t.Run("Round_Trip_Integrity", func(t *testing.T) { + originalData := map[string]interface{}{ + "id": int32(99999), + "name": "Charlie Brown", + "email": map[string]interface{}{"string": "charlie@example.com"}, + "age": map[string]interface{}{"int": int32(42)}, // Avro union + "preferences": map[string]interface{}{ + "Preferences": map[string]interface{}{ // Avro union with record type + "notifications": true, + "theme": "dark", + }, + }, + } + + // Encode -> Decode -> Encode -> Decode + avroSchema := getUserAvroSchema() + codec, _ := goavro.NewCodec(avroSchema) + + // Step 1: Original -> Confluent + avroBinary, _ := codec.BinaryFromNative(nil, originalData) + confluentMsg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary) + + // Step 2: Confluent -> RecordValue + decodedMsg, _ := manager.DecodeMessage(confluentMsg) + + // Step 3: RecordValue -> Confluent + reconstructedMsg, encodeErr := manager.EncodeMessage(decodedMsg.RecordValue, 1, FormatAvro) + if encodeErr != nil { + t.Fatalf("Failed to encode message: %v", encodeErr) + } + + // Verify the reconstructed message is valid + if len(reconstructedMsg) == 0 { + t.Fatal("Reconstructed message is empty") + } + + // Step 4: Confluent -> Verify + finalDecodedMsg, err := manager.DecodeMessage(reconstructedMsg) + if err != nil { + // Debug: Check if the reconstructed message is properly formatted + envelope, ok := ParseConfluentEnvelope(reconstructedMsg) + if !ok { + t.Fatalf("Round-trip failed: reconstructed message is not a valid Confluent envelope") + } + t.Logf("Debug: Envelope SchemaID=%d, Format=%v, PayloadLen=%d", + envelope.SchemaID, envelope.Format, len(envelope.Payload)) + t.Fatalf("Round-trip failed: %v", err) + } + + // Verify data integrity through complete round-trip + finalFields := finalDecodedMsg.RecordValue.Fields + if finalFields["id"].GetInt32Value() != 99999 { + t.Error("Round-trip failed for id field") + } + + if finalFields["name"].GetStringValue() != "Charlie Brown" { + t.Error("Round-trip failed for name field") + } + + t.Log("Round-trip integrity test passed") + }) +} + +// TestFullIntegration_MultiFormatSupport tests all schema formats together +func TestFullIntegration_MultiFormatSupport(t *testing.T) { + server := createMockSchemaRegistry(t) + defer server.Close() + + config := ManagerConfig{ + RegistryURL: server.URL, + ValidationMode: ValidationPermissive, + } + + manager, err := NewManager(config) + if err != nil { + t.Fatalf("Failed to create manager: %v", err) + } + + testCases := []struct { + name string + format Format + schemaID uint32 + testData interface{} + }{ + { + name: "Avro_Format", + format: FormatAvro, + schemaID: 1, + testData: map[string]interface{}{ + "id": int32(123), + "name": "Avro User", + }, + }, + { + name: "JSON_Schema_Format", + format: FormatJSONSchema, + schemaID: 3, + testData: map[string]interface{}{ + "id": float64(456), // JSON numbers are float64 + "name": "JSON User", + "active": true, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create RecordValue from test data + recordValue := MapToRecordValue(tc.testData.(map[string]interface{})) + + // Test encoding + encoded, err := manager.EncodeMessage(recordValue, tc.schemaID, tc.format) + if err != nil { + if tc.format == FormatProtobuf { + // Protobuf encoding may fail due to incomplete implementation + t.Skipf("Protobuf encoding not fully implemented: %v", err) + } else { + t.Fatalf("Failed to encode %s message: %v", tc.name, err) + } + } + + // Test decoding + decoded, err := manager.DecodeMessage(encoded) + if err != nil { + t.Fatalf("Failed to decode %s message: %v", tc.name, err) + } + + // Verify format + if decoded.SchemaFormat != tc.format { + t.Errorf("Expected format %v, got %v", tc.format, decoded.SchemaFormat) + } + + // Verify schema ID + if decoded.SchemaID != tc.schemaID { + t.Errorf("Expected schema ID %d, got %d", tc.schemaID, decoded.SchemaID) + } + + t.Logf("Successfully processed %s format", tc.name) + }) + } +} + +// TestIntegration_CachePerformance tests caching behavior under load +func TestIntegration_CachePerformance(t *testing.T) { + server := createMockSchemaRegistry(t) + defer server.Close() + + config := ManagerConfig{ + RegistryURL: server.URL, + ValidationMode: ValidationPermissive, + } + + manager, err := NewManager(config) + if err != nil { + t.Fatalf("Failed to create manager: %v", err) + } + + // Create test message + testData := map[string]interface{}{ + "id": int32(1), + "name": "Cache Test", + } + + avroSchema := getUserAvroSchema() + codec, _ := goavro.NewCodec(avroSchema) + avroBinary, _ := codec.BinaryFromNative(nil, testData) + testMsg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary) + + // First decode (should hit registry) + start := time.Now() + _, err = manager.DecodeMessage(testMsg) + if err != nil { + t.Fatalf("First decode failed: %v", err) + } + firstDuration := time.Since(start) + + // Subsequent decodes (should hit cache) + start = time.Now() + for i := 0; i < 100; i++ { + _, err = manager.DecodeMessage(testMsg) + if err != nil { + t.Fatalf("Cached decode failed: %v", err) + } + } + cachedDuration := time.Since(start) + + // Verify cache performance improvement + avgCachedTime := cachedDuration / 100 + if avgCachedTime >= firstDuration { + t.Logf("Warning: Cache may not be effective. First: %v, Avg Cached: %v", + firstDuration, avgCachedTime) + } + + // Check cache stats + decoders, schemas, subjects := manager.GetCacheStats() + if decoders == 0 || schemas == 0 { + t.Error("Expected non-zero cache stats") + } + + t.Logf("Cache performance: First decode: %v, Average cached: %v", + firstDuration, avgCachedTime) + t.Logf("Cache stats: %d decoders, %d schemas, %d subjects", + decoders, schemas, subjects) +} + +// TestIntegration_ErrorHandling tests error scenarios +func TestIntegration_ErrorHandling(t *testing.T) { + server := createMockSchemaRegistry(t) + defer server.Close() + + config := ManagerConfig{ + RegistryURL: server.URL, + ValidationMode: ValidationStrict, + } + + manager, err := NewManager(config) + if err != nil { + t.Fatalf("Failed to create manager: %v", err) + } + + testCases := []struct { + name string + message []byte + expectError bool + errorType string + }{ + { + name: "Non_Schematized_Message", + message: []byte("plain text message"), + expectError: true, + errorType: "not schematized", + }, + { + name: "Invalid_Schema_ID", + message: CreateConfluentEnvelope(FormatAvro, 999, nil, []byte("payload")), + expectError: true, + errorType: "schema not found", + }, + { + name: "Empty_Payload", + message: CreateConfluentEnvelope(FormatAvro, 1, nil, []byte{}), + expectError: true, + errorType: "empty payload", + }, + { + name: "Corrupted_Avro_Data", + message: CreateConfluentEnvelope(FormatAvro, 1, nil, []byte("invalid avro")), + expectError: true, + errorType: "decode failed", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, err := manager.DecodeMessage(tc.message) + + if (err != nil) != tc.expectError { + t.Errorf("Expected error: %v, got error: %v", tc.expectError, err != nil) + } + + if tc.expectError && err != nil { + t.Logf("Expected error occurred: %v", err) + } + }) + } +} + +// TestIntegration_SchemaEvolution tests schema evolution scenarios +func TestIntegration_SchemaEvolution(t *testing.T) { + server := createMockSchemaRegistryWithEvolution(t) + defer server.Close() + + config := ManagerConfig{ + RegistryURL: server.URL, + ValidationMode: ValidationPermissive, + } + + manager, err := NewManager(config) + if err != nil { + t.Fatalf("Failed to create manager: %v", err) + } + + // Test decoding messages with different schema versions + t.Run("Schema_V1_Message", func(t *testing.T) { + // Create message with schema v1 (basic user) + userData := map[string]interface{}{ + "id": int32(1), + "name": "User V1", + } + + avroSchema := getUserAvroSchemaV1() + codec, _ := goavro.NewCodec(avroSchema) + avroBinary, _ := codec.BinaryFromNative(nil, userData) + msg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary) + + decoded, err := manager.DecodeMessage(msg) + if err != nil { + t.Fatalf("Failed to decode v1 message: %v", err) + } + + if decoded.Version != 1 { + t.Errorf("Expected version 1, got %d", decoded.Version) + } + }) + + t.Run("Schema_V2_Message", func(t *testing.T) { + // Create message with schema v2 (user with email) + userData := map[string]interface{}{ + "id": int32(2), + "name": "User V2", + "email": map[string]interface{}{"string": "user@example.com"}, + } + + avroSchema := getUserAvroSchemaV2() + codec, _ := goavro.NewCodec(avroSchema) + avroBinary, _ := codec.BinaryFromNative(nil, userData) + msg := CreateConfluentEnvelope(FormatAvro, 2, nil, avroBinary) + + decoded, err := manager.DecodeMessage(msg) + if err != nil { + t.Fatalf("Failed to decode v2 message: %v", err) + } + + if decoded.Version != 2 { + t.Errorf("Expected version 2, got %d", decoded.Version) + } + }) +} + +// Helper functions for creating mock schema registries + +func createMockSchemaRegistry(t *testing.T) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/subjects": + // List subjects + subjects := []string{"user-value", "product-value", "order-value"} + json.NewEncoder(w).Encode(subjects) + + case "/schemas/ids/1": + // Avro user schema + response := map[string]interface{}{ + "schema": getUserAvroSchema(), + "subject": "user-value", + "version": 1, + } + json.NewEncoder(w).Encode(response) + + case "/schemas/ids/2": + // Protobuf schema (simplified) + response := map[string]interface{}{ + "schema": "syntax = \"proto3\"; message User { int32 id = 1; string name = 2; }", + "subject": "user-value", + "version": 2, + } + json.NewEncoder(w).Encode(response) + + case "/schemas/ids/3": + // JSON Schema + response := map[string]interface{}{ + "schema": getUserJSONSchema(), + "subject": "user-value", + "version": 3, + } + json.NewEncoder(w).Encode(response) + + default: + w.WriteHeader(http.StatusNotFound) + } + })) +} + +func createMockSchemaRegistryWithEvolution(t *testing.T) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/schemas/ids/1": + // Schema v1 + response := map[string]interface{}{ + "schema": getUserAvroSchemaV1(), + "subject": "user-value", + "version": 1, + } + json.NewEncoder(w).Encode(response) + + case "/schemas/ids/2": + // Schema v2 (evolved) + response := map[string]interface{}{ + "schema": getUserAvroSchemaV2(), + "subject": "user-value", + "version": 2, + } + json.NewEncoder(w).Encode(response) + + default: + w.WriteHeader(http.StatusNotFound) + } + })) +} + +// Schema definitions for testing + +func getUserAvroSchema() string { + return `{ + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "name", "type": "string"}, + {"name": "email", "type": ["null", "string"], "default": null}, + {"name": "age", "type": ["null", "int"], "default": null}, + {"name": "preferences", "type": ["null", { + "type": "record", + "name": "Preferences", + "fields": [ + {"name": "notifications", "type": "boolean", "default": true}, + {"name": "theme", "type": "string", "default": "light"} + ] + }], "default": null} + ] + }` +} + +func getUserAvroSchemaV1() string { + return `{ + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "name", "type": "string"} + ] + }` +} + +func getUserAvroSchemaV2() string { + return `{ + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "name", "type": "string"}, + {"name": "email", "type": ["null", "string"], "default": null} + ] + }` +} + +func getUserJSONSchema() string { + return `{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "active": {"type": "boolean"} + }, + "required": ["id", "name"] + }` +} + +// Benchmark tests for integration scenarios + +func BenchmarkIntegration_AvroDecoding(b *testing.B) { + server := createMockSchemaRegistry(nil) + defer server.Close() + + config := ManagerConfig{RegistryURL: server.URL} + manager, _ := NewManager(config) + + // Create test message + testData := map[string]interface{}{ + "id": int32(1), + "name": "Benchmark User", + } + + avroSchema := getUserAvroSchema() + codec, _ := goavro.NewCodec(avroSchema) + avroBinary, _ := codec.BinaryFromNative(nil, testData) + testMsg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = manager.DecodeMessage(testMsg) + } +} + +func BenchmarkIntegration_JSONSchemaDecoding(b *testing.B) { + server := createMockSchemaRegistry(nil) + defer server.Close() + + config := ManagerConfig{RegistryURL: server.URL} + manager, _ := NewManager(config) + + // Create test message + jsonData := []byte(`{"id": 1, "name": "Benchmark User", "active": true}`) + testMsg := CreateConfluentEnvelope(FormatJSONSchema, 3, nil, jsonData) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = manager.DecodeMessage(testMsg) + } +} |
