diff options
Diffstat (limited to 'weed/mq/kafka/schema/evolution.go')
| -rw-r--r-- | weed/mq/kafka/schema/evolution.go | 522 |
1 files changed, 522 insertions, 0 deletions
diff --git a/weed/mq/kafka/schema/evolution.go b/weed/mq/kafka/schema/evolution.go new file mode 100644 index 000000000..73b56fc03 --- /dev/null +++ b/weed/mq/kafka/schema/evolution.go @@ -0,0 +1,522 @@ +package schema + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/linkedin/goavro/v2" +) + +// CompatibilityLevel defines the schema compatibility level +type CompatibilityLevel string + +const ( + CompatibilityNone CompatibilityLevel = "NONE" + CompatibilityBackward CompatibilityLevel = "BACKWARD" + CompatibilityForward CompatibilityLevel = "FORWARD" + CompatibilityFull CompatibilityLevel = "FULL" +) + +// SchemaEvolutionChecker handles schema compatibility checking and evolution +type SchemaEvolutionChecker struct { + // Cache for parsed schemas to avoid re-parsing + schemaCache map[string]interface{} +} + +// NewSchemaEvolutionChecker creates a new schema evolution checker +func NewSchemaEvolutionChecker() *SchemaEvolutionChecker { + return &SchemaEvolutionChecker{ + schemaCache: make(map[string]interface{}), + } +} + +// CompatibilityResult represents the result of a compatibility check +type CompatibilityResult struct { + Compatible bool + Issues []string + Level CompatibilityLevel +} + +// CheckCompatibility checks if two schemas are compatible according to the specified level +func (checker *SchemaEvolutionChecker) CheckCompatibility( + oldSchemaStr, newSchemaStr string, + format Format, + level CompatibilityLevel, +) (*CompatibilityResult, error) { + + result := &CompatibilityResult{ + Compatible: true, + Issues: []string{}, + Level: level, + } + + if level == CompatibilityNone { + return result, nil + } + + switch format { + case FormatAvro: + return checker.checkAvroCompatibility(oldSchemaStr, newSchemaStr, level) + case FormatProtobuf: + return checker.checkProtobufCompatibility(oldSchemaStr, newSchemaStr, level) + case FormatJSONSchema: + return checker.checkJSONSchemaCompatibility(oldSchemaStr, newSchemaStr, level) + default: + return nil, fmt.Errorf("unsupported schema format for compatibility check: %s", format) + } +} + +// checkAvroCompatibility checks Avro schema compatibility +func (checker *SchemaEvolutionChecker) checkAvroCompatibility( + oldSchemaStr, newSchemaStr string, + level CompatibilityLevel, +) (*CompatibilityResult, error) { + + result := &CompatibilityResult{ + Compatible: true, + Issues: []string{}, + Level: level, + } + + // Parse old schema + oldSchema, err := goavro.NewCodec(oldSchemaStr) + if err != nil { + return nil, fmt.Errorf("failed to parse old Avro schema: %w", err) + } + + // Parse new schema + newSchema, err := goavro.NewCodec(newSchemaStr) + if err != nil { + return nil, fmt.Errorf("failed to parse new Avro schema: %w", err) + } + + // Parse schema structures for detailed analysis + var oldSchemaMap, newSchemaMap map[string]interface{} + if err := json.Unmarshal([]byte(oldSchemaStr), &oldSchemaMap); err != nil { + return nil, fmt.Errorf("failed to parse old schema JSON: %w", err) + } + if err := json.Unmarshal([]byte(newSchemaStr), &newSchemaMap); err != nil { + return nil, fmt.Errorf("failed to parse new schema JSON: %w", err) + } + + // Check compatibility based on level + switch level { + case CompatibilityBackward: + checker.checkAvroBackwardCompatibility(oldSchemaMap, newSchemaMap, result) + case CompatibilityForward: + checker.checkAvroForwardCompatibility(oldSchemaMap, newSchemaMap, result) + case CompatibilityFull: + checker.checkAvroBackwardCompatibility(oldSchemaMap, newSchemaMap, result) + if result.Compatible { + checker.checkAvroForwardCompatibility(oldSchemaMap, newSchemaMap, result) + } + } + + // Additional validation: try to create test data and check if it can be read + if result.Compatible { + if err := checker.validateAvroDataCompatibility(oldSchema, newSchema, level); err != nil { + result.Compatible = false + result.Issues = append(result.Issues, fmt.Sprintf("Data compatibility test failed: %v", err)) + } + } + + return result, nil +} + +// checkAvroBackwardCompatibility checks if new schema can read data written with old schema +func (checker *SchemaEvolutionChecker) checkAvroBackwardCompatibility( + oldSchema, newSchema map[string]interface{}, + result *CompatibilityResult, +) { + // Check if fields were removed without defaults + oldFields := checker.extractAvroFields(oldSchema) + newFields := checker.extractAvroFields(newSchema) + + for fieldName, oldField := range oldFields { + if newField, exists := newFields[fieldName]; !exists { + // Field was removed - this breaks backward compatibility + result.Compatible = false + result.Issues = append(result.Issues, + fmt.Sprintf("Field '%s' was removed, breaking backward compatibility", fieldName)) + } else { + // Field exists, check type compatibility + if !checker.areAvroTypesCompatible(oldField["type"], newField["type"], true) { + result.Compatible = false + result.Issues = append(result.Issues, + fmt.Sprintf("Field '%s' type changed incompatibly", fieldName)) + } + } + } + + // Check if new required fields were added without defaults + for fieldName, newField := range newFields { + if _, exists := oldFields[fieldName]; !exists { + // New field added + if _, hasDefault := newField["default"]; !hasDefault { + result.Compatible = false + result.Issues = append(result.Issues, + fmt.Sprintf("New required field '%s' added without default value", fieldName)) + } + } + } +} + +// checkAvroForwardCompatibility checks if old schema can read data written with new schema +func (checker *SchemaEvolutionChecker) checkAvroForwardCompatibility( + oldSchema, newSchema map[string]interface{}, + result *CompatibilityResult, +) { + // Check if fields were added without defaults in old schema + oldFields := checker.extractAvroFields(oldSchema) + newFields := checker.extractAvroFields(newSchema) + + for fieldName, newField := range newFields { + if _, exists := oldFields[fieldName]; !exists { + // New field added - for forward compatibility, the new field should have a default + // so that old schema can ignore it when reading data written with new schema + if _, hasDefault := newField["default"]; !hasDefault { + result.Compatible = false + result.Issues = append(result.Issues, + fmt.Sprintf("New field '%s' cannot be read by old schema (no default)", fieldName)) + } + } else { + // Field exists, check type compatibility (reverse direction) + oldField := oldFields[fieldName] + if !checker.areAvroTypesCompatible(newField["type"], oldField["type"], false) { + result.Compatible = false + result.Issues = append(result.Issues, + fmt.Sprintf("Field '%s' type change breaks forward compatibility", fieldName)) + } + } + } + + // Check if fields were removed + for fieldName := range oldFields { + if _, exists := newFields[fieldName]; !exists { + result.Compatible = false + result.Issues = append(result.Issues, + fmt.Sprintf("Field '%s' was removed, breaking forward compatibility", fieldName)) + } + } +} + +// extractAvroFields extracts field information from an Avro schema +func (checker *SchemaEvolutionChecker) extractAvroFields(schema map[string]interface{}) map[string]map[string]interface{} { + fields := make(map[string]map[string]interface{}) + + if fieldsArray, ok := schema["fields"].([]interface{}); ok { + for _, fieldInterface := range fieldsArray { + if field, ok := fieldInterface.(map[string]interface{}); ok { + if name, ok := field["name"].(string); ok { + fields[name] = field + } + } + } + } + + return fields +} + +// areAvroTypesCompatible checks if two Avro types are compatible +func (checker *SchemaEvolutionChecker) areAvroTypesCompatible(oldType, newType interface{}, backward bool) bool { + // Simplified type compatibility check + // In a full implementation, this would handle complex types, unions, etc. + + oldTypeStr := fmt.Sprintf("%v", oldType) + newTypeStr := fmt.Sprintf("%v", newType) + + // Same type is always compatible + if oldTypeStr == newTypeStr { + return true + } + + // Check for promotable types (e.g., int -> long, float -> double) + if backward { + return checker.isPromotableType(oldTypeStr, newTypeStr) + } else { + return checker.isPromotableType(newTypeStr, oldTypeStr) + } +} + +// isPromotableType checks if a type can be promoted to another +func (checker *SchemaEvolutionChecker) isPromotableType(from, to string) bool { + promotions := map[string][]string{ + "int": {"long", "float", "double"}, + "long": {"float", "double"}, + "float": {"double"}, + "string": {"bytes"}, + "bytes": {"string"}, + } + + if validPromotions, exists := promotions[from]; exists { + for _, validTo := range validPromotions { + if to == validTo { + return true + } + } + } + + return false +} + +// validateAvroDataCompatibility validates compatibility by testing with actual data +func (checker *SchemaEvolutionChecker) validateAvroDataCompatibility( + oldSchema, newSchema *goavro.Codec, + level CompatibilityLevel, +) error { + // Create test data with old schema + testData := map[string]interface{}{ + "test_field": "test_value", + } + + // Try to encode with old schema + encoded, err := oldSchema.BinaryFromNative(nil, testData) + if err != nil { + // If we can't create test data, skip validation + return nil + } + + // Try to decode with new schema (backward compatibility) + if level == CompatibilityBackward || level == CompatibilityFull { + _, _, err := newSchema.NativeFromBinary(encoded) + if err != nil { + return fmt.Errorf("backward compatibility failed: %w", err) + } + } + + // Try to encode with new schema and decode with old (forward compatibility) + if level == CompatibilityForward || level == CompatibilityFull { + newEncoded, err := newSchema.BinaryFromNative(nil, testData) + if err == nil { + _, _, err = oldSchema.NativeFromBinary(newEncoded) + if err != nil { + return fmt.Errorf("forward compatibility failed: %w", err) + } + } + } + + return nil +} + +// checkProtobufCompatibility checks Protobuf schema compatibility +func (checker *SchemaEvolutionChecker) checkProtobufCompatibility( + oldSchemaStr, newSchemaStr string, + level CompatibilityLevel, +) (*CompatibilityResult, error) { + + result := &CompatibilityResult{ + Compatible: true, + Issues: []string{}, + Level: level, + } + + // For now, implement basic Protobuf compatibility rules + // In a full implementation, this would parse .proto files and check field numbers, types, etc. + + // Basic check: if schemas are identical, they're compatible + if oldSchemaStr == newSchemaStr { + return result, nil + } + + // For protobuf, we need to parse the schema and check: + // - Field numbers haven't changed + // - Required fields haven't been removed + // - Field types are compatible + + // Simplified implementation - mark as compatible with warning + result.Issues = append(result.Issues, "Protobuf compatibility checking is simplified - manual review recommended") + + return result, nil +} + +// checkJSONSchemaCompatibility checks JSON Schema compatibility +func (checker *SchemaEvolutionChecker) checkJSONSchemaCompatibility( + oldSchemaStr, newSchemaStr string, + level CompatibilityLevel, +) (*CompatibilityResult, error) { + + result := &CompatibilityResult{ + Compatible: true, + Issues: []string{}, + Level: level, + } + + // Parse JSON schemas + var oldSchema, newSchema map[string]interface{} + if err := json.Unmarshal([]byte(oldSchemaStr), &oldSchema); err != nil { + return nil, fmt.Errorf("failed to parse old JSON schema: %w", err) + } + if err := json.Unmarshal([]byte(newSchemaStr), &newSchema); err != nil { + return nil, fmt.Errorf("failed to parse new JSON schema: %w", err) + } + + // Check compatibility based on level + switch level { + case CompatibilityBackward: + checker.checkJSONSchemaBackwardCompatibility(oldSchema, newSchema, result) + case CompatibilityForward: + checker.checkJSONSchemaForwardCompatibility(oldSchema, newSchema, result) + case CompatibilityFull: + checker.checkJSONSchemaBackwardCompatibility(oldSchema, newSchema, result) + if result.Compatible { + checker.checkJSONSchemaForwardCompatibility(oldSchema, newSchema, result) + } + } + + return result, nil +} + +// checkJSONSchemaBackwardCompatibility checks JSON Schema backward compatibility +func (checker *SchemaEvolutionChecker) checkJSONSchemaBackwardCompatibility( + oldSchema, newSchema map[string]interface{}, + result *CompatibilityResult, +) { + // Check if required fields were added + oldRequired := checker.extractJSONSchemaRequired(oldSchema) + newRequired := checker.extractJSONSchemaRequired(newSchema) + + for _, field := range newRequired { + if !contains(oldRequired, field) { + result.Compatible = false + result.Issues = append(result.Issues, + fmt.Sprintf("New required field '%s' breaks backward compatibility", field)) + } + } + + // Check if properties were removed + oldProperties := checker.extractJSONSchemaProperties(oldSchema) + newProperties := checker.extractJSONSchemaProperties(newSchema) + + for propName := range oldProperties { + if _, exists := newProperties[propName]; !exists { + result.Compatible = false + result.Issues = append(result.Issues, + fmt.Sprintf("Property '%s' was removed, breaking backward compatibility", propName)) + } + } +} + +// checkJSONSchemaForwardCompatibility checks JSON Schema forward compatibility +func (checker *SchemaEvolutionChecker) checkJSONSchemaForwardCompatibility( + oldSchema, newSchema map[string]interface{}, + result *CompatibilityResult, +) { + // Check if required fields were removed + oldRequired := checker.extractJSONSchemaRequired(oldSchema) + newRequired := checker.extractJSONSchemaRequired(newSchema) + + for _, field := range oldRequired { + if !contains(newRequired, field) { + result.Compatible = false + result.Issues = append(result.Issues, + fmt.Sprintf("Required field '%s' was removed, breaking forward compatibility", field)) + } + } + + // Check if properties were added + oldProperties := checker.extractJSONSchemaProperties(oldSchema) + newProperties := checker.extractJSONSchemaProperties(newSchema) + + for propName := range newProperties { + if _, exists := oldProperties[propName]; !exists { + result.Issues = append(result.Issues, + fmt.Sprintf("New property '%s' added - ensure old schema can handle it", propName)) + } + } +} + +// extractJSONSchemaRequired extracts required fields from JSON Schema +func (checker *SchemaEvolutionChecker) extractJSONSchemaRequired(schema map[string]interface{}) []string { + if required, ok := schema["required"].([]interface{}); ok { + var fields []string + for _, field := range required { + if fieldStr, ok := field.(string); ok { + fields = append(fields, fieldStr) + } + } + return fields + } + return []string{} +} + +// extractJSONSchemaProperties extracts properties from JSON Schema +func (checker *SchemaEvolutionChecker) extractJSONSchemaProperties(schema map[string]interface{}) map[string]interface{} { + if properties, ok := schema["properties"].(map[string]interface{}); ok { + return properties + } + return make(map[string]interface{}) +} + +// contains checks if a slice contains a string +func contains(slice []string, item string) bool { + for _, s := range slice { + if s == item { + return true + } + } + return false +} + +// GetCompatibilityLevel returns the compatibility level for a subject +func (checker *SchemaEvolutionChecker) GetCompatibilityLevel(subject string) CompatibilityLevel { + // In a real implementation, this would query the schema registry + // For now, return a default level + return CompatibilityBackward +} + +// SetCompatibilityLevel sets the compatibility level for a subject +func (checker *SchemaEvolutionChecker) SetCompatibilityLevel(subject string, level CompatibilityLevel) error { + // In a real implementation, this would update the schema registry + return nil +} + +// CanEvolve checks if a schema can be evolved according to the compatibility rules +func (checker *SchemaEvolutionChecker) CanEvolve( + subject string, + currentSchemaStr, newSchemaStr string, + format Format, +) (*CompatibilityResult, error) { + + level := checker.GetCompatibilityLevel(subject) + return checker.CheckCompatibility(currentSchemaStr, newSchemaStr, format, level) +} + +// SuggestEvolution suggests how to evolve a schema to maintain compatibility +func (checker *SchemaEvolutionChecker) SuggestEvolution( + oldSchemaStr, newSchemaStr string, + format Format, + level CompatibilityLevel, +) ([]string, error) { + + suggestions := []string{} + + result, err := checker.CheckCompatibility(oldSchemaStr, newSchemaStr, format, level) + if err != nil { + return nil, err + } + + if result.Compatible { + suggestions = append(suggestions, "Schema evolution is compatible") + return suggestions, nil + } + + // Analyze issues and provide suggestions + for _, issue := range result.Issues { + if strings.Contains(issue, "required field") && strings.Contains(issue, "added") { + suggestions = append(suggestions, "Add default values to new required fields") + } + if strings.Contains(issue, "removed") { + suggestions = append(suggestions, "Consider deprecating fields instead of removing them") + } + if strings.Contains(issue, "type changed") { + suggestions = append(suggestions, "Use type promotion or union types for type changes") + } + } + + if len(suggestions) == 0 { + suggestions = append(suggestions, "Manual schema review required - compatibility issues detected") + } + + return suggestions, nil +} |
