aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/schema/decode_encode_basic_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/schema/decode_encode_basic_test.go')
-rw-r--r--weed/mq/kafka/schema/decode_encode_basic_test.go283
1 files changed, 283 insertions, 0 deletions
diff --git a/weed/mq/kafka/schema/decode_encode_basic_test.go b/weed/mq/kafka/schema/decode_encode_basic_test.go
new file mode 100644
index 000000000..af6091e3f
--- /dev/null
+++ b/weed/mq/kafka/schema/decode_encode_basic_test.go
@@ -0,0 +1,283 @@
+package schema
+
+import (
+ "bytes"
+ "encoding/binary"
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+
+ "github.com/linkedin/goavro/v2"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// TestBasicSchemaDecodeEncode tests the core decode/encode functionality with working schemas
+func TestBasicSchemaDecodeEncode(t *testing.T) {
+ // Create mock schema registry
+ registry := createBasicMockRegistry(t)
+ defer registry.Close()
+
+ manager, err := NewManager(ManagerConfig{
+ RegistryURL: registry.URL,
+ })
+ require.NoError(t, err)
+
+ t.Run("Simple Avro String Record", func(t *testing.T) {
+ schemaID := int32(1)
+ schemaJSON := `{
+ "type": "record",
+ "name": "SimpleMessage",
+ "fields": [
+ {"name": "message", "type": "string"}
+ ]
+ }`
+
+ // Register schema
+ registerBasicSchema(t, registry, schemaID, schemaJSON)
+
+ // Create test data
+ testData := map[string]interface{}{
+ "message": "Hello World",
+ }
+
+ // Encode with Avro
+ codec, err := goavro.NewCodec(schemaJSON)
+ require.NoError(t, err)
+ avroBinary, err := codec.BinaryFromNative(nil, testData)
+ require.NoError(t, err)
+
+ // Create Confluent envelope
+ envelope := createBasicEnvelope(schemaID, avroBinary)
+
+ // Test decode
+ decoded, err := manager.DecodeMessage(envelope)
+ require.NoError(t, err)
+ assert.Equal(t, uint32(schemaID), decoded.SchemaID)
+ assert.Equal(t, FormatAvro, decoded.SchemaFormat)
+ assert.NotNil(t, decoded.RecordValue)
+
+ // Verify the message field
+ messageField, exists := decoded.RecordValue.Fields["message"]
+ require.True(t, exists)
+ assert.Equal(t, "Hello World", messageField.GetStringValue())
+
+ // Test encode back
+ reconstructed, err := manager.EncodeMessage(decoded.RecordValue, decoded.SchemaID, decoded.SchemaFormat)
+ require.NoError(t, err)
+
+ // Verify envelope structure
+ assert.Equal(t, envelope[:5], reconstructed[:5]) // Magic byte + schema ID
+ assert.True(t, len(reconstructed) > 5)
+ })
+
+ t.Run("JSON Schema with String Field", func(t *testing.T) {
+ schemaID := int32(10)
+ schemaJSON := `{
+ "type": "object",
+ "properties": {
+ "name": {"type": "string"}
+ },
+ "required": ["name"]
+ }`
+
+ // Register schema
+ registerBasicSchema(t, registry, schemaID, schemaJSON)
+
+ // Create test data
+ testData := map[string]interface{}{
+ "name": "Test User",
+ }
+
+ // Encode as JSON
+ jsonBytes, err := json.Marshal(testData)
+ require.NoError(t, err)
+
+ // Create Confluent envelope
+ envelope := createBasicEnvelope(schemaID, jsonBytes)
+
+ // For now, this will be detected as Avro due to format detection logic
+ // We'll test that it at least doesn't crash and provides a meaningful error
+ decoded, err := manager.DecodeMessage(envelope)
+
+ // The current implementation may detect this as Avro and fail
+ // That's expected behavior for now - we're testing the error handling
+ if err != nil {
+ t.Logf("Expected error for JSON Schema detected as Avro: %v", err)
+ assert.Contains(t, err.Error(), "Avro")
+ } else {
+ // If it succeeds (future improvement), verify basic structure
+ assert.Equal(t, uint32(schemaID), decoded.SchemaID)
+ assert.NotNil(t, decoded.RecordValue)
+ }
+ })
+
+ t.Run("Cache Performance", func(t *testing.T) {
+ schemaID := int32(20)
+ schemaJSON := `{
+ "type": "record",
+ "name": "CacheTest",
+ "fields": [
+ {"name": "value", "type": "string"}
+ ]
+ }`
+
+ registerBasicSchema(t, registry, schemaID, schemaJSON)
+
+ // Create test data
+ testData := map[string]interface{}{"value": "cached"}
+ codec, err := goavro.NewCodec(schemaJSON)
+ require.NoError(t, err)
+ avroBinary, err := codec.BinaryFromNative(nil, testData)
+ require.NoError(t, err)
+ envelope := createBasicEnvelope(schemaID, avroBinary)
+
+ // First decode - populates cache
+ decoded1, err := manager.DecodeMessage(envelope)
+ require.NoError(t, err)
+
+ // Second decode - uses cache
+ decoded2, err := manager.DecodeMessage(envelope)
+ require.NoError(t, err)
+
+ // Verify results are consistent
+ assert.Equal(t, decoded1.SchemaID, decoded2.SchemaID)
+ assert.Equal(t, decoded1.SchemaFormat, decoded2.SchemaFormat)
+
+ // Verify field values match
+ field1 := decoded1.RecordValue.Fields["value"]
+ field2 := decoded2.RecordValue.Fields["value"]
+ assert.Equal(t, field1.GetStringValue(), field2.GetStringValue())
+
+ // Check that cache is populated
+ decoders, schemas, _ := manager.GetCacheStats()
+ assert.True(t, decoders > 0, "Should have cached decoders")
+ assert.True(t, schemas > 0, "Should have cached schemas")
+ })
+}
+
+// TestSchemaValidation tests schema validation functionality
+func TestSchemaValidation(t *testing.T) {
+ registry := createBasicMockRegistry(t)
+ defer registry.Close()
+
+ manager, err := NewManager(ManagerConfig{
+ RegistryURL: registry.URL,
+ })
+ require.NoError(t, err)
+
+ t.Run("Valid Schema Message", func(t *testing.T) {
+ schemaID := int32(100)
+ schemaJSON := `{
+ "type": "record",
+ "name": "ValidMessage",
+ "fields": [
+ {"name": "id", "type": "string"},
+ {"name": "timestamp", "type": "long"}
+ ]
+ }`
+
+ registerBasicSchema(t, registry, schemaID, schemaJSON)
+
+ // Create valid test data
+ testData := map[string]interface{}{
+ "id": "msg-123",
+ "timestamp": int64(1640995200000),
+ }
+
+ codec, err := goavro.NewCodec(schemaJSON)
+ require.NoError(t, err)
+ avroBinary, err := codec.BinaryFromNative(nil, testData)
+ require.NoError(t, err)
+ envelope := createBasicEnvelope(schemaID, avroBinary)
+
+ // Should decode successfully
+ decoded, err := manager.DecodeMessage(envelope)
+ require.NoError(t, err)
+ assert.Equal(t, uint32(schemaID), decoded.SchemaID)
+
+ // Verify fields
+ idField := decoded.RecordValue.Fields["id"]
+ timestampField := decoded.RecordValue.Fields["timestamp"]
+ assert.Equal(t, "msg-123", idField.GetStringValue())
+ assert.Equal(t, int64(1640995200000), timestampField.GetInt64Value())
+ })
+
+ t.Run("Non-Schematized Message", func(t *testing.T) {
+ // Raw message without Confluent envelope
+ rawMessage := []byte("This is not a schematized message")
+
+ _, err := manager.DecodeMessage(rawMessage)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "not schematized")
+ })
+
+ t.Run("Invalid Envelope", func(t *testing.T) {
+ // Too short envelope
+ shortEnvelope := []byte{0x00, 0x00}
+ _, err := manager.DecodeMessage(shortEnvelope)
+ assert.Error(t, err)
+ assert.Contains(t, err.Error(), "not schematized")
+ })
+}
+
+// Helper functions for basic tests
+
+func createBasicMockRegistry(t *testing.T) *httptest.Server {
+ schemas := make(map[int32]string)
+
+ return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ switch r.URL.Path {
+ case "/subjects":
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte("[]"))
+ default:
+ // Handle schema requests like /schemas/ids/1
+ var schemaID int32
+ if n, err := fmt.Sscanf(r.URL.Path, "/schemas/ids/%d", &schemaID); n == 1 && err == nil {
+ if schema, exists := schemas[schemaID]; exists {
+ response := fmt.Sprintf(`{"schema": %q}`, schema)
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(response))
+ } else {
+ w.WriteHeader(http.StatusNotFound)
+ w.Write([]byte(`{"error_code": 40403, "message": "Schema not found"}`))
+ }
+ } else if r.Method == "POST" && r.URL.Path == "/register-schema" {
+ // Custom endpoint for test registration
+ var req struct {
+ SchemaID int32 `json:"schema_id"`
+ Schema string `json:"schema"`
+ }
+ if err := json.NewDecoder(r.Body).Decode(&req); err == nil {
+ schemas[req.SchemaID] = req.Schema
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte(`{"success": true}`))
+ } else {
+ w.WriteHeader(http.StatusBadRequest)
+ }
+ } else {
+ w.WriteHeader(http.StatusNotFound)
+ }
+ }
+ }))
+}
+
+func registerBasicSchema(t *testing.T, registry *httptest.Server, schemaID int32, schema string) {
+ reqBody := fmt.Sprintf(`{"schema_id": %d, "schema": %q}`, schemaID, schema)
+ resp, err := http.Post(registry.URL+"/register-schema", "application/json", bytes.NewReader([]byte(reqBody)))
+ require.NoError(t, err)
+ defer resp.Body.Close()
+ require.Equal(t, http.StatusOK, resp.StatusCode)
+}
+
+func createBasicEnvelope(schemaID int32, data []byte) []byte {
+ envelope := make([]byte, 5+len(data))
+ envelope[0] = 0x00 // Magic byte
+ binary.BigEndian.PutUint32(envelope[1:5], uint32(schemaID))
+ copy(envelope[5:], data)
+ return envelope
+}