1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
package engine
import (
"context"
"testing"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)
// TestSchemaAwareParsing exercises HybridMessageScanner's schema-aware parsing
// helpers against a four-field record schema (user_id INT32, event_type STRING,
// cpu_usage DOUBLE, is_active BOOL). It runs three subtests: decoding a
// well-formed JSON message, converting raw byte payloads into typed scalar
// values, and rejecting malformed JSON with an error.
func TestSchemaAwareParsing(t *testing.T) {
	// Build a scanner that carries only the record schema; no other scanner
	// field is populated for these checks.
	recordSchema := &schema_pb.RecordType{
		Fields: []*schema_pb.Field{
			{
				Name: "user_id",
				Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}},
			},
			{
				Name: "event_type",
				Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}},
			},
			{
				Name: "cpu_usage",
				Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}},
			},
			{
				Name: "is_active",
				Type: &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BOOL}},
			},
		},
	}
	scanner := &HybridMessageScanner{
		recordSchema: recordSchema,
	}

	t.Run("JSON Message Parsing", func(t *testing.T) {
		jsonData := []byte(`{"user_id": 1234, "event_type": "login", "cpu_usage": 75.5, "is_active": true}`)
		result, err := scanner.parseJSONMessage(jsonData)
		if err != nil {
			t.Fatalf("Failed to parse JSON message: %v", err)
		}

		// Verify user_id as int32
		if userIDVal := result.Fields["user_id"]; userIDVal == nil {
			t.Error("user_id field missing")
		} else if userIDVal.GetInt32Value() != 1234 {
			t.Errorf("Expected user_id=1234, got %v", userIDVal.GetInt32Value())
		}

		// Verify event_type as string
		if eventTypeVal := result.Fields["event_type"]; eventTypeVal == nil {
			t.Error("event_type field missing")
		} else if eventTypeVal.GetStringValue() != "login" {
			t.Errorf("Expected event_type='login', got %v", eventTypeVal.GetStringValue())
		}

		// Verify cpu_usage as double
		if cpuVal := result.Fields["cpu_usage"]; cpuVal == nil {
			t.Error("cpu_usage field missing")
		} else if cpuVal.GetDoubleValue() != 75.5 {
			t.Errorf("Expected cpu_usage=75.5, got %v", cpuVal.GetDoubleValue())
		}

		// Verify is_active as bool
		if isActiveVal := result.Fields["is_active"]; isActiveVal == nil {
			t.Error("is_active field missing")
		} else if !isActiveVal.GetBoolValue() {
			t.Errorf("Expected is_active=true, got %v", isActiveVal.GetBoolValue())
		}

		// Only announce success when every assertion above held; otherwise
		// the log line would contradict the recorded failures.
		if t.Failed() {
			return
		}
		t.Logf("JSON parsing correctly converted types: int32=%d, string='%s', double=%.1f, bool=%v",
			result.Fields["user_id"].GetInt32Value(),
			result.Fields["event_type"].GetStringValue(),
			result.Fields["cpu_usage"].GetDoubleValue(),
			result.Fields["is_active"].GetBoolValue())
	})

	t.Run("Raw Data Type Conversion", func(t *testing.T) {
		// Each conversion is checked independently with t.Errorf so that one
		// failing scalar type does not hide failures in the others.

		// Test string conversion
		stringType := &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_STRING}}
		stringVal, err := scanner.convertRawDataToSchemaValue([]byte("hello world"), stringType)
		if err != nil {
			t.Errorf("Failed to convert string: %v", err)
		} else if stringVal.GetStringValue() != "hello world" {
			t.Errorf("String conversion failed: got %v", stringVal.GetStringValue())
		}

		// Test int32 conversion
		int32Type := &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_INT32}}
		int32Val, err := scanner.convertRawDataToSchemaValue([]byte("42"), int32Type)
		if err != nil {
			t.Errorf("Failed to convert int32: %v", err)
		} else if int32Val.GetInt32Value() != 42 {
			t.Errorf("Int32 conversion failed: got %v", int32Val.GetInt32Value())
		}

		// Test double conversion
		doubleType := &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_DOUBLE}}
		doubleVal, err := scanner.convertRawDataToSchemaValue([]byte("3.14159"), doubleType)
		if err != nil {
			t.Errorf("Failed to convert double: %v", err)
		} else if doubleVal.GetDoubleValue() != 3.14159 {
			t.Errorf("Double conversion failed: got %v", doubleVal.GetDoubleValue())
		}

		// Test bool conversion
		boolType := &schema_pb.Type{Kind: &schema_pb.Type_ScalarType{ScalarType: schema_pb.ScalarType_BOOL}}
		boolVal, err := scanner.convertRawDataToSchemaValue([]byte("true"), boolType)
		if err != nil {
			t.Errorf("Failed to convert bool: %v", err)
		} else if !boolVal.GetBoolValue() {
			t.Errorf("Bool conversion failed: got %v", boolVal.GetBoolValue())
		}

		// Skip the success message if any conversion above was reported as failed.
		if t.Failed() {
			return
		}
		t.Log("Raw data type conversions working correctly")
	})

	t.Run("Invalid JSON Graceful Handling", func(t *testing.T) {
		// The value after "malformed" is missing, so the payload is not valid JSON.
		invalidJSON := []byte(`{"user_id": 1234, "malformed": }`)
		_, err := scanner.parseJSONMessage(invalidJSON)
		if err == nil {
			t.Error("Expected error for invalid JSON, but got none")
		}

		// Claim graceful handling only when an error was actually returned.
		if t.Failed() {
			return
		}
		t.Log("Invalid JSON handled gracefully with error")
	})
}
// TestSchemaAwareParsingIntegration runs a SELECT through the test SQL engine
// to confirm that schema-aware parsing leaves basic query execution working.
// The test fails if the query errors or returns no rows; a missing _source
// column is only logged.
func TestSchemaAwareParsingIntegration(t *testing.T) {
	engine := NewTestSQLEngine()

	// Test that the enhanced schema-aware parsing doesn't break existing functionality
	result, err := engine.ExecuteSQL(context.Background(), "SELECT *, _source FROM user_events LIMIT 2")
	if err != nil {
		t.Fatalf("Schema-aware parsing broke basic SELECT: %v", err)
	}
	if len(result.Rows) == 0 {
		t.Error("No rows returned - schema parsing may have issues")
	}

	// Look for the _source column (hybrid functionality). Its absence is
	// logged rather than treated as a failure.
	foundSourceColumn := false
	for _, col := range result.Columns {
		if col == "_source" {
			foundSourceColumn = true
			break
		}
	}
	if !foundSourceColumn {
		t.Log("_source column missing - running in fallback mode without real cluster")
	}

	// Do not report a successful integration if an earlier check already
	// marked the test as failed.
	if t.Failed() {
		return
	}
	t.Log("Schema-aware parsing integrates correctly with SQL engine")
}
|