Diffstat (limited to 'weed/mq/kafka/schema/evolution.go')
-rw-r--r--  weed/mq/kafka/schema/evolution.go  522
1 file changed, 522 insertions, 0 deletions
diff --git a/weed/mq/kafka/schema/evolution.go b/weed/mq/kafka/schema/evolution.go
new file mode 100644
index 000000000..73b56fc03
--- /dev/null
+++ b/weed/mq/kafka/schema/evolution.go
@@ -0,0 +1,522 @@
+package schema
+
+import (
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ "github.com/linkedin/goavro/v2"
+)
+
+// CompatibilityLevel defines the schema compatibility level
+type CompatibilityLevel string
+
+const (
+ // CompatibilityNone performs no compatibility checking
+ CompatibilityNone CompatibilityLevel = "NONE"
+ // CompatibilityBackward: consumers using the new schema can read data written with the old schema
+ CompatibilityBackward CompatibilityLevel = "BACKWARD"
+ // CompatibilityForward: consumers using the old schema can read data written with the new schema
+ CompatibilityForward CompatibilityLevel = "FORWARD"
+ // CompatibilityFull requires both backward and forward compatibility
+ CompatibilityFull CompatibilityLevel = "FULL"
+)
+
+// SchemaEvolutionChecker handles schema compatibility checking and evolution
+type SchemaEvolutionChecker struct {
+ // Cache for parsed schemas to avoid re-parsing (reserved for future use;
+ // the current checks re-parse schemas on every call)
+ schemaCache map[string]interface{}
+}
+
+// NewSchemaEvolutionChecker creates a new schema evolution checker
+func NewSchemaEvolutionChecker() *SchemaEvolutionChecker {
+ return &SchemaEvolutionChecker{
+ schemaCache: make(map[string]interface{}),
+ }
+}
+
+// CompatibilityResult represents the result of a compatibility check
+type CompatibilityResult struct {
+ Compatible bool
+ Issues []string
+ Level CompatibilityLevel
+}
+
+// CheckCompatibility checks if two schemas are compatible according to the specified level
+func (checker *SchemaEvolutionChecker) CheckCompatibility(
+ oldSchemaStr, newSchemaStr string,
+ format Format,
+ level CompatibilityLevel,
+) (*CompatibilityResult, error) {
+
+ result := &CompatibilityResult{
+ Compatible: true,
+ Issues: []string{},
+ Level: level,
+ }
+
+ if level == CompatibilityNone {
+ return result, nil
+ }
+
+ switch format {
+ case FormatAvro:
+ return checker.checkAvroCompatibility(oldSchemaStr, newSchemaStr, level)
+ case FormatProtobuf:
+ return checker.checkProtobufCompatibility(oldSchemaStr, newSchemaStr, level)
+ case FormatJSONSchema:
+ return checker.checkJSONSchemaCompatibility(oldSchemaStr, newSchemaStr, level)
+ default:
+ return nil, fmt.Errorf("unsupported schema format for compatibility check: %s", format)
+ }
+}
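+
+// Illustrative usage sketch; the schema literals below are hypothetical, not
+// taken from this package:
+//
+//	checker := NewSchemaEvolutionChecker()
+//	oldSchema := `{"type":"record","name":"User","fields":[{"name":"id","type":"int"}]}`
+//	newSchema := `{"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}`
+//	res, err := checker.CheckCompatibility(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
+//	if err != nil {
+//		// one of the schemas failed to parse
+//	} else if !res.Compatible {
+//		fmt.Println(res.Issues)
+//	}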
+
+// checkAvroCompatibility checks Avro schema compatibility
+func (checker *SchemaEvolutionChecker) checkAvroCompatibility(
+ oldSchemaStr, newSchemaStr string,
+ level CompatibilityLevel,
+) (*CompatibilityResult, error) {
+
+ result := &CompatibilityResult{
+ Compatible: true,
+ Issues: []string{},
+ Level: level,
+ }
+
+ // Parse old schema
+ oldSchema, err := goavro.NewCodec(oldSchemaStr)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse old Avro schema: %w", err)
+ }
+
+ // Parse new schema
+ newSchema, err := goavro.NewCodec(newSchemaStr)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse new Avro schema: %w", err)
+ }
+
+ // Parse schema structures for detailed analysis
+ var oldSchemaMap, newSchemaMap map[string]interface{}
+ if err := json.Unmarshal([]byte(oldSchemaStr), &oldSchemaMap); err != nil {
+ return nil, fmt.Errorf("failed to parse old schema JSON: %w", err)
+ }
+ if err := json.Unmarshal([]byte(newSchemaStr), &newSchemaMap); err != nil {
+ return nil, fmt.Errorf("failed to parse new schema JSON: %w", err)
+ }
+
+ // Check compatibility based on level
+ switch level {
+ case CompatibilityBackward:
+ checker.checkAvroBackwardCompatibility(oldSchemaMap, newSchemaMap, result)
+ case CompatibilityForward:
+ checker.checkAvroForwardCompatibility(oldSchemaMap, newSchemaMap, result)
+ case CompatibilityFull:
+ checker.checkAvroBackwardCompatibility(oldSchemaMap, newSchemaMap, result)
+ if result.Compatible {
+ checker.checkAvroForwardCompatibility(oldSchemaMap, newSchemaMap, result)
+ }
+ }
+
+ // Additional validation: try to create test data and check if it can be read
+ if result.Compatible {
+ if err := checker.validateAvroDataCompatibility(oldSchema, newSchema, level); err != nil {
+ result.Compatible = false
+ result.Issues = append(result.Issues, fmt.Sprintf("Data compatibility test failed: %v", err))
+ }
+ }
+
+ return result, nil
+}
+
+// checkAvroBackwardCompatibility checks if new schema can read data written with old schema
+func (checker *SchemaEvolutionChecker) checkAvroBackwardCompatibility(
+ oldSchema, newSchema map[string]interface{},
+ result *CompatibilityResult,
+) {
+ // Backward compatibility: a consumer using the new (reader) schema must be
+ // able to read data written with the old (writer) schema. Under Avro schema
+ // resolution the reader silently ignores writer-only fields, so removing a
+ // field is backward compatible; what breaks backward compatibility is a new
+ // field without a default, or an incompatible type change.
+ oldFields := checker.extractAvroFields(oldSchema)
+ newFields := checker.extractAvroFields(newSchema)
+
+ for fieldName, oldField := range oldFields {
+ if newField, exists := newFields[fieldName]; exists {
+ // Field exists in both schemas, check type compatibility
+ if !checker.areAvroTypesCompatible(oldField["type"], newField["type"], true) {
+ result.Compatible = false
+ result.Issues = append(result.Issues,
+ fmt.Sprintf("Field '%s' type changed incompatibly", fieldName))
+ }
+ }
+ }
+
+ // Check if new fields were added without defaults: the new reader cannot
+ // fill in a value for them when reading old data
+ for fieldName, newField := range newFields {
+ if _, exists := oldFields[fieldName]; !exists {
+ if _, hasDefault := newField["default"]; !hasDefault {
+ result.Compatible = false
+ result.Issues = append(result.Issues,
+ fmt.Sprintf("New required field '%s' added without default value", fieldName))
+ }
+ }
+ }
+}
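+
+// Sketch of the default-value rule above (hypothetical field declarations):
+// adding
+//
+//	{"name":"email","type":"string"}
+//
+// to the new schema is flagged, while
+//
+//	{"name":"email","type":["null","string"],"default":null}
+//
+// passes, because the new reader can synthesize the default whenever the
+// field is missing from data written under the old schema.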
+
+// checkAvroForwardCompatibility checks if old schema can read data written with new schema
+func (checker *SchemaEvolutionChecker) checkAvroForwardCompatibility(
+ oldSchema, newSchema map[string]interface{},
+ result *CompatibilityResult,
+) {
+ // Forward compatibility: a consumer using the old (reader) schema must be
+ // able to read data written with the new (writer) schema. Fields that exist
+ // only in the new schema are ignored by the old reader, so adding fields is
+ // forward compatible; removing a field breaks forward compatibility unless
+ // the old reader's declaration carries a default to fall back on.
+ oldFields := checker.extractAvroFields(oldSchema)
+ newFields := checker.extractAvroFields(newSchema)
+
+ for fieldName, newField := range newFields {
+ if oldField, exists := oldFields[fieldName]; exists {
+ // Field exists in both schemas, check type compatibility with the new
+ // schema as writer and the old schema as reader
+ if !checker.areAvroTypesCompatible(newField["type"], oldField["type"], false) {
+ result.Compatible = false
+ result.Issues = append(result.Issues,
+ fmt.Sprintf("Field '%s' type changed incompatibly, breaking forward compatibility", fieldName))
+ }
+ }
+ }
+
+ // Check if fields were removed: the old reader still expects them and can
+ // only cope if its own field declaration has a default
+ for fieldName, oldField := range oldFields {
+ if _, exists := newFields[fieldName]; !exists {
+ if _, hasDefault := oldField["default"]; !hasDefault {
+ result.Compatible = false
+ result.Issues = append(result.Issues,
+ fmt.Sprintf("Field '%s' was removed without a default in the old schema, breaking forward compatibility", fieldName))
+ }
+ }
+ }
+}
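+
+// Sketch of the removal rule (hypothetical field declarations): dropping
+//
+//	{"name":"age","type":"int"}
+//
+// breaks forward compatibility, whereas dropping
+//
+//	{"name":"age","type":"int","default":0}
+//
+// is tolerated, because an old reader that encounters new data without the
+// field falls back to its declared default.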
+
+// extractAvroFields extracts field information from an Avro schema
+func (checker *SchemaEvolutionChecker) extractAvroFields(schema map[string]interface{}) map[string]map[string]interface{} {
+ fields := make(map[string]map[string]interface{})
+
+ if fieldsArray, ok := schema["fields"].([]interface{}); ok {
+ for _, fieldInterface := range fieldsArray {
+ if field, ok := fieldInterface.(map[string]interface{}); ok {
+ if name, ok := field["name"].(string); ok {
+ fields[name] = field
+ }
+ }
+ }
+ }
+
+ return fields
+}
+
+// areAvroTypesCompatible checks if two Avro types are compatible
+func (checker *SchemaEvolutionChecker) areAvroTypesCompatible(oldType, newType interface{}, backward bool) bool {
+ // Simplified type compatibility check: types are compared via their %v
+ // string form, so unions and other complex types only match when they are
+ // structurally identical. A full implementation would resolve unions,
+ // logical types, and nested records.
+
+ oldTypeStr := fmt.Sprintf("%v", oldType)
+ newTypeStr := fmt.Sprintf("%v", newType)
+
+ // Same type is always compatible
+ if oldTypeStr == newTypeStr {
+ return true
+ }
+
+ // Check for promotable types (e.g., int -> long, float -> double)
+ if backward {
+ return checker.isPromotableType(oldTypeStr, newTypeStr)
+ } else {
+ return checker.isPromotableType(newTypeStr, oldTypeStr)
+ }
+}
+
+// isPromotableType checks if a type can be promoted to another
+func (checker *SchemaEvolutionChecker) isPromotableType(from, to string) bool {
+ promotions := map[string][]string{
+ "int": {"long", "float", "double"},
+ "long": {"float", "double"},
+ "float": {"double"},
+ "string": {"bytes"},
+ "bytes": {"string"},
+ }
+
+ if validPromotions, exists := promotions[from]; exists {
+ for _, validTo := range validPromotions {
+ if to == validTo {
+ return true
+ }
+ }
+ }
+
+ return false
+}
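+
+// For example, isPromotableType("int", "long") and
+// isPromotableType("string", "bytes") return true, while the narrowing
+// direction isPromotableType("long", "int") returns false, mirroring the
+// promotion rules of the Avro specification.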
+
+// validateAvroDataCompatibility validates compatibility by testing with actual data
+func (checker *SchemaEvolutionChecker) validateAvroDataCompatibility(
+ oldSchema, newSchema *goavro.Codec,
+ level CompatibilityLevel,
+) error {
+ // Best-effort smoke test: round-trip a placeholder record through both
+ // codecs. Note that goavro decodes with the codec's own schema and performs
+ // no writer/reader schema resolution, so this only catches gross
+ // binary-layout incompatibilities.
+ testData := map[string]interface{}{
+ "test_field": "test_value",
+ }
+
+ // Try to encode with old schema
+ encoded, err := oldSchema.BinaryFromNative(nil, testData)
+ if err != nil {
+ // The placeholder record does not match this schema; skip the smoke test
+ return nil
+ }
+
+ // Try to decode with new schema (backward compatibility)
+ if level == CompatibilityBackward || level == CompatibilityFull {
+ _, _, err := newSchema.NativeFromBinary(encoded)
+ if err != nil {
+ return fmt.Errorf("backward compatibility failed: %w", err)
+ }
+ }
+
+ // Try to encode with new schema and decode with old (forward compatibility)
+ if level == CompatibilityForward || level == CompatibilityFull {
+ newEncoded, err := newSchema.BinaryFromNative(nil, testData)
+ if err == nil {
+ _, _, err = oldSchema.NativeFromBinary(newEncoded)
+ if err != nil {
+ return fmt.Errorf("forward compatibility failed: %w", err)
+ }
+ }
+ }
+
+ return nil
+}
+
+// checkProtobufCompatibility checks Protobuf schema compatibility
+func (checker *SchemaEvolutionChecker) checkProtobufCompatibility(
+ oldSchemaStr, newSchemaStr string,
+ level CompatibilityLevel,
+) (*CompatibilityResult, error) {
+
+ result := &CompatibilityResult{
+ Compatible: true,
+ Issues: []string{},
+ Level: level,
+ }
+
+ // For now, implement basic Protobuf compatibility rules
+ // In a full implementation, this would parse .proto files and check field numbers, types, etc.
+
+ // Basic check: if schemas are identical, they're compatible
+ if oldSchemaStr == newSchemaStr {
+ return result, nil
+ }
+
+ // For protobuf, we need to parse the schema and check:
+ // - Field numbers haven't changed
+ // - Required fields haven't been removed
+ // - Field types are compatible
+
+ // Simplified implementation: mark as compatible, but attach a warning
+ result.Issues = append(result.Issues, "Protobuf compatibility checking is simplified - manual review recommended")
+
+ return result, nil
+}
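+
+// A fuller check would parse the descriptors and compare tag numbers and
+// types, e.g. for a hypothetical message:
+//
+//	message User {
+//	  int64 id = 1;    // renaming a field is wire-compatible
+//	  string name = 2; // renumbering or retyping an existing tag is breaking
+//	}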
+
+// checkJSONSchemaCompatibility checks JSON Schema compatibility
+func (checker *SchemaEvolutionChecker) checkJSONSchemaCompatibility(
+ oldSchemaStr, newSchemaStr string,
+ level CompatibilityLevel,
+) (*CompatibilityResult, error) {
+
+ result := &CompatibilityResult{
+ Compatible: true,
+ Issues: []string{},
+ Level: level,
+ }
+
+ // Parse JSON schemas
+ var oldSchema, newSchema map[string]interface{}
+ if err := json.Unmarshal([]byte(oldSchemaStr), &oldSchema); err != nil {
+ return nil, fmt.Errorf("failed to parse old JSON schema: %w", err)
+ }
+ if err := json.Unmarshal([]byte(newSchemaStr), &newSchema); err != nil {
+ return nil, fmt.Errorf("failed to parse new JSON schema: %w", err)
+ }
+
+ // Check compatibility based on level
+ switch level {
+ case CompatibilityBackward:
+ checker.checkJSONSchemaBackwardCompatibility(oldSchema, newSchema, result)
+ case CompatibilityForward:
+ checker.checkJSONSchemaForwardCompatibility(oldSchema, newSchema, result)
+ case CompatibilityFull:
+ checker.checkJSONSchemaBackwardCompatibility(oldSchema, newSchema, result)
+ if result.Compatible {
+ checker.checkJSONSchemaForwardCompatibility(oldSchema, newSchema, result)
+ }
+ }
+
+ return result, nil
+}
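+
+// Illustrative sketch (hypothetical schema literals): adding "email" to the
+// "required" list trips the backward check below:
+//
+//	oldJSON := `{"type":"object","properties":{"id":{"type":"integer"}},"required":["id"]}`
+//	newJSON := `{"type":"object","properties":{"id":{"type":"integer"},"email":{"type":"string"}},"required":["id","email"]}`
+//	res, _ := checker.checkJSONSchemaCompatibility(oldJSON, newJSON, CompatibilityBackward)
+//	// res.Compatible == false; res.Issues names the new required field "email"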
+
+// checkJSONSchemaBackwardCompatibility checks JSON Schema backward compatibility
+func (checker *SchemaEvolutionChecker) checkJSONSchemaBackwardCompatibility(
+ oldSchema, newSchema map[string]interface{},
+ result *CompatibilityResult,
+) {
+ // Check if required fields were added
+ oldRequired := checker.extractJSONSchemaRequired(oldSchema)
+ newRequired := checker.extractJSONSchemaRequired(newSchema)
+
+ for _, field := range newRequired {
+ if !contains(oldRequired, field) {
+ result.Compatible = false
+ result.Issues = append(result.Issues,
+ fmt.Sprintf("New required field '%s' breaks backward compatibility", field))
+ }
+ }
+
+ // Check if properties were removed
+ oldProperties := checker.extractJSONSchemaProperties(oldSchema)
+ newProperties := checker.extractJSONSchemaProperties(newSchema)
+
+ for propName := range oldProperties {
+ if _, exists := newProperties[propName]; !exists {
+ result.Compatible = false
+ result.Issues = append(result.Issues,
+ fmt.Sprintf("Property '%s' was removed, breaking backward compatibility", propName))
+ }
+ }
+}
+
+// checkJSONSchemaForwardCompatibility checks JSON Schema forward compatibility
+func (checker *SchemaEvolutionChecker) checkJSONSchemaForwardCompatibility(
+ oldSchema, newSchema map[string]interface{},
+ result *CompatibilityResult,
+) {
+ // Check if required fields were removed
+ oldRequired := checker.extractJSONSchemaRequired(oldSchema)
+ newRequired := checker.extractJSONSchemaRequired(newSchema)
+
+ for _, field := range oldRequired {
+ if !contains(newRequired, field) {
+ result.Compatible = false
+ result.Issues = append(result.Issues,
+ fmt.Sprintf("Required field '%s' was removed, breaking forward compatibility", field))
+ }
+ }
+
+ // Check if properties were added
+ oldProperties := checker.extractJSONSchemaProperties(oldSchema)
+ newProperties := checker.extractJSONSchemaProperties(newSchema)
+
+ for propName := range newProperties {
+ if _, exists := oldProperties[propName]; !exists {
+ result.Issues = append(result.Issues,
+ fmt.Sprintf("New property '%s' added - ensure old schema can handle it", propName))
+ }
+ }
+}
+
+// extractJSONSchemaRequired extracts required fields from JSON Schema
+func (checker *SchemaEvolutionChecker) extractJSONSchemaRequired(schema map[string]interface{}) []string {
+ if required, ok := schema["required"].([]interface{}); ok {
+ var fields []string
+ for _, field := range required {
+ if fieldStr, ok := field.(string); ok {
+ fields = append(fields, fieldStr)
+ }
+ }
+ return fields
+ }
+ return []string{}
+}
+
+// extractJSONSchemaProperties extracts properties from JSON Schema
+func (checker *SchemaEvolutionChecker) extractJSONSchemaProperties(schema map[string]interface{}) map[string]interface{} {
+ if properties, ok := schema["properties"].(map[string]interface{}); ok {
+ return properties
+ }
+ return make(map[string]interface{})
+}
+
+// contains checks if a slice contains a string
+func contains(slice []string, item string) bool {
+ for _, s := range slice {
+ if s == item {
+ return true
+ }
+ }
+ return false
+}
+
+// GetCompatibilityLevel returns the compatibility level for a subject
+func (checker *SchemaEvolutionChecker) GetCompatibilityLevel(subject string) CompatibilityLevel {
+ // In a real implementation, this would query the schema registry.
+ // For now, return BACKWARD, which matches Confluent Schema Registry's default.
+ return CompatibilityBackward
+}
+
+// SetCompatibilityLevel sets the compatibility level for a subject
+func (checker *SchemaEvolutionChecker) SetCompatibilityLevel(subject string, level CompatibilityLevel) error {
+ // In a real implementation, this would update the schema registry
+ return nil
+}
+
+// CanEvolve checks if a schema can be evolved according to the compatibility rules
+func (checker *SchemaEvolutionChecker) CanEvolve(
+ subject string,
+ currentSchemaStr, newSchemaStr string,
+ format Format,
+) (*CompatibilityResult, error) {
+
+ level := checker.GetCompatibilityLevel(subject)
+ return checker.CheckCompatibility(currentSchemaStr, newSchemaStr, format, level)
+}
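+
+// Sketch of a register-time gate (the subject name is hypothetical):
+//
+//	res, err := checker.CanEvolve("user-value", currentSchema, candidateSchema, FormatAvro)
+//	if err == nil && !res.Compatible {
+//		// reject the new schema version and surface res.Issues to the client
+//	}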
+
+// SuggestEvolution suggests how to evolve a schema to maintain compatibility
+func (checker *SchemaEvolutionChecker) SuggestEvolution(
+ oldSchemaStr, newSchemaStr string,
+ format Format,
+ level CompatibilityLevel,
+) ([]string, error) {
+
+ suggestions := []string{}
+
+ result, err := checker.CheckCompatibility(oldSchemaStr, newSchemaStr, format, level)
+ if err != nil {
+ return nil, err
+ }
+
+ if result.Compatible {
+ suggestions = append(suggestions, "Schema evolution is compatible")
+ return suggestions, nil
+ }
+
+ // Analyze issues and provide suggestions
+ for _, issue := range result.Issues {
+ if strings.Contains(issue, "required field") && strings.Contains(issue, "added") {
+ suggestions = append(suggestions, "Add default values to new required fields")
+ }
+ if strings.Contains(issue, "removed") {
+ suggestions = append(suggestions, "Consider deprecating fields instead of removing them")
+ }
+ if strings.Contains(issue, "type changed") {
+ suggestions = append(suggestions, "Use type promotion or union types for type changes")
+ }
+ }
+
+ if len(suggestions) == 0 {
+ suggestions = append(suggestions, "Manual schema review required - compatibility issues detected")
+ }
+
+ return suggestions, nil
+}
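+
+// Sketch of wiring the two entry points together (hypothetical variables):
+//
+//	res, _ := checker.CanEvolve("user-value", oldSchema, newSchema, FormatAvro)
+//	if !res.Compatible {
+//		hints, _ := checker.SuggestEvolution(oldSchema, newSchema, FormatAvro, CompatibilityBackward)
+//		for _, h := range hints {
+//			fmt.Println("schema evolution hint:", h)
+//		}
+//	}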