1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
package engine
import (
"testing"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
)
// TestConvertMQSchemaToTableInfo_NoSchema tests that topics without schemas
// get a default schema with system fields and _value field
func TestConvertMQSchemaToTableInfo_NoSchema(t *testing.T) {
catalog := NewSchemaCatalog("localhost:9333")
tests := []struct {
name string
mqSchema *schema.Schema
expectError bool
checkFields func(*testing.T, *TableInfo)
}{
{
name: "nil schema",
mqSchema: nil,
expectError: false,
checkFields: func(t *testing.T, info *TableInfo) {
if info.Schema != nil {
t.Error("Expected Schema to be nil for topics without schema")
}
if len(info.Columns) != 4 {
t.Errorf("Expected 4 columns, got %d", len(info.Columns))
}
expectedCols := map[string]string{
"_ts": "TIMESTAMP",
"_key": "VARBINARY",
"_source": "VARCHAR(255)",
"_value": "VARBINARY",
}
for _, col := range info.Columns {
expectedType, ok := expectedCols[col.Name]
if !ok {
t.Errorf("Unexpected column: %s", col.Name)
continue
}
if col.Type != expectedType {
t.Errorf("Column %s: expected type %s, got %s", col.Name, expectedType, col.Type)
}
}
},
},
{
name: "schema with nil RecordType",
mqSchema: &schema.Schema{
RecordType: nil,
RevisionId: 1,
},
expectError: false,
checkFields: func(t *testing.T, info *TableInfo) {
if info.Schema != nil {
t.Error("Expected Schema to be nil for topics without RecordType")
}
if len(info.Columns) != 4 {
t.Errorf("Expected 4 columns, got %d", len(info.Columns))
}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tableInfo, err := catalog.convertMQSchemaToTableInfo("test_namespace", "test_topic", tt.mqSchema)
if tt.expectError {
if err == nil {
t.Error("Expected error but got none")
}
return
}
if err != nil {
t.Errorf("Unexpected error: %v", err)
return
}
if tableInfo == nil {
t.Error("Expected tableInfo but got nil")
return
}
if tt.checkFields != nil {
tt.checkFields(t, tableInfo)
}
// Basic checks
if tableInfo.Name != "test_topic" {
t.Errorf("Expected Name 'test_topic', got '%s'", tableInfo.Name)
}
if tableInfo.Namespace != "test_namespace" {
t.Errorf("Expected Namespace 'test_namespace', got '%s'", tableInfo.Namespace)
}
})
}
}
|