aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/schema/manager_evolution_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/schema/manager_evolution_test.go')
-rw-r--r--weed/mq/kafka/schema/manager_evolution_test.go344
1 files changed, 344 insertions, 0 deletions
diff --git a/weed/mq/kafka/schema/manager_evolution_test.go b/weed/mq/kafka/schema/manager_evolution_test.go
new file mode 100644
index 000000000..232c0e1e7
--- /dev/null
+++ b/weed/mq/kafka/schema/manager_evolution_test.go
@@ -0,0 +1,344 @@
+package schema
+
+import (
+ "strings"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// TestManager_SchemaEvolution tests schema evolution integration in the manager
+func TestManager_SchemaEvolution(t *testing.T) {
+ // Create a manager without registry (for testing evolution logic only)
+ manager := &Manager{
+ evolutionChecker: NewSchemaEvolutionChecker(),
+ }
+
+ t.Run("Compatible Avro evolution", func(t *testing.T) {
+ oldSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"}
+ ]
+ }`
+
+ newSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"},
+ {"name": "email", "type": "string", "default": ""}
+ ]
+ }`
+
+ result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
+ require.NoError(t, err)
+ assert.True(t, result.Compatible)
+ assert.Empty(t, result.Issues)
+ })
+
+ t.Run("Incompatible Avro evolution", func(t *testing.T) {
+ oldSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"},
+ {"name": "email", "type": "string"}
+ ]
+ }`
+
+ newSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"}
+ ]
+ }`
+
+ result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
+ require.NoError(t, err)
+ assert.False(t, result.Compatible)
+ assert.NotEmpty(t, result.Issues)
+ assert.Contains(t, result.Issues[0], "Field 'email' was removed")
+ })
+
+ t.Run("Schema evolution suggestions", func(t *testing.T) {
+ oldSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"}
+ ]
+ }`
+
+ newSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"},
+ {"name": "email", "type": "string"}
+ ]
+ }`
+
+ suggestions, err := manager.SuggestSchemaEvolution(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
+ require.NoError(t, err)
+ assert.NotEmpty(t, suggestions)
+
+ // Should suggest adding default values
+ found := false
+ for _, suggestion := range suggestions {
+ if strings.Contains(suggestion, "default") {
+ found = true
+ break
+ }
+ }
+ assert.True(t, found, "Should suggest adding default values, got: %v", suggestions)
+ })
+
+ t.Run("JSON Schema evolution", func(t *testing.T) {
+ oldSchema := `{
+ "type": "object",
+ "properties": {
+ "id": {"type": "integer"},
+ "name": {"type": "string"}
+ },
+ "required": ["id", "name"]
+ }`
+
+ newSchema := `{
+ "type": "object",
+ "properties": {
+ "id": {"type": "integer"},
+ "name": {"type": "string"},
+ "email": {"type": "string"}
+ },
+ "required": ["id", "name"]
+ }`
+
+ result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatJSONSchema, CompatibilityBackward)
+ require.NoError(t, err)
+ assert.True(t, result.Compatible)
+ })
+
+ t.Run("Full compatibility check", func(t *testing.T) {
+ oldSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"}
+ ]
+ }`
+
+ newSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"},
+ {"name": "email", "type": "string", "default": ""}
+ ]
+ }`
+
+ result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityFull)
+ require.NoError(t, err)
+ assert.True(t, result.Compatible)
+ })
+
+ t.Run("Type promotion compatibility", func(t *testing.T) {
+ oldSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "score", "type": "int"}
+ ]
+ }`
+
+ newSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "score", "type": "long"}
+ ]
+ }`
+
+ result, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
+ require.NoError(t, err)
+ assert.True(t, result.Compatible)
+ })
+}
+
+// TestManager_CompatibilityLevels tests compatibility level management
+func TestManager_CompatibilityLevels(t *testing.T) {
+ manager := &Manager{
+ evolutionChecker: NewSchemaEvolutionChecker(),
+ }
+
+ t.Run("Get default compatibility level", func(t *testing.T) {
+ level := manager.GetCompatibilityLevel("test-subject")
+ assert.Equal(t, CompatibilityBackward, level)
+ })
+
+ t.Run("Set compatibility level", func(t *testing.T) {
+ err := manager.SetCompatibilityLevel("test-subject", CompatibilityFull)
+ assert.NoError(t, err)
+ })
+}
+
+// TestManager_CanEvolveSchema tests the CanEvolveSchema method
+func TestManager_CanEvolveSchema(t *testing.T) {
+ manager := &Manager{
+ evolutionChecker: NewSchemaEvolutionChecker(),
+ }
+
+ t.Run("Compatible evolution", func(t *testing.T) {
+ currentSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"}
+ ]
+ }`
+
+ newSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"},
+ {"name": "email", "type": "string", "default": ""}
+ ]
+ }`
+
+ result, err := manager.CanEvolveSchema("test-subject", currentSchema, newSchema, FormatAvro)
+ require.NoError(t, err)
+ assert.True(t, result.Compatible)
+ })
+
+ t.Run("Incompatible evolution", func(t *testing.T) {
+ currentSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"},
+ {"name": "email", "type": "string"}
+ ]
+ }`
+
+ newSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"}
+ ]
+ }`
+
+ result, err := manager.CanEvolveSchema("test-subject", currentSchema, newSchema, FormatAvro)
+ require.NoError(t, err)
+ assert.False(t, result.Compatible)
+ assert.Contains(t, result.Issues[0], "Field 'email' was removed")
+ })
+}
+
+// TestManager_SchemaEvolutionWorkflow tests a complete schema evolution workflow
+func TestManager_SchemaEvolutionWorkflow(t *testing.T) {
+ manager := &Manager{
+ evolutionChecker: NewSchemaEvolutionChecker(),
+ }
+
+ t.Run("Complete evolution workflow", func(t *testing.T) {
+ // Step 1: Define initial schema
+ initialSchema := `{
+ "type": "record",
+ "name": "UserEvent",
+ "fields": [
+ {"name": "userId", "type": "int"},
+ {"name": "action", "type": "string"}
+ ]
+ }`
+
+ // Step 2: Propose schema evolution (compatible)
+ evolvedSchema := `{
+ "type": "record",
+ "name": "UserEvent",
+ "fields": [
+ {"name": "userId", "type": "int"},
+ {"name": "action", "type": "string"},
+ {"name": "timestamp", "type": "long", "default": 0}
+ ]
+ }`
+
+ // Check compatibility explicitly
+ result, err := manager.CanEvolveSchema("user-events", initialSchema, evolvedSchema, FormatAvro)
+ require.NoError(t, err)
+ assert.True(t, result.Compatible)
+
+ // Step 3: Try incompatible evolution
+ incompatibleSchema := `{
+ "type": "record",
+ "name": "UserEvent",
+ "fields": [
+ {"name": "userId", "type": "int"}
+ ]
+ }`
+
+ result, err = manager.CanEvolveSchema("user-events", initialSchema, incompatibleSchema, FormatAvro)
+ require.NoError(t, err)
+ assert.False(t, result.Compatible)
+ assert.Contains(t, result.Issues[0], "Field 'action' was removed")
+
+ // Step 4: Get suggestions for incompatible evolution
+ suggestions, err := manager.SuggestSchemaEvolution(initialSchema, incompatibleSchema, FormatAvro, CompatibilityBackward)
+ require.NoError(t, err)
+ assert.NotEmpty(t, suggestions)
+ })
+}
+
+// BenchmarkSchemaEvolution benchmarks schema evolution operations
+func BenchmarkSchemaEvolution(b *testing.B) {
+ manager := &Manager{
+ evolutionChecker: NewSchemaEvolutionChecker(),
+ }
+
+ oldSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"},
+ {"name": "email", "type": "string", "default": ""}
+ ]
+ }`
+
+ newSchema := `{
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": "string"},
+ {"name": "email", "type": "string", "default": ""},
+ {"name": "age", "type": "int", "default": 0}
+ ]
+ }`
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _, err := manager.CheckSchemaCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
+ if err != nil {
+ b.Fatal(err)
+ }
+ }
+}