aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/schema/flat_schema_utils_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/schema/flat_schema_utils_test.go')
-rw-r--r--weed/mq/schema/flat_schema_utils_test.go265
1 files changed, 265 insertions, 0 deletions
diff --git a/weed/mq/schema/flat_schema_utils_test.go b/weed/mq/schema/flat_schema_utils_test.go
new file mode 100644
index 000000000..2bce9014c
--- /dev/null
+++ b/weed/mq/schema/flat_schema_utils_test.go
@@ -0,0 +1,265 @@
+package schema
+
+import (
+ "reflect"
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+func TestSplitFlatSchemaToKeyValue(t *testing.T) {
+ // Create a test flat schema
+ flatSchema := &schema_pb.RecordType{
+ Fields: []*schema_pb.Field{
+ {
+ Name: "user_id",
+ FieldIndex: 0,
+ Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}},
+ },
+ {
+ Name: "session_id",
+ FieldIndex: 1,
+ Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}},
+ },
+ {
+ Name: "event_type",
+ FieldIndex: 2,
+ Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}},
+ },
+ {
+ Name: "timestamp",
+ FieldIndex: 3,
+ Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}},
+ },
+ },
+ }
+
+ keyColumns := []string{"user_id", "session_id"}
+
+ keySchema, valueSchema, err := SplitFlatSchemaToKeyValue(flatSchema, keyColumns)
+ if err != nil {
+ t.Fatalf("SplitFlatSchemaToKeyValue failed: %v", err)
+ }
+
+ // Verify key schema
+ if keySchema == nil {
+ t.Fatal("Expected key schema, got nil")
+ }
+ if len(keySchema.Fields) != 2 {
+ t.Errorf("Expected 2 key fields, got %d", len(keySchema.Fields))
+ }
+ if keySchema.Fields[0].Name != "user_id" || keySchema.Fields[1].Name != "session_id" {
+ t.Errorf("Key field names incorrect: %v", []string{keySchema.Fields[0].Name, keySchema.Fields[1].Name})
+ }
+
+ // Verify value schema
+ if valueSchema == nil {
+ t.Fatal("Expected value schema, got nil")
+ }
+ if len(valueSchema.Fields) != 2 {
+ t.Errorf("Expected 2 value fields, got %d", len(valueSchema.Fields))
+ }
+ if valueSchema.Fields[0].Name != "event_type" || valueSchema.Fields[1].Name != "timestamp" {
+ t.Errorf("Value field names incorrect: %v", []string{valueSchema.Fields[0].Name, valueSchema.Fields[1].Name})
+ }
+
+ // Verify field indices are reindexed
+ for i, field := range keySchema.Fields {
+ if field.FieldIndex != int32(i) {
+ t.Errorf("Key field %s has incorrect index %d, expected %d", field.Name, field.FieldIndex, i)
+ }
+ }
+ for i, field := range valueSchema.Fields {
+ if field.FieldIndex != int32(i) {
+ t.Errorf("Value field %s has incorrect index %d, expected %d", field.Name, field.FieldIndex, i)
+ }
+ }
+}
+
+func TestSplitFlatSchemaToKeyValueMissingColumns(t *testing.T) {
+ flatSchema := &schema_pb.RecordType{
+ Fields: []*schema_pb.Field{
+ {Name: "field1", Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}},
+ },
+ }
+
+ keyColumns := []string{"field1", "missing_field"}
+
+ _, _, err := SplitFlatSchemaToKeyValue(flatSchema, keyColumns)
+ if err == nil {
+ t.Error("Expected error for missing key column, got nil")
+ }
+ if !contains(err.Error(), "missing_field") {
+ t.Errorf("Error should mention missing_field: %v", err)
+ }
+}
+
+func TestCombineFlatSchemaFromKeyValue(t *testing.T) {
+ keySchema := &schema_pb.RecordType{
+ Fields: []*schema_pb.Field{
+ {
+ Name: "user_id",
+ FieldIndex: 0,
+ Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}},
+ },
+ {
+ Name: "session_id",
+ FieldIndex: 1,
+ Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}},
+ },
+ },
+ }
+
+ valueSchema := &schema_pb.RecordType{
+ Fields: []*schema_pb.Field{
+ {
+ Name: "event_type",
+ FieldIndex: 0,
+ Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}},
+ },
+ {
+ Name: "timestamp",
+ FieldIndex: 1,
+ Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}},
+ },
+ },
+ }
+
+ flatSchema, keyColumns := CombineFlatSchemaFromKeyValue(keySchema, valueSchema)
+
+ // Verify combined schema
+ if flatSchema == nil {
+ t.Fatal("Expected flat schema, got nil")
+ }
+ if len(flatSchema.Fields) != 4 {
+ t.Errorf("Expected 4 fields, got %d", len(flatSchema.Fields))
+ }
+
+ // Verify key columns
+ expectedKeyColumns := []string{"user_id", "session_id"}
+ if !reflect.DeepEqual(keyColumns, expectedKeyColumns) {
+ t.Errorf("Expected key columns %v, got %v", expectedKeyColumns, keyColumns)
+ }
+
+ // Verify field order (key fields first)
+ expectedNames := []string{"user_id", "session_id", "event_type", "timestamp"}
+ actualNames := make([]string, len(flatSchema.Fields))
+ for i, field := range flatSchema.Fields {
+ actualNames[i] = field.Name
+ }
+ if !reflect.DeepEqual(actualNames, expectedNames) {
+ t.Errorf("Expected field names %v, got %v", expectedNames, actualNames)
+ }
+
+ // Verify field indices are sequential
+ for i, field := range flatSchema.Fields {
+ if field.FieldIndex != int32(i) {
+ t.Errorf("Field %s has incorrect index %d, expected %d", field.Name, field.FieldIndex, i)
+ }
+ }
+}
+
+func TestExtractKeyColumnsFromCombinedSchema(t *testing.T) {
+ // Create a combined schema with key_ prefixes (as created by CreateCombinedRecordType)
+ combinedSchema := &schema_pb.RecordType{
+ Fields: []*schema_pb.Field{
+ {
+ Name: "key_user_id",
+ FieldIndex: 0,
+ Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}},
+ },
+ {
+ Name: "key_session_id",
+ FieldIndex: 1,
+ Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}},
+ },
+ {
+ Name: "event_type",
+ FieldIndex: 2,
+ Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}},
+ },
+ {
+ Name: "timestamp",
+ FieldIndex: 3,
+ Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}},
+ },
+ },
+ }
+
+ flatSchema, keyColumns := ExtractKeyColumnsFromCombinedSchema(combinedSchema)
+
+ // Verify flat schema
+ if flatSchema == nil {
+ t.Fatal("Expected flat schema, got nil")
+ }
+ if len(flatSchema.Fields) != 4 {
+ t.Errorf("Expected 4 fields, got %d", len(flatSchema.Fields))
+ }
+
+ // Verify key columns (should be sorted)
+ expectedKeyColumns := []string{"session_id", "user_id"}
+ if !reflect.DeepEqual(keyColumns, expectedKeyColumns) {
+ t.Errorf("Expected key columns %v, got %v", expectedKeyColumns, keyColumns)
+ }
+
+ // Verify field names (key_ prefixes removed)
+ expectedNames := []string{"user_id", "session_id", "event_type", "timestamp"}
+ actualNames := make([]string, len(flatSchema.Fields))
+ for i, field := range flatSchema.Fields {
+ actualNames[i] = field.Name
+ }
+ if !reflect.DeepEqual(actualNames, expectedNames) {
+ t.Errorf("Expected field names %v, got %v", expectedNames, actualNames)
+ }
+}
+
+func TestValidateKeyColumns(t *testing.T) {
+ schema := &schema_pb.RecordType{
+ Fields: []*schema_pb.Field{
+ {Name: "field1", Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}},
+ {Name: "field2", Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT64}}},
+ },
+ }
+
+ // Valid key columns
+ err := ValidateKeyColumns(schema, []string{"field1"})
+ if err != nil {
+ t.Errorf("Expected no error for valid key columns, got: %v", err)
+ }
+
+ // Invalid key columns
+ err = ValidateKeyColumns(schema, []string{"field1", "missing_field"})
+ if err == nil {
+ t.Error("Expected error for invalid key columns, got nil")
+ }
+
+ // Nil schema should not error
+ err = ValidateKeyColumns(nil, []string{"any_field"})
+ if err != nil {
+ t.Errorf("Expected no error for nil schema, got: %v", err)
+ }
+
+ // Empty key columns should not error
+ err = ValidateKeyColumns(schema, []string{})
+ if err != nil {
+ t.Errorf("Expected no error for empty key columns, got: %v", err)
+ }
+}
+
+// Helper function to check if string contains substring
+func contains(str, substr string) bool {
+ return len(str) >= len(substr) &&
+ (len(substr) == 0 || str[len(str)-len(substr):] == substr ||
+ str[:len(substr)] == substr ||
+ len(str) > len(substr) && (str[len(str)-len(substr)-1:len(str)-len(substr)] == " " || str[len(str)-len(substr)-1] == ' ') && str[len(str)-len(substr):] == substr ||
+ findInString(str, substr))
+}
+
+func findInString(str, substr string) bool {
+ for i := 0; i <= len(str)-len(substr); i++ {
+ if str[i:i+len(substr)] == substr {
+ return true
+ }
+ }
+ return false
+}