aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/schema/integration_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/schema/integration_test.go')
-rw-r--r--weed/mq/kafka/schema/integration_test.go643
1 files changed, 643 insertions, 0 deletions
diff --git a/weed/mq/kafka/schema/integration_test.go b/weed/mq/kafka/schema/integration_test.go
new file mode 100644
index 000000000..5677131c1
--- /dev/null
+++ b/weed/mq/kafka/schema/integration_test.go
@@ -0,0 +1,643 @@
+package schema
+
+import (
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+ "time"
+
+ "github.com/linkedin/goavro/v2"
+)
+
+// TestFullIntegration_AvroWorkflow tests the complete Avro workflow
+func TestFullIntegration_AvroWorkflow(t *testing.T) {
+ // Create comprehensive mock schema registry
+ server := createMockSchemaRegistry(t)
+ defer server.Close()
+
+ // Create manager with realistic configuration
+ config := ManagerConfig{
+ RegistryURL: server.URL,
+ ValidationMode: ValidationPermissive,
+ EnableMirroring: false,
+ CacheTTL: "5m",
+ }
+
+ manager, err := NewManager(config)
+ if err != nil {
+ t.Fatalf("Failed to create manager: %v", err)
+ }
+
+ // Test 1: Producer workflow - encode schematized message
+ t.Run("Producer_Workflow", func(t *testing.T) {
+ // Create realistic user data (with proper Avro union handling)
+ userData := map[string]interface{}{
+ "id": int32(12345),
+ "name": "Alice Johnson",
+ "email": map[string]interface{}{"string": "alice@example.com"}, // Avro union
+ "age": map[string]interface{}{"int": int32(28)}, // Avro union
+ "preferences": map[string]interface{}{
+ "Preferences": map[string]interface{}{ // Avro union with record type
+ "notifications": true,
+ "theme": "dark",
+ },
+ },
+ }
+
+ // Create Avro message (simulate what a Kafka producer would send)
+ avroSchema := getUserAvroSchema()
+ codec, err := goavro.NewCodec(avroSchema)
+ if err != nil {
+ t.Fatalf("Failed to create Avro codec: %v", err)
+ }
+
+ avroBinary, err := codec.BinaryFromNative(nil, userData)
+ if err != nil {
+ t.Fatalf("Failed to encode Avro data: %v", err)
+ }
+
+ // Create Confluent envelope (what Kafka Gateway receives)
+ confluentMsg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary)
+
+ // Decode message (Produce path processing)
+ decodedMsg, err := manager.DecodeMessage(confluentMsg)
+ if err != nil {
+ t.Fatalf("Failed to decode message: %v", err)
+ }
+
+ // Verify decoded data
+ if decodedMsg.SchemaID != 1 {
+ t.Errorf("Expected schema ID 1, got %d", decodedMsg.SchemaID)
+ }
+
+ if decodedMsg.SchemaFormat != FormatAvro {
+ t.Errorf("Expected Avro format, got %v", decodedMsg.SchemaFormat)
+ }
+
+ // Verify field values
+ fields := decodedMsg.RecordValue.Fields
+ if fields["id"].GetInt32Value() != 12345 {
+ t.Errorf("Expected id=12345, got %v", fields["id"].GetInt32Value())
+ }
+
+ if fields["name"].GetStringValue() != "Alice Johnson" {
+ t.Errorf("Expected name='Alice Johnson', got %v", fields["name"].GetStringValue())
+ }
+
+ t.Logf("Successfully processed producer message with %d fields", len(fields))
+ })
+
+ // Test 2: Consumer workflow - reconstruct original message
+ t.Run("Consumer_Workflow", func(t *testing.T) {
+ // Create test RecordValue (simulate what's stored in SeaweedMQ)
+ testData := map[string]interface{}{
+ "id": int32(67890),
+ "name": "Bob Smith",
+ "email": map[string]interface{}{"string": "bob@example.com"},
+ "age": map[string]interface{}{"int": int32(35)}, // Avro union
+ }
+ recordValue := MapToRecordValue(testData)
+
+ // Reconstruct message (Fetch path processing)
+ reconstructedMsg, err := manager.EncodeMessage(recordValue, 1, FormatAvro)
+ if err != nil {
+ t.Fatalf("Failed to reconstruct message: %v", err)
+ }
+
+ // Verify reconstructed message can be parsed
+ envelope, ok := ParseConfluentEnvelope(reconstructedMsg)
+ if !ok {
+ t.Fatal("Failed to parse reconstructed envelope")
+ }
+
+ if envelope.SchemaID != 1 {
+ t.Errorf("Expected schema ID 1, got %d", envelope.SchemaID)
+ }
+
+ // Verify the payload can be decoded by Avro
+ avroSchema := getUserAvroSchema()
+ codec, err := goavro.NewCodec(avroSchema)
+ if err != nil {
+ t.Fatalf("Failed to create Avro codec: %v", err)
+ }
+
+ decodedData, _, err := codec.NativeFromBinary(envelope.Payload)
+ if err != nil {
+ t.Fatalf("Failed to decode reconstructed Avro data: %v", err)
+ }
+
+ // Verify data integrity
+ decodedMap := decodedData.(map[string]interface{})
+ if decodedMap["id"] != int32(67890) {
+ t.Errorf("Expected id=67890, got %v", decodedMap["id"])
+ }
+
+ if decodedMap["name"] != "Bob Smith" {
+ t.Errorf("Expected name='Bob Smith', got %v", decodedMap["name"])
+ }
+
+ t.Logf("Successfully reconstructed consumer message: %d bytes", len(reconstructedMsg))
+ })
+
+ // Test 3: Round-trip integrity
+ t.Run("Round_Trip_Integrity", func(t *testing.T) {
+ originalData := map[string]interface{}{
+ "id": int32(99999),
+ "name": "Charlie Brown",
+ "email": map[string]interface{}{"string": "charlie@example.com"},
+ "age": map[string]interface{}{"int": int32(42)}, // Avro union
+ "preferences": map[string]interface{}{
+ "Preferences": map[string]interface{}{ // Avro union with record type
+ "notifications": true,
+ "theme": "dark",
+ },
+ },
+ }
+
+ // Encode -> Decode -> Encode -> Decode
+ avroSchema := getUserAvroSchema()
+ codec, _ := goavro.NewCodec(avroSchema)
+
+ // Step 1: Original -> Confluent
+ avroBinary, _ := codec.BinaryFromNative(nil, originalData)
+ confluentMsg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary)
+
+ // Step 2: Confluent -> RecordValue
+ decodedMsg, _ := manager.DecodeMessage(confluentMsg)
+
+ // Step 3: RecordValue -> Confluent
+ reconstructedMsg, encodeErr := manager.EncodeMessage(decodedMsg.RecordValue, 1, FormatAvro)
+ if encodeErr != nil {
+ t.Fatalf("Failed to encode message: %v", encodeErr)
+ }
+
+ // Verify the reconstructed message is valid
+ if len(reconstructedMsg) == 0 {
+ t.Fatal("Reconstructed message is empty")
+ }
+
+ // Step 4: Confluent -> Verify
+ finalDecodedMsg, err := manager.DecodeMessage(reconstructedMsg)
+ if err != nil {
+ // Debug: Check if the reconstructed message is properly formatted
+ envelope, ok := ParseConfluentEnvelope(reconstructedMsg)
+ if !ok {
+ t.Fatalf("Round-trip failed: reconstructed message is not a valid Confluent envelope")
+ }
+ t.Logf("Debug: Envelope SchemaID=%d, Format=%v, PayloadLen=%d",
+ envelope.SchemaID, envelope.Format, len(envelope.Payload))
+ t.Fatalf("Round-trip failed: %v", err)
+ }
+
+ // Verify data integrity through complete round-trip
+ finalFields := finalDecodedMsg.RecordValue.Fields
+ if finalFields["id"].GetInt32Value() != 99999 {
+ t.Error("Round-trip failed for id field")
+ }
+
+ if finalFields["name"].GetStringValue() != "Charlie Brown" {
+ t.Error("Round-trip failed for name field")
+ }
+
+ t.Log("Round-trip integrity test passed")
+ })
+}
+
+// TestFullIntegration_MultiFormatSupport tests all schema formats together
+func TestFullIntegration_MultiFormatSupport(t *testing.T) {
+ server := createMockSchemaRegistry(t)
+ defer server.Close()
+
+ config := ManagerConfig{
+ RegistryURL: server.URL,
+ ValidationMode: ValidationPermissive,
+ }
+
+ manager, err := NewManager(config)
+ if err != nil {
+ t.Fatalf("Failed to create manager: %v", err)
+ }
+
+ testCases := []struct {
+ name string
+ format Format
+ schemaID uint32
+ testData interface{}
+ }{
+ {
+ name: "Avro_Format",
+ format: FormatAvro,
+ schemaID: 1,
+ testData: map[string]interface{}{
+ "id": int32(123),
+ "name": "Avro User",
+ },
+ },
+ {
+ name: "JSON_Schema_Format",
+ format: FormatJSONSchema,
+ schemaID: 3,
+ testData: map[string]interface{}{
+ "id": float64(456), // JSON numbers are float64
+ "name": "JSON User",
+ "active": true,
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Create RecordValue from test data
+ recordValue := MapToRecordValue(tc.testData.(map[string]interface{}))
+
+ // Test encoding
+ encoded, err := manager.EncodeMessage(recordValue, tc.schemaID, tc.format)
+ if err != nil {
+ if tc.format == FormatProtobuf {
+ // Protobuf encoding may fail due to incomplete implementation
+ t.Skipf("Protobuf encoding not fully implemented: %v", err)
+ } else {
+ t.Fatalf("Failed to encode %s message: %v", tc.name, err)
+ }
+ }
+
+ // Test decoding
+ decoded, err := manager.DecodeMessage(encoded)
+ if err != nil {
+ t.Fatalf("Failed to decode %s message: %v", tc.name, err)
+ }
+
+ // Verify format
+ if decoded.SchemaFormat != tc.format {
+ t.Errorf("Expected format %v, got %v", tc.format, decoded.SchemaFormat)
+ }
+
+ // Verify schema ID
+ if decoded.SchemaID != tc.schemaID {
+ t.Errorf("Expected schema ID %d, got %d", tc.schemaID, decoded.SchemaID)
+ }
+
+ t.Logf("Successfully processed %s format", tc.name)
+ })
+ }
+}
+
+// TestIntegration_CachePerformance tests caching behavior under load
+func TestIntegration_CachePerformance(t *testing.T) {
+ server := createMockSchemaRegistry(t)
+ defer server.Close()
+
+ config := ManagerConfig{
+ RegistryURL: server.URL,
+ ValidationMode: ValidationPermissive,
+ }
+
+ manager, err := NewManager(config)
+ if err != nil {
+ t.Fatalf("Failed to create manager: %v", err)
+ }
+
+ // Create test message
+ testData := map[string]interface{}{
+ "id": int32(1),
+ "name": "Cache Test",
+ }
+
+ avroSchema := getUserAvroSchema()
+ codec, _ := goavro.NewCodec(avroSchema)
+ avroBinary, _ := codec.BinaryFromNative(nil, testData)
+ testMsg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary)
+
+ // First decode (should hit registry)
+ start := time.Now()
+ _, err = manager.DecodeMessage(testMsg)
+ if err != nil {
+ t.Fatalf("First decode failed: %v", err)
+ }
+ firstDuration := time.Since(start)
+
+ // Subsequent decodes (should hit cache)
+ start = time.Now()
+ for i := 0; i < 100; i++ {
+ _, err = manager.DecodeMessage(testMsg)
+ if err != nil {
+ t.Fatalf("Cached decode failed: %v", err)
+ }
+ }
+ cachedDuration := time.Since(start)
+
+ // Verify cache performance improvement
+ avgCachedTime := cachedDuration / 100
+ if avgCachedTime >= firstDuration {
+ t.Logf("Warning: Cache may not be effective. First: %v, Avg Cached: %v",
+ firstDuration, avgCachedTime)
+ }
+
+ // Check cache stats
+ decoders, schemas, subjects := manager.GetCacheStats()
+ if decoders == 0 || schemas == 0 {
+ t.Error("Expected non-zero cache stats")
+ }
+
+ t.Logf("Cache performance: First decode: %v, Average cached: %v",
+ firstDuration, avgCachedTime)
+ t.Logf("Cache stats: %d decoders, %d schemas, %d subjects",
+ decoders, schemas, subjects)
+}
+
+// TestIntegration_ErrorHandling tests error scenarios
+func TestIntegration_ErrorHandling(t *testing.T) {
+ server := createMockSchemaRegistry(t)
+ defer server.Close()
+
+ config := ManagerConfig{
+ RegistryURL: server.URL,
+ ValidationMode: ValidationStrict,
+ }
+
+ manager, err := NewManager(config)
+ if err != nil {
+ t.Fatalf("Failed to create manager: %v", err)
+ }
+
+ testCases := []struct {
+ name string
+ message []byte
+ expectError bool
+ errorType string
+ }{
+ {
+ name: "Non_Schematized_Message",
+ message: []byte("plain text message"),
+ expectError: true,
+ errorType: "not schematized",
+ },
+ {
+ name: "Invalid_Schema_ID",
+ message: CreateConfluentEnvelope(FormatAvro, 999, nil, []byte("payload")),
+ expectError: true,
+ errorType: "schema not found",
+ },
+ {
+ name: "Empty_Payload",
+ message: CreateConfluentEnvelope(FormatAvro, 1, nil, []byte{}),
+ expectError: true,
+ errorType: "empty payload",
+ },
+ {
+ name: "Corrupted_Avro_Data",
+ message: CreateConfluentEnvelope(FormatAvro, 1, nil, []byte("invalid avro")),
+ expectError: true,
+ errorType: "decode failed",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ _, err := manager.DecodeMessage(tc.message)
+
+ if (err != nil) != tc.expectError {
+ t.Errorf("Expected error: %v, got error: %v", tc.expectError, err != nil)
+ }
+
+ if tc.expectError && err != nil {
+ t.Logf("Expected error occurred: %v", err)
+ }
+ })
+ }
+}
+
+// TestIntegration_SchemaEvolution tests schema evolution scenarios
+func TestIntegration_SchemaEvolution(t *testing.T) {
+ server := createMockSchemaRegistryWithEvolution(t)
+ defer server.Close()
+
+ config := ManagerConfig{
+ RegistryURL: server.URL,
+ ValidationMode: ValidationPermissive,
+ }
+
+ manager, err := NewManager(config)
+ if err != nil {
+ t.Fatalf("Failed to create manager: %v", err)
+ }
+
+ // Test decoding messages with different schema versions
+ t.Run("Schema_V1_Message", func(t *testing.T) {
+ // Create message with schema v1 (basic user)
+ userData := map[string]interface{}{
+ "id": int32(1),
+ "name": "User V1",
+ }
+
+ avroSchema := getUserAvroSchemaV1()
+ codec, _ := goavro.NewCodec(avroSchema)
+ avroBinary, _ := codec.BinaryFromNative(nil, userData)
+ msg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary)
+
+ decoded, err := manager.DecodeMessage(msg)
+ if err != nil {
+ t.Fatalf("Failed to decode v1 message: %v", err)
+ }
+
+ if decoded.Version != 1 {
+ t.Errorf("Expected version 1, got %d", decoded.Version)
+ }
+ })
+
+ t.Run("Schema_V2_Message", func(t *testing.T) {
+ // Create message with schema v2 (user with email)
+ userData := map[string]interface{}{
+ "id": int32(2),
+ "name": "User V2",
+ "email": map[string]interface{}{"string": "user@example.com"},
+ }
+
+ avroSchema := getUserAvroSchemaV2()
+ codec, _ := goavro.NewCodec(avroSchema)
+ avroBinary, _ := codec.BinaryFromNative(nil, userData)
+ msg := CreateConfluentEnvelope(FormatAvro, 2, nil, avroBinary)
+
+ decoded, err := manager.DecodeMessage(msg)
+ if err != nil {
+ t.Fatalf("Failed to decode v2 message: %v", err)
+ }
+
+ if decoded.Version != 2 {
+ t.Errorf("Expected version 2, got %d", decoded.Version)
+ }
+ })
+}
+
+// Helper functions for creating mock schema registries
+
+func createMockSchemaRegistry(t *testing.T) *httptest.Server {
+ return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ switch r.URL.Path {
+ case "/subjects":
+ // List subjects
+ subjects := []string{"user-value", "product-value", "order-value"}
+ json.NewEncoder(w).Encode(subjects)
+
+ case "/schemas/ids/1":
+ // Avro user schema
+ response := map[string]interface{}{
+ "schema": getUserAvroSchema(),
+ "subject": "user-value",
+ "version": 1,
+ }
+ json.NewEncoder(w).Encode(response)
+
+ case "/schemas/ids/2":
+ // Protobuf schema (simplified)
+ response := map[string]interface{}{
+ "schema": "syntax = \"proto3\"; message User { int32 id = 1; string name = 2; }",
+ "subject": "user-value",
+ "version": 2,
+ }
+ json.NewEncoder(w).Encode(response)
+
+ case "/schemas/ids/3":
+ // JSON Schema
+ response := map[string]interface{}{
+ "schema": getUserJSONSchema(),
+ "subject": "user-value",
+ "version": 3,
+ }
+ json.NewEncoder(w).Encode(response)
+
+ default:
+ w.WriteHeader(http.StatusNotFound)
+ }
+ }))
+}
+
+func createMockSchemaRegistryWithEvolution(t *testing.T) *httptest.Server {
+ return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ switch r.URL.Path {
+ case "/schemas/ids/1":
+ // Schema v1
+ response := map[string]interface{}{
+ "schema": getUserAvroSchemaV1(),
+ "subject": "user-value",
+ "version": 1,
+ }
+ json.NewEncoder(w).Encode(response)
+
+ case "/schemas/ids/2":
+ // Schema v2 (evolved)
+ response := map[string]interface{}{
+ "schema": getUserAvroSchemaV2(),
+ "subject": "user-value",
+ "version": 2,
+ }
+ json.NewEncoder(w).Encode(response)
+
+ default:
+ w.WriteHeader(http.StatusNotFound)
+ }
+ }))
+}
+
+// Schema definitions for testing
+
+func getUserAvroSchema() string {
+ return `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"},
+ {"name": "email", "type": ["null", "string"], "default": null},
+ {"name": "age", "type": ["null", "int"], "default": null},
+ {"name": "preferences", "type": ["null", {
+ "type": "record",
+ "name": "Preferences",
+ "fields": [
+ {"name": "notifications", "type": "boolean", "default": true},
+ {"name": "theme", "type": "string", "default": "light"}
+ ]
+ }], "default": null}
+ ]
+ }`
+}
+
+func getUserAvroSchemaV1() string {
+ return `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"}
+ ]
+ }`
+}
+
+func getUserAvroSchemaV2() string {
+ return `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"},
+ {"name": "email", "type": ["null", "string"], "default": null}
+ ]
+ }`
+}
+
+func getUserJSONSchema() string {
+ return `{
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "type": "object",
+ "properties": {
+ "id": {"type": "integer"},
+ "name": {"type": "string"},
+ "active": {"type": "boolean"}
+ },
+ "required": ["id", "name"]
+ }`
+}
+
+// Benchmark tests for integration scenarios
+
+func BenchmarkIntegration_AvroDecoding(b *testing.B) {
+ server := createMockSchemaRegistry(nil)
+ defer server.Close()
+
+ config := ManagerConfig{RegistryURL: server.URL}
+ manager, _ := NewManager(config)
+
+ // Create test message
+ testData := map[string]interface{}{
+ "id": int32(1),
+ "name": "Benchmark User",
+ }
+
+ avroSchema := getUserAvroSchema()
+ codec, _ := goavro.NewCodec(avroSchema)
+ avroBinary, _ := codec.BinaryFromNative(nil, testData)
+ testMsg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, _ = manager.DecodeMessage(testMsg)
+ }
+}
+
+func BenchmarkIntegration_JSONSchemaDecoding(b *testing.B) {
+ server := createMockSchemaRegistry(nil)
+ defer server.Close()
+
+ config := ManagerConfig{RegistryURL: server.URL}
+ manager, _ := NewManager(config)
+
+ // Create test message
+ jsonData := []byte(`{"id": 1, "name": "Benchmark User", "active": true}`)
+ testMsg := CreateConfluentEnvelope(FormatJSONSchema, 3, nil, jsonData)
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, _ = manager.DecodeMessage(testMsg)
+ }
+}