diff options
Diffstat (limited to 'weed/mq/kafka/schema/manager_test.go')
| -rw-r--r-- | weed/mq/kafka/schema/manager_test.go | 331 |
1 files changed, 331 insertions, 0 deletions
diff --git a/weed/mq/kafka/schema/manager_test.go b/weed/mq/kafka/schema/manager_test.go new file mode 100644 index 000000000..eec2a479e --- /dev/null +++ b/weed/mq/kafka/schema/manager_test.go @@ -0,0 +1,331 @@ +package schema + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/linkedin/goavro/v2" +) + +func TestManager_DecodeMessage(t *testing.T) { + // Create mock schema registry + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/schemas/ids/1" { + response := map[string]interface{}{ + "schema": `{ + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "name", "type": "string"} + ] + }`, + "subject": "user-value", + "version": 1, + } + json.NewEncoder(w).Encode(response) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + // Create manager + config := ManagerConfig{ + RegistryURL: server.URL, + ValidationMode: ValidationPermissive, + } + + manager, err := NewManager(config) + if err != nil { + t.Fatalf("Failed to create manager: %v", err) + } + + // Create test Avro message + avroSchema := `{ + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "name", "type": "string"} + ] + }` + + codec, err := goavro.NewCodec(avroSchema) + if err != nil { + t.Fatalf("Failed to create Avro codec: %v", err) + } + + // Create test data + testRecord := map[string]interface{}{ + "id": int32(123), + "name": "John Doe", + } + + // Encode to Avro binary + avroBinary, err := codec.BinaryFromNative(nil, testRecord) + if err != nil { + t.Fatalf("Failed to encode Avro data: %v", err) + } + + // Create Confluent envelope + confluentMsg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary) + + // Test decoding + decodedMsg, err := manager.DecodeMessage(confluentMsg) + if err != nil { + t.Fatalf("Failed to decode message: %v", err) + } + + // Verify decoded message + if decodedMsg.SchemaID != 1 { + t.Errorf("Expected schema ID 1, got %d", decodedMsg.SchemaID) + } + + if decodedMsg.SchemaFormat != FormatAvro { + t.Errorf("Expected Avro format, got %v", decodedMsg.SchemaFormat) + } + + if decodedMsg.Subject != "user-value" { + t.Errorf("Expected subject 'user-value', got %s", decodedMsg.Subject) + } + + // Verify decoded data + if decodedMsg.RecordValue == nil { + t.Fatal("Expected non-nil RecordValue") + } + + idValue := decodedMsg.RecordValue.Fields["id"] + if idValue == nil || idValue.GetInt32Value() != 123 { + t.Errorf("Expected id=123, got %v", idValue) + } + + nameValue := decodedMsg.RecordValue.Fields["name"] + if nameValue == nil || nameValue.GetStringValue() != "John Doe" { + t.Errorf("Expected name='John Doe', got %v", nameValue) + } +} + +func TestManager_IsSchematized(t *testing.T) { + config := ManagerConfig{ + RegistryURL: "http://localhost:8081", // Not used for this test + } + + manager, err := NewManager(config) + if err != nil { + // Skip test if we can't connect to registry + t.Skip("Skipping test - no registry available") + } + + tests := []struct { + name string + message []byte + expected bool + }{ + { + name: "schematized message", + message: []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x48, 0x65, 0x6c, 0x6c, 0x6f}, + expected: true, + }, + { + name: "non-schematized message", + message: []byte{0x48, 0x65, 0x6c, 0x6c, 0x6f}, // Just "Hello" + expected: false, + }, + { + name: "empty message", + message: []byte{}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := manager.IsSchematized(tt.message) + if result != tt.expected { + t.Errorf("IsSchematized() = %v, want %v", result, tt.expected) + } + }) + } +} + +func TestManager_GetSchemaInfo(t *testing.T) { + // Create mock schema registry + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/schemas/ids/42" { + response := map[string]interface{}{ + "schema": `{ + "type": "record", + "name": "Product", + "fields": [ + {"name": "id", "type": "string"}, + {"name": "price", "type": "double"} + ] + }`, + "subject": "product-value", + "version": 3, + } + json.NewEncoder(w).Encode(response) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + config := ManagerConfig{ + RegistryURL: server.URL, + } + + manager, err := NewManager(config) + if err != nil { + t.Fatalf("Failed to create manager: %v", err) + } + + // Create test message with schema ID 42 + testMsg := CreateConfluentEnvelope(FormatAvro, 42, nil, []byte("test-payload")) + + schemaID, format, err := manager.GetSchemaInfo(testMsg) + if err != nil { + t.Fatalf("Failed to get schema info: %v", err) + } + + if schemaID != 42 { + t.Errorf("Expected schema ID 42, got %d", schemaID) + } + + if format != FormatAvro { + t.Errorf("Expected Avro format, got %v", format) + } +} + +func TestManager_CacheManagement(t *testing.T) { + config := ManagerConfig{ + RegistryURL: "http://localhost:8081", // Not used for this test + } + + manager, err := NewManager(config) + if err != nil { + t.Skip("Skipping test - no registry available") + } + + // Check initial cache stats + decoders, schemas, subjects := manager.GetCacheStats() + if decoders != 0 || schemas != 0 || subjects != 0 { + t.Errorf("Expected empty cache initially, got decoders=%d, schemas=%d, subjects=%d", + decoders, schemas, subjects) + } + + // Clear cache (should be no-op on empty cache) + manager.ClearCache() + + // Verify still empty + decoders, schemas, subjects = manager.GetCacheStats() + if decoders != 0 || schemas != 0 || subjects != 0 { + t.Errorf("Expected empty cache after clear, got decoders=%d, schemas=%d, subjects=%d", + decoders, schemas, subjects) + } +} + +func TestManager_EncodeMessage(t *testing.T) { + // Create mock schema registry + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/schemas/ids/1" { + response := map[string]interface{}{ + "schema": `{ + "type": "record", + "name": "User", + "fields": [ + {"name": "id", "type": "int"}, + {"name": "name", "type": "string"} + ] + }`, + "subject": "user-value", + "version": 1, + } + json.NewEncoder(w).Encode(response) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + config := ManagerConfig{ + RegistryURL: server.URL, + } + + manager, err := NewManager(config) + if err != nil { + t.Fatalf("Failed to create manager: %v", err) + } + + // Create test RecordValue + testMap := map[string]interface{}{ + "id": int32(456), + "name": "Jane Smith", + } + recordValue := MapToRecordValue(testMap) + + // Test encoding + encoded, err := manager.EncodeMessage(recordValue, 1, FormatAvro) + if err != nil { + t.Fatalf("Failed to encode message: %v", err) + } + + // Verify it's a valid Confluent envelope + envelope, ok := ParseConfluentEnvelope(encoded) + if !ok { + t.Fatal("Encoded message is not a valid Confluent envelope") + } + + if envelope.SchemaID != 1 { + t.Errorf("Expected schema ID 1, got %d", envelope.SchemaID) + } + + if envelope.Format != FormatAvro { + t.Errorf("Expected Avro format, got %v", envelope.Format) + } + + // Test round-trip: decode the encoded message + decodedMsg, err := manager.DecodeMessage(encoded) + if err != nil { + t.Fatalf("Failed to decode round-trip message: %v", err) + } + + // Verify round-trip data integrity + if decodedMsg.RecordValue.Fields["id"].GetInt32Value() != 456 { + t.Error("Round-trip failed for id field") + } + + if decodedMsg.RecordValue.Fields["name"].GetStringValue() != "Jane Smith" { + t.Error("Round-trip failed for name field") + } +} + +// Benchmark tests +func BenchmarkManager_DecodeMessage(b *testing.B) { + // Setup (similar to TestManager_DecodeMessage but simplified) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + response := map[string]interface{}{ + "schema": `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`, + "subject": "user-value", + "version": 1, + } + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + config := ManagerConfig{RegistryURL: server.URL} + manager, _ := NewManager(config) + + // Create test message + codec, _ := goavro.NewCodec(`{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`) + avroBinary, _ := codec.BinaryFromNative(nil, map[string]interface{}{"id": int32(123)}) + testMsg := CreateConfluentEnvelope(FormatAvro, 1, nil, avroBinary) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = manager.DecodeMessage(testMsg) + } +} |
