path: root/weed/query
Diffstat (limited to 'weed/query')
-rw-r--r--  weed/query/engine/alias_timestamp_integration_test.go     |  48
-rw-r--r--  weed/query/engine/broker_client.go                        | 115
-rw-r--r--  weed/query/engine/catalog.go                              |  40
-rw-r--r--  weed/query/engine/catalog_no_schema_test.go               | 101
-rw-r--r--  weed/query/engine/cockroach_parser_success_test.go        |   6
-rw-r--r--  weed/query/engine/complete_sql_fixes_test.go              |  78
-rw-r--r--  weed/query/engine/describe.go                             |  69
-rw-r--r--  weed/query/engine/engine.go                               | 327
-rw-r--r--  weed/query/engine/engine_test.go                          |   4
-rw-r--r--  weed/query/engine/fast_path_predicate_validation_test.go  |  10
-rw-r--r--  weed/query/engine/hybrid_message_scanner.go               | 339
-rw-r--r--  weed/query/engine/mock_test.go                            |   9
-rw-r--r--  weed/query/engine/mocks_test.go                           |  69
-rw-r--r--  weed/query/engine/parquet_scanner.go                      |  31
-rw-r--r--  weed/query/engine/parsing_debug_test.go                   |   6
-rw-r--r--  weed/query/engine/postgresql_only_test.go                 |   4
-rw-r--r--  weed/query/engine/sql_alias_support_test.go               |  88
-rw-r--r--  weed/query/engine/sql_feature_diagnostic_test.go          |  20
-rw-r--r--  weed/query/engine/string_concatenation_test.go            |   2
-rw-r--r--  weed/query/engine/string_literal_function_test.go         |   4
-rw-r--r--  weed/query/engine/system_columns.go                       |  15
-rw-r--r--  weed/query/engine/timestamp_integration_test.go           |  32
-rw-r--r--  weed/query/engine/timestamp_query_fixes_test.go           |  72
-rw-r--r--  weed/query/engine/where_clause_debug_test.go              |  20
-rw-r--r--  weed/query/engine/where_validation_test.go                |  24
25 files changed, 1024 insertions, 509 deletions
diff --git a/weed/query/engine/alias_timestamp_integration_test.go b/weed/query/engine/alias_timestamp_integration_test.go
index eca8161db..d175d4cf5 100644
--- a/weed/query/engine/alias_timestamp_integration_test.go
+++ b/weed/query/engine/alias_timestamp_integration_test.go
@@ -25,13 +25,13 @@ func TestAliasTimestampIntegration(t *testing.T) {
// Create test record
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: int64(1000 + i)}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: int64(1000 + i)}},
},
}
// Test equality with alias (this was the originally failing pattern)
- sql := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = " + strconv.FormatInt(timestamp, 10)
+ sql := "SELECT _ts_ns AS ts, id FROM test WHERE ts = " + strconv.FormatInt(timestamp, 10)
stmt, err := ParseSQL(sql)
assert.NoError(t, err, "Should parse alias equality query for timestamp %d", timestamp)
@@ -43,7 +43,7 @@ func TestAliasTimestampIntegration(t *testing.T) {
assert.True(t, result, "Should match exact large timestamp using alias")
// Test precision - off by 1 nanosecond should not match
- sqlOffBy1 := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = " + strconv.FormatInt(timestamp+1, 10)
+ sqlOffBy1 := "SELECT _ts_ns AS ts, id FROM test WHERE ts = " + strconv.FormatInt(timestamp+1, 10)
stmt2, err := ParseSQL(sqlOffBy1)
assert.NoError(t, err)
selectStmt2 := stmt2.(*SelectStatement)
@@ -62,23 +62,23 @@ func TestAliasTimestampIntegration(t *testing.T) {
testRecords := []*schema_pb.RecordValue{
{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp - 2}}, // Before range
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp - 2}}, // Before range
},
},
{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp}}, // In range
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp}}, // In range
},
},
{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp + 2}}, // After range
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp + 2}}, // After range
},
},
}
// Test range query with alias
- sql := "SELECT _timestamp_ns AS ts FROM test WHERE ts >= " +
+ sql := "SELECT _ts_ns AS ts FROM test WHERE ts >= " +
strconv.FormatInt(timestamp-1, 10) + " AND ts <= " +
strconv.FormatInt(timestamp+1, 10)
stmt, err := ParseSQL(sql)
@@ -99,12 +99,12 @@ func TestAliasTimestampIntegration(t *testing.T) {
maxInt64 := int64(9223372036854775807)
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: maxInt64}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: maxInt64}},
},
}
// Test with alias
- sql := "SELECT _timestamp_ns AS ts FROM test WHERE ts = " + strconv.FormatInt(maxInt64, 10)
+ sql := "SELECT _ts_ns AS ts FROM test WHERE ts = " + strconv.FormatInt(maxInt64, 10)
stmt, err := ParseSQL(sql)
assert.NoError(t, err, "Should parse max int64 with alias")
@@ -119,11 +119,11 @@ func TestAliasTimestampIntegration(t *testing.T) {
minInt64 := int64(-9223372036854775808)
testRecord2 := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: minInt64}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: minInt64}},
},
}
- sql2 := "SELECT _timestamp_ns AS ts FROM test WHERE ts = " + strconv.FormatInt(minInt64, 10)
+ sql2 := "SELECT _ts_ns AS ts FROM test WHERE ts = " + strconv.FormatInt(minInt64, 10)
stmt2, err := ParseSQL(sql2)
assert.NoError(t, err)
selectStmt2 := stmt2.(*SelectStatement)
@@ -141,14 +141,14 @@ func TestAliasTimestampIntegration(t *testing.T) {
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp1}},
- "created_at": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp2}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp1}},
+ "created_at": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp2}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}},
},
}
// Use multiple timestamp aliases in WHERE
- sql := "SELECT _timestamp_ns AS event_time, created_at AS created_time, id AS record_id FROM test " +
+ sql := "SELECT _ts_ns AS event_time, created_at AS created_time, id AS record_id FROM test " +
"WHERE event_time = " + strconv.FormatInt(timestamp1, 10) +
" AND created_time = " + strconv.FormatInt(timestamp2, 10) +
" AND record_id = 12345"
@@ -190,11 +190,11 @@ func TestAliasTimestampIntegration(t *testing.T) {
t.Run(op.sql, func(t *testing.T) {
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: op.value}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: op.value}},
},
}
- sql := "SELECT _timestamp_ns AS ts FROM test WHERE " + op.sql
+ sql := "SELECT _ts_ns AS ts FROM test WHERE " + op.sql
stmt, err := ParseSQL(sql)
assert.NoError(t, err, "Should parse: %s", op.sql)
@@ -212,12 +212,12 @@ func TestAliasTimestampIntegration(t *testing.T) {
// Reproduce the exact production scenario that was originally failing
// This was the original failing pattern from the user
- originalFailingSQL := "select id, _timestamp_ns as ts from ecommerce.user_events where ts = 1756913789829292386"
+ originalFailingSQL := "select id, _ts_ns as ts from ecommerce.user_events where ts = 1756913789829292386"
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756913789829292386}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756913789829292386}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}},
},
}
@@ -232,11 +232,11 @@ func TestAliasTimestampIntegration(t *testing.T) {
assert.True(t, result, "The originally failing production query should now work perfectly")
// Also test the other originally failing timestamp
- originalFailingSQL2 := "select id, _timestamp_ns as ts from ecommerce.user_events where ts = 1756947416566456262"
+ originalFailingSQL2 := "select id, _ts_ns as ts from ecommerce.user_events where ts = 1756947416566456262"
testRecord2 := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
},
}
diff --git a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go
index 9b5f9819c..3e6517678 100644
--- a/weed/query/engine/broker_client.go
+++ b/weed/query/engine/broker_client.go
@@ -5,14 +5,15 @@ import (
"encoding/binary"
"fmt"
"io"
- "strconv"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -39,8 +40,9 @@ type BrokerClient struct {
// NewBrokerClient creates a new MQ broker client
// Uses master HTTP address and converts it to gRPC address for service discovery
func NewBrokerClient(masterHTTPAddress string) *BrokerClient {
- // Convert HTTP address to gRPC address (typically HTTP port + 10000)
- masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress)
+ // Convert HTTP address to gRPC address using pb.ServerAddress method
+ httpAddr := pb.ServerAddress(masterHTTPAddress)
+ masterGRPCAddress := httpAddr.ToGrpcAddress()
return &BrokerClient{
masterAddress: masterGRPCAddress,
@@ -48,20 +50,7 @@ func NewBrokerClient(masterHTTPAddress string) *BrokerClient {
}
}
-// convertHTTPToGRPC converts HTTP address to gRPC address
-// Follows SeaweedFS convention: gRPC port = HTTP port + 10000
-func convertHTTPToGRPC(httpAddress string) string {
- if strings.Contains(httpAddress, ":") {
- parts := strings.Split(httpAddress, ":")
- if len(parts) == 2 {
- if port, err := strconv.Atoi(parts[1]); err == nil {
- return fmt.Sprintf("%s:%d", parts[0], port+10000)
- }
- }
- }
- // Fallback: return original address if conversion fails
- return httpAddress
-}
+// No need for convertHTTPToGRPC - pb.ServerAddress.ToGrpcAddress() already handles this
// discoverFiler finds a filer from the master server
func (c *BrokerClient) discoverFiler() error {
@@ -92,7 +81,8 @@ func (c *BrokerClient) discoverFiler() error {
// Use the first available filer and convert HTTP address to gRPC
filerHTTPAddress := resp.ClusterNodes[0].Address
- c.filerAddress = convertHTTPToGRPC(filerHTTPAddress)
+ httpAddr := pb.ServerAddress(filerHTTPAddress)
+ c.filerAddress = httpAddr.ToGrpcAddress()
return nil
}
@@ -175,7 +165,6 @@ func (f *filerClientImpl) GetDataCenter() string {
}
// ListNamespaces retrieves all MQ namespaces (databases) from the filer
-// RESOLVED: Now queries actual topic directories instead of hardcoded values
func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
// Get filer client to list directories under /topics
filerClient, err := c.GetFilerClient()
@@ -204,8 +193,8 @@ func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
return fmt.Errorf("failed to receive entry: %v", recvErr)
}
- // Only include directories (namespaces), skip files
- if resp.Entry != nil && resp.Entry.IsDirectory {
+ // Only include directories (namespaces), skip files and system directories (starting with .)
+ if resp.Entry != nil && resp.Entry.IsDirectory && !strings.HasPrefix(resp.Entry.Name, ".") {
namespaces = append(namespaces, resp.Entry.Name)
}
}
@@ -222,7 +211,6 @@ func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) {
}
// ListTopics retrieves all topics in a namespace from the filer
-// RESOLVED: Now queries actual topic directories instead of hardcoded values
func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]string, error) {
// Get filer client to list directories under /topics/{namespace}
filerClient, err := c.GetFilerClient()
@@ -271,16 +259,18 @@ func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]stri
return topics, nil
}
-// GetTopicSchema retrieves schema information for a specific topic
-// Reads the actual schema from topic configuration stored in filer
-func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName string) (*schema_pb.RecordType, error) {
+// GetTopicSchema retrieves the flat schema and key columns for a topic
+// Returns (flatSchema, keyColumns, schemaFormat, error)
+func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName string) (*schema_pb.RecordType, []string, string, error) {
// Get filer client to read topic configuration
filerClient, err := c.GetFilerClient()
if err != nil {
- return nil, fmt.Errorf("failed to get filer client: %v", err)
+ return nil, nil, "", fmt.Errorf("failed to get filer client: %v", err)
}
- var recordType *schema_pb.RecordType
+ var flatSchema *schema_pb.RecordType
+ var keyColumns []string
+ var schemaFormat string
err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Read topic.conf file from /topics/{namespace}/{topic}/topic.conf
topicDir := fmt.Sprintf("/topics/%s/%s", namespace, topicName)
@@ -306,30 +296,23 @@ func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName
return fmt.Errorf("failed to unmarshal topic %s.%s configuration: %v", namespace, topicName, err)
}
- // Extract the record type (schema)
- if conf.RecordType != nil {
- recordType = conf.RecordType
- } else {
- return fmt.Errorf("no schema found for topic %s.%s", namespace, topicName)
- }
+ // Extract flat schema, key columns, and schema format
+ flatSchema = conf.MessageRecordType
+ keyColumns = conf.KeyColumns
+ schemaFormat = conf.SchemaFormat
return nil
})
if err != nil {
- return nil, err
- }
-
- if recordType == nil {
- return nil, fmt.Errorf("no record type found for topic %s.%s", namespace, topicName)
+ return nil, nil, "", err
}
- return recordType, nil
+ return flatSchema, keyColumns, schemaFormat, nil
}
-// ConfigureTopic creates or modifies a topic configuration
-// Assumption: Uses existing ConfigureTopic gRPC method for topic management
-func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error {
+// ConfigureTopic creates or modifies a topic using flat schema format
+func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, flatSchema *schema_pb.RecordType, keyColumns []string) error {
if err := c.findBrokerBalancer(); err != nil {
return err
}
@@ -342,14 +325,15 @@ func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName
client := mq_pb.NewSeaweedMessagingClient(conn)
- // Create topic configuration
+ // Create topic configuration using flat schema format
_, err = client.ConfigureTopic(ctx, &mq_pb.ConfigureTopicRequest{
Topic: &schema_pb.Topic{
Namespace: namespace,
Name: topicName,
},
- PartitionCount: partitionCount,
- RecordType: recordType,
+ PartitionCount: partitionCount,
+ MessageRecordType: flatSchema,
+ KeyColumns: keyColumns,
})
if err != nil {
return fmt.Errorf("failed to configure topic %s.%s: %v", namespace, topicName, err)
@@ -433,15 +417,21 @@ func (c *BrokerClient) ListTopicPartitions(ctx context.Context, namespace, topic
// Uses buffer_start metadata from disk files for precise deduplication
// This prevents double-counting when combining with disk-based data
func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) {
+ glog.V(2).Infof("GetUnflushedMessages called for %s/%s, partition: RangeStart=%d, RangeStop=%d",
+ namespace, topicName, partition.RangeStart, partition.RangeStop)
+
// Step 1: Find the broker that hosts this partition
if err := c.findBrokerBalancer(); err != nil {
+ glog.V(2).Infof("Failed to find broker balancer: %v", err)
// Return empty slice if we can't find broker - prevents double-counting
return []*filer_pb.LogEntry{}, nil
}
+ glog.V(2).Infof("Found broker at address: %s", c.brokerAddress)
// Step 2: Connect to broker
conn, err := grpc.Dial(c.brokerAddress, c.grpcDialOption)
if err != nil {
+ glog.V(2).Infof("Failed to connect to broker %s: %v", c.brokerAddress, err)
// Return empty slice if connection fails - prevents double-counting
return []*filer_pb.LogEntry{}, nil
}
@@ -449,16 +439,20 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
client := mq_pb.NewSeaweedMessagingClient(conn)
- // Step 3: Get earliest buffer_start from disk files for precise deduplication
+ // Step 3: For unflushed messages, always start from 0 to get all in-memory data
+ // The buffer_start metadata in log files uses timestamp-based indices for uniqueness,
+ // but the broker's LogBuffer uses sequential indices internally (0, 1, 2, 3...)
+ // For unflushed data queries, we want all messages in the buffer regardless of their
+ // timestamp-based buffer indices, so we always use 0.
topicObj := topic.Topic{Namespace: namespace, Name: topicName}
partitionPath := topic.PartitionDir(topicObj, partition)
- earliestBufferIndex, err := c.getEarliestBufferStart(ctx, partitionPath)
- if err != nil {
- // If we can't get buffer info, use 0 (get all unflushed data)
- earliestBufferIndex = 0
- }
+ glog.V(2).Infof("Getting buffer start from partition path: %s", partitionPath)
+
+ // Always use 0 for unflushed messages to ensure we get all in-memory data
+ earliestBufferOffset := int64(0)
+ glog.V(2).Infof("Using StartBufferOffset=0 for unflushed messages (buffer offsets are sequential internally)")
- // Step 4: Prepare request using buffer index filtering only
+ // Step 4: Prepare request using buffer offset filtering only
request := &mq_pb.GetUnflushedMessagesRequest{
Topic: &schema_pb.Topic{
Namespace: namespace,
@@ -470,12 +464,14 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi
RangeStop: partition.RangeStop,
UnixTimeNs: partition.UnixTimeNs,
},
- StartBufferIndex: earliestBufferIndex,
+ StartBufferOffset: earliestBufferOffset,
}
// Step 5: Call the broker streaming API
+ glog.V(2).Infof("Calling GetUnflushedMessages gRPC with StartBufferOffset=%d", earliestBufferOffset)
stream, err := client.GetUnflushedMessages(ctx, request)
if err != nil {
+ glog.V(2).Infof("GetUnflushedMessages gRPC call failed: %v", err)
// Return empty slice if gRPC call fails - prevents double-counting
return []*filer_pb.LogEntry{}, nil
}
@@ -558,19 +554,6 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath
return nil
})
- // Debug: Show buffer_start determination logic in EXPLAIN mode
- if isDebugMode(ctx) && len(bufferStartSources) > 0 {
- if logFileCount == 0 && parquetFileCount > 0 {
- fmt.Printf("Debug: Using Parquet buffer_start metadata (binary format, no log files) - sources: %v\n", bufferStartSources)
- } else if logFileCount > 0 && parquetFileCount > 0 {
- fmt.Printf("Debug: Using mixed sources for buffer_start (binary format) - log files: %d, Parquet files: %d, sources: %v\n",
- logFileCount, parquetFileCount, bufferStartSources)
- } else {
- fmt.Printf("Debug: Using log file buffer_start metadata (binary format) - sources: %v\n", bufferStartSources)
- }
- fmt.Printf("Debug: Earliest buffer_start index: %d\n", earliestBufferIndex)
- }
-
if err != nil {
return 0, fmt.Errorf("failed to scan partition directory: %v", err)
}
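
The deleted convertHTTPToGRPC helper above duplicated logic that pb.ServerAddress already provides; a minimal sketch of the replacement call path, assuming the usual SeaweedFS convention of gRPC port = HTTP port + 10000 (the same rule the removed helper implemented):

    package main

    import (
        "fmt"

        "github.com/seaweedfs/seaweedfs/weed/pb"
    )

    func main() {
        // Master HTTP address, as passed to NewBrokerClient
        httpAddr := pb.ServerAddress("localhost:9333")
        // ToGrpcAddress applies the HTTP-port+10000 convention
        fmt.Println(httpAddr.ToGrpcAddress()) // localhost:19333
    }
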
diff --git a/weed/query/engine/catalog.go b/weed/query/engine/catalog.go
index 4cd39f3f0..f53e4cb2a 100644
--- a/weed/query/engine/catalog.go
+++ b/weed/query/engine/catalog.go
@@ -17,9 +17,9 @@ import (
type BrokerClientInterface interface {
ListNamespaces(ctx context.Context) ([]string, error)
ListTopics(ctx context.Context, namespace string) ([]string, error)
- GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error)
+ GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, []string, string, error) // Returns (flatSchema, keyColumns, schemaFormat, error)
+ ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, flatSchema *schema_pb.RecordType, keyColumns []string) error
GetFilerClient() (filer_pb.FilerClient, error)
- ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error
DeleteTopic(ctx context.Context, namespace, topicName string) error
// GetUnflushedMessages returns only messages that haven't been flushed to disk yet
// This prevents double-counting when combining with disk-based data
@@ -151,12 +151,24 @@ func (c *SchemaCatalog) ListTables(database string) ([]string, error) {
tables := make([]string, 0, len(db.Tables))
for name := range db.Tables {
+ // Skip .meta table
+ if name == ".meta" {
+ continue
+ }
tables = append(tables, name)
}
return tables, nil
}
- return topics, nil
+ // Filter out .meta table from topics
+ filtered := make([]string, 0, len(topics))
+ for _, topic := range topics {
+ if topic != ".meta" {
+ filtered = append(filtered, topic)
+ }
+ }
+
+ return filtered, nil
}
// GetTableInfo returns detailed schema information for a table
@@ -185,7 +197,7 @@ func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
- recordType, err := c.brokerClient.GetTopicSchema(ctx, database, table)
+ recordType, _, _, err := c.brokerClient.GetTopicSchema(ctx, database, table)
if err != nil {
// If broker unavailable and we have expired cached data, return it
if exists {
@@ -278,7 +290,27 @@ func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *sch
// 1. MQ scalar types map directly to SQL types
// 2. Complex types (arrays, maps) are serialized as JSON strings
// 3. All fields are nullable unless specifically marked otherwise
+// 4. If no schema is defined, create a default schema with system fields and _value
func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) {
+ // Check if the schema has a valid RecordType
+ if mqSchema == nil || mqSchema.RecordType == nil {
+ // For topics without schema, create a default schema with system fields and _value
+ columns := []ColumnInfo{
+ {Name: SW_DISPLAY_NAME_TIMESTAMP, Type: "TIMESTAMP", Nullable: true},
+ {Name: SW_COLUMN_NAME_KEY, Type: "VARBINARY", Nullable: true},
+ {Name: SW_COLUMN_NAME_SOURCE, Type: "VARCHAR(255)", Nullable: true},
+ {Name: SW_COLUMN_NAME_VALUE, Type: "VARBINARY", Nullable: true},
+ }
+
+ return &TableInfo{
+ Name: topicName,
+ Namespace: namespace,
+ Schema: nil, // No schema defined
+ Columns: columns,
+ RevisionId: 0,
+ }, nil
+ }
+
columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields))
for i, field := range mqSchema.RecordType.Fields {
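
The BrokerClientInterface changes above widen GetTopicSchema to return key columns and the schema format alongside the flat schema, and ConfigureTopic now takes the flat schema plus key columns. A hedged caller-side sketch (namespace, topic, and variable names are illustrative, not from the diff):

    // GetTopicSchema now returns (flatSchema, keyColumns, schemaFormat, error)
    flatSchema, keyColumns, schemaFormat, err := brokerClient.GetTopicSchema(ctx, "ecommerce", "user_events")
    if err != nil {
        return err
    }
    _ = schemaFormat // conf.SchemaFormat from topic.conf

    // ConfigureTopic accepts the flat schema and the key columns that DESCRIBE marks as "PRI"
    err = brokerClient.ConfigureTopic(ctx, "ecommerce", "user_events", partitionCount, flatSchema, keyColumns)
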
diff --git a/weed/query/engine/catalog_no_schema_test.go b/weed/query/engine/catalog_no_schema_test.go
new file mode 100644
index 000000000..0c0312cee
--- /dev/null
+++ b/weed/query/engine/catalog_no_schema_test.go
@@ -0,0 +1,101 @@
+package engine
+
+import (
+ "testing"
+
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
+)
+
+// TestConvertMQSchemaToTableInfo_NoSchema tests that topics without schemas
+// get a default schema with system fields and _value field
+func TestConvertMQSchemaToTableInfo_NoSchema(t *testing.T) {
+ catalog := NewSchemaCatalog("localhost:9333")
+
+ tests := []struct {
+ name string
+ mqSchema *schema.Schema
+ expectError bool
+ checkFields func(*testing.T, *TableInfo)
+ }{
+ {
+ name: "nil schema",
+ mqSchema: nil,
+ expectError: false,
+ checkFields: func(t *testing.T, info *TableInfo) {
+ if info.Schema != nil {
+ t.Error("Expected Schema to be nil for topics without schema")
+ }
+ if len(info.Columns) != 4 {
+ t.Errorf("Expected 4 columns, got %d", len(info.Columns))
+ }
+ expectedCols := map[string]string{
+ "_ts": "TIMESTAMP",
+ "_key": "VARBINARY",
+ "_source": "VARCHAR(255)",
+ "_value": "VARBINARY",
+ }
+ for _, col := range info.Columns {
+ expectedType, ok := expectedCols[col.Name]
+ if !ok {
+ t.Errorf("Unexpected column: %s", col.Name)
+ continue
+ }
+ if col.Type != expectedType {
+ t.Errorf("Column %s: expected type %s, got %s", col.Name, expectedType, col.Type)
+ }
+ }
+ },
+ },
+ {
+ name: "schema with nil RecordType",
+ mqSchema: &schema.Schema{
+ RecordType: nil,
+ RevisionId: 1,
+ },
+ expectError: false,
+ checkFields: func(t *testing.T, info *TableInfo) {
+ if info.Schema != nil {
+ t.Error("Expected Schema to be nil for topics without RecordType")
+ }
+ if len(info.Columns) != 4 {
+ t.Errorf("Expected 4 columns, got %d", len(info.Columns))
+ }
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tableInfo, err := catalog.convertMQSchemaToTableInfo("test_namespace", "test_topic", tt.mqSchema)
+
+ if tt.expectError {
+ if err == nil {
+ t.Error("Expected error but got none")
+ }
+ return
+ }
+
+ if err != nil {
+ t.Errorf("Unexpected error: %v", err)
+ return
+ }
+
+ if tableInfo == nil {
+ t.Error("Expected tableInfo but got nil")
+ return
+ }
+
+ if tt.checkFields != nil {
+ tt.checkFields(t, tableInfo)
+ }
+
+ // Basic checks
+ if tableInfo.Name != "test_topic" {
+ t.Errorf("Expected Name 'test_topic', got '%s'", tableInfo.Name)
+ }
+ if tableInfo.Namespace != "test_namespace" {
+ t.Errorf("Expected Namespace 'test_namespace', got '%s'", tableInfo.Namespace)
+ }
+ })
+ }
+}
diff --git a/weed/query/engine/cockroach_parser_success_test.go b/weed/query/engine/cockroach_parser_success_test.go
index 499d0c28e..f810e604c 100644
--- a/weed/query/engine/cockroach_parser_success_test.go
+++ b/weed/query/engine/cockroach_parser_success_test.go
@@ -73,17 +73,17 @@ func TestCockroachDBParserSuccess(t *testing.T) {
result, err := engine.ExecuteSQL(context.Background(), tc.sql)
if err != nil {
- t.Errorf("❌ %s - Query failed: %v", tc.desc, err)
+ t.Errorf("%s - Query failed: %v", tc.desc, err)
return
}
if result.Error != nil {
- t.Errorf("❌ %s - Query result error: %v", tc.desc, result.Error)
+ t.Errorf("%s - Query result error: %v", tc.desc, result.Error)
return
}
if len(result.Rows) == 0 {
- t.Errorf("❌ %s - Expected at least one row", tc.desc)
+ t.Errorf("%s - Expected at least one row", tc.desc)
return
}
diff --git a/weed/query/engine/complete_sql_fixes_test.go b/weed/query/engine/complete_sql_fixes_test.go
index 19d7d59fb..e984ce0e1 100644
--- a/weed/query/engine/complete_sql_fixes_test.go
+++ b/weed/query/engine/complete_sql_fixes_test.go
@@ -24,19 +24,19 @@ func TestCompleteSQLFixes(t *testing.T) {
name: "OriginalFailingQuery1",
timestamp: 1756947416566456262,
id: 897795,
- sql: "select id, _timestamp_ns as ts from ecommerce.user_events where ts = 1756947416566456262",
+ sql: "select id, _ts_ns as ts from ecommerce.user_events where ts = 1756947416566456262",
},
{
name: "OriginalFailingQuery2",
timestamp: 1756947416566439304,
id: 715356,
- sql: "select id, _timestamp_ns as ts from ecommerce.user_events where ts = 1756947416566439304",
+ sql: "select id, _ts_ns as ts from ecommerce.user_events where ts = 1756947416566439304",
},
{
name: "CurrentDataQuery",
timestamp: 1756913789829292386,
id: 82460,
- sql: "select id, _timestamp_ns as ts from ecommerce.user_events where ts = 1756913789829292386",
+ sql: "select id, _ts_ns as ts from ecommerce.user_events where ts = 1756913789829292386",
},
}
@@ -45,8 +45,8 @@ func TestCompleteSQLFixes(t *testing.T) {
// Create test record matching the production data
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.timestamp}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.id}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.timestamp}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.id}},
},
}
@@ -67,8 +67,8 @@ func TestCompleteSQLFixes(t *testing.T) {
// Verify precision is maintained (timestamp fixes)
testRecordOffBy1 := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.timestamp + 1}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.id}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.timestamp + 1}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.id}},
},
}
@@ -84,9 +84,9 @@ func TestCompleteSQLFixes(t *testing.T) {
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
- "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user123"}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
+ "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user123"}},
},
}
@@ -96,7 +96,7 @@ func TestCompleteSQLFixes(t *testing.T) {
// 3. Multiple conditions
// 4. Different data types
sql := `SELECT
- _timestamp_ns AS ts,
+ _ts_ns AS ts,
id AS record_id,
user_id AS uid
FROM ecommerce.user_events
@@ -117,9 +117,9 @@ func TestCompleteSQLFixes(t *testing.T) {
// Test that precision is still maintained in complex queries
testRecordDifferentTimestamp := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp + 1}}, // Off by 1ns
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
- "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user123"}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp + 1}}, // Off by 1ns
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
+ "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user123"}},
},
}
@@ -131,13 +131,13 @@ func TestCompleteSQLFixes(t *testing.T) {
// Ensure that non-alias queries continue to work exactly as before
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
},
}
// Traditional query (no aliases) - should work exactly as before
- traditionalSQL := "SELECT _timestamp_ns, id FROM ecommerce.user_events WHERE _timestamp_ns = 1756947416566456262 AND id = 897795"
+ traditionalSQL := "SELECT _ts_ns, id FROM ecommerce.user_events WHERE _ts_ns = 1756947416566456262 AND id = 897795"
stmt, err := ParseSQL(traditionalSQL)
assert.NoError(t, err)
@@ -162,13 +162,13 @@ func TestCompleteSQLFixes(t *testing.T) {
// Test that the fixes don't introduce performance or stability issues
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
},
}
// Run the same query many times to test stability
- sql := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = 1756947416566456262"
+ sql := "SELECT _ts_ns AS ts, id FROM test WHERE ts = 1756947416566456262"
stmt, err := ParseSQL(sql)
assert.NoError(t, err)
@@ -194,7 +194,7 @@ func TestCompleteSQLFixes(t *testing.T) {
// Test with nil SelectExprs (should fall back to no-alias behavior)
compExpr := &ComparisonExpr{
- Left: &ColName{Name: stringValue("_timestamp_ns")},
+ Left: &ColName{Name: stringValue("_ts_ns")},
Operator: "=",
Right: &SQLVal{Type: IntVal, Val: []byte("1756947416566456262")},
}
@@ -218,43 +218,43 @@ func TestSQLFixesSummary(t *testing.T) {
// The "before and after" test
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
},
}
// What was failing before (would return 0 rows)
- failingSQL := "SELECT id, _timestamp_ns AS ts FROM ecommerce.user_events WHERE ts = 1756947416566456262"
+ failingSQL := "SELECT id, _ts_ns AS ts FROM ecommerce.user_events WHERE ts = 1756947416566456262"
// What works now
stmt, err := ParseSQL(failingSQL)
- assert.NoError(t, err, "✅ SQL parsing works")
+ assert.NoError(t, err, "SQL parsing works")
selectStmt := stmt.(*SelectStatement)
predicate, err := engine.buildPredicateWithContext(selectStmt.Where.Expr, selectStmt.SelectExprs)
- assert.NoError(t, err, "✅ Predicate building works with aliases")
+ assert.NoError(t, err, "Predicate building works with aliases")
result := predicate(testRecord)
- assert.True(t, result, "✅ Originally failing query now works perfectly")
+ assert.True(t, result, "Originally failing query now works perfectly")
// Verify precision is maintained
testRecordOffBy1 := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456263}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456263}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
},
}
result2 := predicate(testRecordOffBy1)
- assert.False(t, result2, "✅ Nanosecond precision maintained")
-
- t.Log("🎉 ALL SQL FIXES VERIFIED:")
- t.Log(" ✅ Timestamp precision for large int64 values")
- t.Log(" ✅ SQL alias resolution in WHERE clauses")
- t.Log(" ✅ Scan boundary fixes for equality queries")
- t.Log(" ✅ Range query fixes for equal boundaries")
- t.Log(" ✅ Hybrid scanner time range handling")
- t.Log(" ✅ Backward compatibility maintained")
- t.Log(" ✅ Production stability verified")
+ assert.False(t, result2, "Nanosecond precision maintained")
+
+ t.Log("ALL SQL FIXES VERIFIED:")
+ t.Log(" Timestamp precision for large int64 values")
+ t.Log(" SQL alias resolution in WHERE clauses")
+ t.Log(" Scan boundary fixes for equality queries")
+ t.Log(" Range query fixes for equal boundaries")
+ t.Log(" Hybrid scanner time range handling")
+ t.Log(" Backward compatibility maintained")
+ t.Log(" Production stability verified")
})
}
diff --git a/weed/query/engine/describe.go b/weed/query/engine/describe.go
index 3a26bb2a6..415fc8e17 100644
--- a/weed/query/engine/describe.go
+++ b/weed/query/engine/describe.go
@@ -27,8 +27,8 @@ func (e *SQLEngine) executeDescribeStatement(ctx context.Context, tableName stri
}
}
- // Get topic schema from broker
- recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
+ // Get flat schema and key columns from broker
+ flatSchema, keyColumns, _, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
if err != nil {
return &QueryResult{Error: err}, err
}
@@ -44,38 +44,71 @@ func (e *SQLEngine) executeDescribeStatement(ctx context.Context, tableName stri
{"_source", "VARCHAR(255)", "System column: Data source (parquet/log)"},
}
- // Format schema as DESCRIBE output (regular fields + system columns)
- totalRows := len(recordType.Fields) + len(systemColumns)
+ // If no schema is defined, include _value field
+ if flatSchema == nil {
+ systemColumns = append(systemColumns, struct {
+ Name string
+ Type string
+ Extra string
+ }{SW_COLUMN_NAME_VALUE, "VARBINARY", "Raw message value (no schema defined)"})
+ }
+
+ // Calculate total rows: schema fields + system columns
+ totalRows := len(systemColumns)
+ if flatSchema != nil {
+ totalRows += len(flatSchema.Fields)
+ }
+
+ // Create key column lookup map
+ keyColumnMap := make(map[string]bool)
+ for _, keyCol := range keyColumns {
+ keyColumnMap[keyCol] = true
+ }
+
result := &QueryResult{
Columns: []string{"Field", "Type", "Null", "Key", "Default", "Extra"},
Rows: make([][]sqltypes.Value, totalRows),
}
- // Add regular fields
- for i, field := range recordType.Fields {
- sqlType := e.convertMQTypeToSQL(field.Type)
-
- result.Rows[i] = []sqltypes.Value{
- sqltypes.NewVarChar(field.Name), // Field
- sqltypes.NewVarChar(sqlType), // Type
- sqltypes.NewVarChar("YES"), // Null (assume nullable)
- sqltypes.NewVarChar(""), // Key (no keys for now)
- sqltypes.NewVarChar("NULL"), // Default
- sqltypes.NewVarChar(""), // Extra
+ rowIndex := 0
+
+ // Add schema fields - mark key columns appropriately
+ if flatSchema != nil {
+ for _, field := range flatSchema.Fields {
+ sqlType := e.convertMQTypeToSQL(field.Type)
+ isKey := keyColumnMap[field.Name]
+ keyType := ""
+ if isKey {
+ keyType = "PRI" // Primary key
+ }
+ extra := "Data field"
+ if isKey {
+ extra = "Key field"
+ }
+
+ result.Rows[rowIndex] = []sqltypes.Value{
+ sqltypes.NewVarChar(field.Name),
+ sqltypes.NewVarChar(sqlType),
+ sqltypes.NewVarChar("YES"),
+ sqltypes.NewVarChar(keyType),
+ sqltypes.NewVarChar("NULL"),
+ sqltypes.NewVarChar(extra),
+ }
+ rowIndex++
}
}
// Add system columns
- for i, sysCol := range systemColumns {
- rowIndex := len(recordType.Fields) + i
+ for _, sysCol := range systemColumns {
result.Rows[rowIndex] = []sqltypes.Value{
sqltypes.NewVarChar(sysCol.Name), // Field
sqltypes.NewVarChar(sysCol.Type), // Type
sqltypes.NewVarChar("YES"), // Null
- sqltypes.NewVarChar(""), // Key
+ sqltypes.NewVarChar("SYS"), // Key - mark as system column
sqltypes.NewVarChar("NULL"), // Default
sqltypes.NewVarChar(sysCol.Extra), // Extra - description
}
+ rowIndex++
}
return result, nil
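
With key columns and system columns now marked in the Key field, DESCRIBE output takes roughly the following shape (field names, types, and the Extra text for _ts/_key are illustrative, not taken from a real topic):

    Field    | Type         | Null | Key | Default | Extra
    user_id  | BIGINT       | YES  | PRI | NULL    | Key field
    action   | VARCHAR(255) | YES  |     | NULL    | Data field
    _ts      | TIMESTAMP    | YES  | SYS | NULL    | System column: Message timestamp
    _key     | VARBINARY    | YES  | SYS | NULL    | System column: Message key
    _source  | VARCHAR(255) | YES  | SYS | NULL    | System column: Data source (parquet/log)

For a topic with no schema, an additional _value VARBINARY row is appended with Extra "Raw message value (no schema defined)".
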
diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go
index ffed03f35..e00fd78ca 100644
--- a/weed/query/engine/engine.go
+++ b/weed/query/engine/engine.go
@@ -1513,47 +1513,49 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *Se
var result *QueryResult
var err error
- if hasAggregations {
- // Extract table information for aggregation execution
- var database, tableName string
- if len(stmt.From) == 1 {
- if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
- if tableExpr, ok := table.Expr.(TableName); ok {
- tableName = tableExpr.Name.String()
- if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
- database = tableExpr.Qualifier.String()
- }
+ // Extract table information for execution (needed for both aggregation and regular queries)
+ var database, tableName string
+ if len(stmt.From) == 1 {
+ if table, ok := stmt.From[0].(*AliasedTableExpr); ok {
+ if tableExpr, ok := table.Expr.(TableName); ok {
+ tableName = tableExpr.Name.String()
+ if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" {
+ database = tableExpr.Qualifier.String()
}
}
}
+ }
- // Use current database if not specified
+ // Use current database if not specified
+ if database == "" {
+ database = e.catalog.currentDatabase
if database == "" {
- database = e.catalog.currentDatabase
- if database == "" {
- database = "default"
- }
- }
-
- // Create hybrid scanner for aggregation execution
- var filerClient filer_pb.FilerClient
- if e.catalog.brokerClient != nil {
- filerClient, err = e.catalog.brokerClient.GetFilerClient()
- if err != nil {
- return &QueryResult{Error: err}, err
- }
+ database = "default"
}
+ }
- hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
+ // CRITICAL FIX: Always use HybridMessageScanner for ALL queries to read both flushed and unflushed data
+ // Create hybrid scanner for both aggregation and regular SELECT queries
+ var filerClient filer_pb.FilerClient
+ if e.catalog.brokerClient != nil {
+ filerClient, err = e.catalog.brokerClient.GetFilerClient()
if err != nil {
return &QueryResult{Error: err}, err
}
+ }
+ hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)
+ if err != nil {
+ return &QueryResult{Error: err}, err
+ }
+
+ if hasAggregations {
// Execute aggregation query with plan tracking
result, err = e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, plan)
} else {
- // Regular SELECT query with plan tracking
- result, err = e.executeSelectStatementWithBrokerStats(ctx, stmt, plan)
+ // CRITICAL FIX: Use HybridMessageScanner for regular SELECT queries too
+ // This ensures both flushed and unflushed data are read
+ result, err = e.executeRegularSelectWithHybridScanner(ctx, hybridScanner, stmt, plan)
}
if err == nil && result != nil {
@@ -1981,6 +1983,198 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *SelectStat
return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil
}
+// executeRegularSelectWithHybridScanner handles regular SELECT queries using HybridMessageScanner
+// This ensures both flushed and unflushed data are read, fixing the SQL empty results issue
+func (e *SQLEngine) executeRegularSelectWithHybridScanner(ctx context.Context, hybridScanner *HybridMessageScanner, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
+ // Parse SELECT expressions to determine columns and detect aggregations
+ var columns []string
+ var aggregations []AggregationSpec
+ var hasAggregations bool
+ selectAll := false
+ baseColumnsSet := make(map[string]bool) // Track base columns needed for expressions
+
+ for _, selectExpr := range stmt.SelectExprs {
+ switch expr := selectExpr.(type) {
+ case *StarExpr:
+ selectAll = true
+ case *AliasedExpr:
+ switch col := expr.Expr.(type) {
+ case *ColName:
+ columnName := col.Name.String()
+ columns = append(columns, columnName)
+ baseColumnsSet[columnName] = true
+ case *FuncExpr:
+ funcName := strings.ToLower(col.Name.String())
+ if e.isAggregationFunction(funcName) {
+ // Handle aggregation functions
+ aggSpec, err := e.parseAggregationFunction(col, expr)
+ if err != nil {
+ return &QueryResult{Error: err}, err
+ }
+ aggregations = append(aggregations, *aggSpec)
+ hasAggregations = true
+ } else if e.isStringFunction(funcName) {
+ // Handle string functions like UPPER, LENGTH, etc.
+ columns = append(columns, e.getStringFunctionAlias(col))
+ // Extract base columns needed for this string function
+ e.extractBaseColumnsFromFunction(col, baseColumnsSet)
+ } else if e.isDateTimeFunction(funcName) {
+ // Handle datetime functions like CURRENT_DATE, NOW, EXTRACT, DATE_TRUNC
+ columns = append(columns, e.getDateTimeFunctionAlias(col))
+ // Extract base columns needed for this datetime function
+ e.extractBaseColumnsFromFunction(col, baseColumnsSet)
+ } else {
+ return &QueryResult{Error: fmt.Errorf("unsupported function: %s", funcName)}, fmt.Errorf("unsupported function: %s", funcName)
+ }
+ default:
+ err := fmt.Errorf("unsupported SELECT expression: %T", col)
+ return &QueryResult{Error: err}, err
+ }
+ default:
+ err := fmt.Errorf("unsupported SELECT expression: %T", expr)
+ return &QueryResult{Error: err}, err
+ }
+ }
+
+ // If we have aggregations, delegate to aggregation handler
+ if hasAggregations {
+ return e.executeAggregationQuery(ctx, hybridScanner, aggregations, stmt)
+ }
+
+ // Parse WHERE clause for predicate pushdown
+ var predicate func(*schema_pb.RecordValue) bool
+ var err error
+ if stmt.Where != nil {
+ predicate, err = e.buildPredicateWithContext(stmt.Where.Expr, stmt.SelectExprs)
+ if err != nil {
+ return &QueryResult{Error: err}, err
+ }
+ }
+
+ // Parse LIMIT and OFFSET clauses
+ // Use -1 to distinguish "no LIMIT" from "LIMIT 0"
+ limit := -1
+ offset := 0
+ if stmt.Limit != nil && stmt.Limit.Rowcount != nil {
+ switch limitExpr := stmt.Limit.Rowcount.(type) {
+ case *SQLVal:
+ if limitExpr.Type == IntVal {
+ var parseErr error
+ limit64, parseErr := strconv.ParseInt(string(limitExpr.Val), 10, 64)
+ if parseErr != nil {
+ return &QueryResult{Error: parseErr}, parseErr
+ }
+ if limit64 > math.MaxInt32 || limit64 < 0 {
+ return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64)
+ }
+ limit = int(limit64)
+ }
+ }
+ }
+
+ // Parse OFFSET clause if present
+ if stmt.Limit != nil && stmt.Limit.Offset != nil {
+ switch offsetExpr := stmt.Limit.Offset.(type) {
+ case *SQLVal:
+ if offsetExpr.Type == IntVal {
+ var parseErr error
+ offset64, parseErr := strconv.ParseInt(string(offsetExpr.Val), 10, 64)
+ if parseErr != nil {
+ return &QueryResult{Error: parseErr}, parseErr
+ }
+ if offset64 > math.MaxInt32 || offset64 < 0 {
+ return &QueryResult{Error: fmt.Errorf("OFFSET value %d is out of valid range", offset64)}, fmt.Errorf("OFFSET value %d is out of valid range", offset64)
+ }
+ offset = int(offset64)
+ }
+ }
+ }
+
+ // Build hybrid scan options
+ // Extract time filters from WHERE clause to optimize scanning
+ startTimeNs, stopTimeNs := int64(0), int64(0)
+ if stmt.Where != nil {
+ startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr)
+ }
+
+ hybridScanOptions := HybridScanOptions{
+ StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons
+ StopTimeNs: stopTimeNs, // Extracted from WHERE clause time comparisons
+ Limit: limit,
+ Offset: offset,
+ Predicate: predicate,
+ }
+
+ if !selectAll {
+ // Convert baseColumnsSet to slice for hybrid scan options
+ baseColumns := make([]string, 0, len(baseColumnsSet))
+ for columnName := range baseColumnsSet {
+ baseColumns = append(baseColumns, columnName)
+ }
+ // Use base columns (not expression aliases) for data retrieval
+ if len(baseColumns) > 0 {
+ hybridScanOptions.Columns = baseColumns
+ } else {
+ // If no base columns found (shouldn't happen), use original columns
+ hybridScanOptions.Columns = columns
+ }
+ }
+
+ // Execute the hybrid scan (both flushed and unflushed data)
+ var results []HybridScanResult
+ if plan != nil {
+ // EXPLAIN mode - capture broker buffer stats
+ var stats *HybridScanStats
+ results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions)
+ if err != nil {
+ return &QueryResult{Error: err}, err
+ }
+
+ // Populate plan with broker buffer information
+ if stats != nil {
+ plan.BrokerBufferQueried = stats.BrokerBufferQueried
+ plan.BrokerBufferMessages = stats.BrokerBufferMessages
+ plan.BufferStartIndex = stats.BufferStartIndex
+
+ // Add broker_buffer to data sources if buffer was queried
+ if stats.BrokerBufferQueried {
+ // Check if broker_buffer is already in data sources
+ hasBrokerBuffer := false
+ for _, source := range plan.DataSources {
+ if source == "broker_buffer" {
+ hasBrokerBuffer = true
+ break
+ }
+ }
+ if !hasBrokerBuffer {
+ plan.DataSources = append(plan.DataSources, "broker_buffer")
+ }
+ }
+ }
+ } else {
+ // Normal mode - just get results
+ results, err = hybridScanner.Scan(ctx, hybridScanOptions)
+ if err != nil {
+ return &QueryResult{Error: err}, err
+ }
+ }
+
+ // Convert to SQL result format
+ if selectAll {
+ if len(columns) > 0 {
+ // SELECT *, specific_columns - include both auto-discovered and explicit columns
+ return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil
+ } else {
+ // SELECT * only - let converter determine all columns (excludes system columns)
+ columns = nil
+ return hybridScanner.ConvertToSQLResult(results, columns), nil
+ }
+ }
+
+ // Handle custom column expressions (including arithmetic)
+ return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil
+}
+
// executeSelectStatementWithBrokerStats handles SELECT queries with broker buffer statistics capture
// This is used by EXPLAIN queries to capture complete data source information including broker memory
func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) {
@@ -2237,10 +2431,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
plan.Details[PlanDetailStartTimeNs] = startTimeNs
plan.Details[PlanDetailStopTimeNs] = stopTimeNs
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Time filters extracted - startTimeNs=%d stopTimeNs=%d\n", startTimeNs, stopTimeNs)
- }
-
// Collect actual file information for each partition
var parquetFiles []string
var liveLogFiles []string
@@ -2261,9 +2451,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
columnPrunedCount := beforeColumnPrune - len(filteredStats)
if columnPrunedCount > 0 {
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
- }
// Track column statistics optimization
if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
@@ -2275,9 +2462,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
}
} else {
parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
- }
}
// Merge accurate parquet sources from metadata
@@ -2298,9 +2482,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s
}
} else {
liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
- }
}
}
@@ -2559,7 +2740,6 @@ func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileSta
return parquetStats
}
- debugEnabled := ctx != nil && isDebugMode(ctx)
qStart := startTimeNs
qStop := stopTimeNs
if qStop == 0 {
@@ -2568,21 +2748,10 @@ func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileSta
n := 0
for _, fs := range parquetStats {
- if debugEnabled {
- fmt.Printf("Debug: Checking parquet file %s for pruning\n", fs.FileName)
- }
if minNs, maxNs, ok := hybridScanner.getTimestampRangeFromStats(fs); ok {
- if debugEnabled {
- fmt.Printf("Debug: Prune check parquet %s min=%d max=%d qStart=%d qStop=%d\n", fs.FileName, minNs, maxNs, qStart, qStop)
- }
if qStop < minNs || (qStart != 0 && qStart > maxNs) {
- if debugEnabled {
- fmt.Printf("Debug: Skipping parquet file %s due to no time overlap\n", fs.FileName)
- }
continue
}
- } else if debugEnabled {
- fmt.Printf("Debug: No stats range available for parquet %s, cannot prune\n", fs.FileName)
}
parquetStats[n] = fs
n++
@@ -2596,13 +2765,9 @@ func (e *SQLEngine) pruneParquetFilesByColumnStats(ctx context.Context, parquetS
return parquetStats
}
- debugEnabled := ctx != nil && isDebugMode(ctx)
n := 0
for _, fs := range parquetStats {
if e.canSkipParquetFile(ctx, fs, whereExpr) {
- if debugEnabled {
- fmt.Printf("Debug: Skipping parquet file %s due to column statistics pruning\n", fs.FileName)
- }
continue
}
parquetStats[n] = fs
@@ -2726,7 +2891,6 @@ func (e *SQLEngine) flipOperator(op string) string {
// populatePlanFileDetails populates execution plan with detailed file information for partitions
// Includes column statistics pruning optimization when WHERE clause is provided
func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, partitions []string, stmt *SelectStatement) {
- debugEnabled := ctx != nil && isDebugMode(ctx)
// Collect actual file information for each partition
var parquetFiles []string
var liveLogFiles []string
@@ -2750,9 +2914,6 @@ func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExec
columnPrunedCount := beforeColumnPrune - len(filteredStats)
if columnPrunedCount > 0 {
- if debugEnabled {
- fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath)
- }
// Track column statistics optimization
if !contains(plan.OptimizationsUsed, "column_statistics_pruning") {
plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning")
@@ -2765,9 +2926,6 @@ func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExec
}
} else {
parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err))
- if debugEnabled {
- fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err)
- }
}
// Merge accurate parquet sources from metadata
@@ -2788,9 +2946,6 @@ func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExec
}
} else {
liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err))
- if debugEnabled {
- fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err)
- }
}
}
@@ -3848,7 +4003,7 @@ func (e *SQLEngine) createTable(ctx context.Context, stmt *DDLStatement) (*Query
// Create the topic via broker using configurable partition count
partitionCount := e.catalog.GetDefaultPartitionCount()
- err := e.catalog.brokerClient.ConfigureTopic(ctx, database, tableName, partitionCount, recordType)
+ err := e.catalog.brokerClient.ConfigureTopic(ctx, database, tableName, partitionCount, recordType, nil)
if err != nil {
return &QueryResult{Error: err}, err
}
@@ -4283,29 +4438,29 @@ func (e *SQLEngine) eachLogEntryInFile(filerClient filer_pb.FilerClient, filePat
// convertLogEntryToRecordValue helper method (reuse existing logic)
func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
- // Parse the log entry data as Protocol Buffer (not JSON!)
+ // Try to unmarshal as RecordValue first (schematized data)
recordValue := &schema_pb.RecordValue{}
- if err := proto.Unmarshal(logEntry.Data, recordValue); err != nil {
- return nil, "", fmt.Errorf("failed to unmarshal log entry protobuf: %v", err)
- }
+ err := proto.Unmarshal(logEntry.Data, recordValue)
+ if err == nil {
+ // Successfully unmarshaled as RecordValue (valid protobuf)
+ // Initialize Fields map if nil
+ if recordValue.Fields == nil {
+ recordValue.Fields = make(map[string]*schema_pb.Value)
+ }
- // Ensure Fields map exists
- if recordValue.Fields == nil {
- recordValue.Fields = make(map[string]*schema_pb.Value)
- }
+ // Add system columns from LogEntry
+ recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+ }
- // Add system columns
- recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
- Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
- }
- recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
- Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+ return recordValue, "live_log", nil
}
- // User data fields are already present in the protobuf-deserialized recordValue
- // No additional processing needed since proto.Unmarshal already populated the Fields map
-
- return recordValue, "live_log", nil
+ // Failed to unmarshal as RecordValue - invalid protobuf data
+ return nil, "", fmt.Errorf("failed to unmarshal log entry protobuf: %w", err)
}
// extractTimestampFromFilename extracts timestamp from parquet filename
@@ -4782,7 +4937,7 @@ func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string)
// discoverAndRegisterTopic attempts to discover an existing topic and register it in the SQL catalog
func (e *SQLEngine) discoverAndRegisterTopic(ctx context.Context, database, tableName string) error {
// First, check if topic exists by trying to get its schema from the broker/filer
- recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
+ recordType, _, _, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName)
if err != nil {
return fmt.Errorf("topic %s.%s not found or no schema available: %v", database, tableName, err)
}
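
The engine.go changes above route plain SELECT queries through the same HybridMessageScanner used for aggregations, so both flushed files and unflushed broker-buffer data are read. A condensed sketch of the new path, using names from the diff with error handling elided:

    // Covers parquet files, live log files, and broker memory
    hybridScanner, _ := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e)

    results, _ := hybridScanner.Scan(ctx, HybridScanOptions{
        StartTimeNs: startTimeNs, // extracted from WHERE-clause time comparisons
        StopTimeNs:  stopTimeNs,
        Limit:       limit,  // -1 means "no LIMIT"
        Offset:      offset,
        Predicate:   predicate, // compiled from WHERE, with alias resolution
    })
    // EXPLAIN mode calls ScanWithStats instead, to capture broker buffer statistics
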
diff --git a/weed/query/engine/engine_test.go b/weed/query/engine/engine_test.go
index 8193afef6..96c5507b0 100644
--- a/weed/query/engine/engine_test.go
+++ b/weed/query/engine/engine_test.go
@@ -1101,7 +1101,7 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_ComplexDataTypes(t *testing.T) {
"float_field": {Kind: &schema_pb.Value_FloatValue{FloatValue: 3.14159}},
"double_field": {Kind: &schema_pb.Value_DoubleValue{DoubleValue: 2.718281828}},
"bool_field": {Kind: &schema_pb.Value_BoolValue{BoolValue: true}},
- "string_field": {Kind: &schema_pb.Value_StringValue{StringValue: "test string with unicode 🎉"}},
+ "string_field": {Kind: &schema_pb.Value_StringValue{StringValue: "test string with unicode party"}},
"bytes_field": {Kind: &schema_pb.Value_BytesValue{BytesValue: []byte{0x01, 0x02, 0x03}}},
},
}
@@ -1129,7 +1129,7 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_ComplexDataTypes(t *testing.T) {
assert.Equal(t, float32(3.14159), result.Fields["float_field"].GetFloatValue())
assert.Equal(t, 2.718281828, result.Fields["double_field"].GetDoubleValue())
assert.Equal(t, true, result.Fields["bool_field"].GetBoolValue())
- assert.Equal(t, "test string with unicode 🎉", result.Fields["string_field"].GetStringValue())
+ assert.Equal(t, "test string with unicode party", result.Fields["string_field"].GetStringValue())
assert.Equal(t, []byte{0x01, 0x02, 0x03}, result.Fields["bytes_field"].GetBytesValue())
// System columns should still be present
diff --git a/weed/query/engine/fast_path_predicate_validation_test.go b/weed/query/engine/fast_path_predicate_validation_test.go
index 3322ed51f..3918fdbf0 100644
--- a/weed/query/engine/fast_path_predicate_validation_test.go
+++ b/weed/query/engine/fast_path_predicate_validation_test.go
@@ -93,7 +93,7 @@ func TestFastPathPredicateValidation(t *testing.T) {
},
{
name: "Internal timestamp column",
- whereClause: "_timestamp_ns > 1640995200000000000",
+ whereClause: "_ts_ns > 1640995200000000000",
expectedTimeOnly: true,
expectedStartTimeNs: 1640995200000000000,
description: "Internal timestamp column should allow fast path",
@@ -139,7 +139,7 @@ func TestFastPathPredicateValidation(t *testing.T) {
t.Errorf("Expected stopTimeNs=%d, got %d", tc.expectedStopTimeNs, stopTimeNs)
}
- t.Logf("✅ %s: onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d",
+ t.Logf("%s: onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d",
tc.name, onlyTimePredicates, startTimeNs, stopTimeNs)
})
}
@@ -212,7 +212,7 @@ func TestFastPathAggregationSafety(t *testing.T) {
tc.shouldUseFastPath, canAttemptFastPath, tc.description)
}
- t.Logf("✅ %s: canAttemptFastPath=%v (onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d)",
+ t.Logf("%s: canAttemptFastPath=%v (onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d)",
tc.name, canAttemptFastPath, onlyTimePredicates, startTimeNs, stopTimeNs)
})
}
@@ -233,7 +233,7 @@ func TestTimestampColumnDetection(t *testing.T) {
description: "System timestamp display column should be detected",
},
{
- columnName: "_timestamp_ns",
+ columnName: "_ts_ns",
isTimestamp: true,
description: "Internal timestamp column should be detected",
},
@@ -266,7 +266,7 @@ func TestTimestampColumnDetection(t *testing.T) {
t.Errorf("Expected isTimestampColumn(%s)=%v, got %v. %s",
tc.columnName, tc.isTimestamp, isTimestamp, tc.description)
}
- t.Logf("✅ Column '%s': isTimestamp=%v", tc.columnName, isTimestamp)
+ t.Logf("Column '%s': isTimestamp=%v", tc.columnName, isTimestamp)
})
}
}
diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go
index eee57bc23..c09ce2f54 100644
--- a/weed/query/engine/hybrid_message_scanner.go
+++ b/weed/query/engine/hybrid_message_scanner.go
@@ -15,6 +15,7 @@ import (
"github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/mq"
"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
@@ -41,6 +42,7 @@ type HybridMessageScanner struct {
brokerClient BrokerClientInterface // For querying unflushed data
topic topic.Topic
recordSchema *schema_pb.RecordType
+ schemaFormat string // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless
parquetLevels *schema.ParquetLevels
engine *SQLEngine // Reference for system column formatting
}
@@ -59,26 +61,32 @@ func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient Brok
Name: topicName,
}
- // Get topic schema from broker client (works with both real and mock clients)
- recordType, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
+ // Get flat schema from broker client
+ recordType, _, schemaFormat, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
if err != nil {
- return nil, fmt.Errorf("failed to get topic schema: %v", err)
- }
- if recordType == nil {
- return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
+ return nil, fmt.Errorf("failed to get topic record type: %v", err)
}
- // Create a copy of the recordType to avoid modifying the original
- recordTypeCopy := &schema_pb.RecordType{
- Fields: make([]*schema_pb.Field, len(recordType.Fields)),
- }
- copy(recordTypeCopy.Fields, recordType.Fields)
+ if recordType == nil || len(recordType.Fields) == 0 {
+ // For topics without schema, create a minimal schema with system fields and _value
+ recordType = schema.RecordTypeBegin().
+ WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
+ WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ WithField(SW_COLUMN_NAME_VALUE, schema.TypeBytes). // Raw message value
+ RecordTypeEnd()
+ } else {
+ // Create a copy of the recordType to avoid modifying the original
+ recordTypeCopy := &schema_pb.RecordType{
+ Fields: make([]*schema_pb.Field, len(recordType.Fields)),
+ }
+ copy(recordTypeCopy.Fields, recordType.Fields)
- // Add system columns that MQ adds to all records
- recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
- WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
- WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
- RecordTypeEnd()
+ // Add system columns that MQ adds to all records
+ recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
+ WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
+ WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ RecordTypeEnd()
+ }
// Convert to Parquet levels for efficient reading
parquetLevels, err := schema.ToParquetLevels(recordType)
@@ -91,6 +99,7 @@ func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient Brok
brokerClient: brokerClient,
topic: t,
recordSchema: recordType,
+ schemaFormat: schemaFormat,
parquetLevels: parquetLevels,
engine: engine,
}, nil
@@ -335,9 +344,6 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
if err != nil {
// Log error but don't fail the query - continue with disk data only
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err)
- }
// Reset queried flag on error
stats.BrokerBufferQueried = false
return results, stats, nil
@@ -346,18 +352,19 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
// Capture stats for EXPLAIN
stats.BrokerBufferMessages = len(unflushedEntries)
- // Debug logging for EXPLAIN mode
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries))
- if len(unflushedEntries) > 0 {
- fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n")
- }
- }
-
// Step 2: Process unflushed entries (already deduplicated by broker)
for _, logEntry := range unflushedEntries {
+ // Pre-decode DataMessage for reuse in both control check and conversion
+ var dataMessage *mq_pb.DataMessage
+ if len(logEntry.Data) > 0 {
+ dataMessage = &mq_pb.DataMessage{}
+ if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
+ dataMessage = nil // Failed to decode, treat as raw data
+ }
+ }
+
// Skip control entries without actual data
- if hms.isControlEntry(logEntry) {
+ if hms.isControlEntryWithDecoded(logEntry, dataMessage) {
continue // Skip this entry
}
@@ -370,11 +377,8 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
}
// Convert LogEntry to RecordValue format (same as disk data)
- recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry)
+ recordValue, _, err := hms.convertLogEntryToRecordValueWithDecoded(logEntry, dataMessage)
if err != nil {
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err)
- }
continue // Skip malformed messages
}
@@ -429,10 +433,6 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
}
}
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results))
- }
-
return results, stats, nil
}
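Both the unflushed-data scan above and the streaming flushed-data source later in this file now decode the DataMessage envelope once per entry and hand the result to the control check and the conversion. A sketch of that shared pattern, assuming the engine package (decodeOnce is a hypothetical helper name):

package engine

import (
	"google.golang.org/protobuf/proto"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// decodeOnce unmarshals the DataMessage envelope a single time and reuses it,
// so each LogEntry costs at most one proto.Unmarshal of the envelope.
func decodeOnce(hms *HybridMessageScanner, logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, bool, error) {
	var dataMessage *mq_pb.DataMessage
	if len(logEntry.Data) > 0 {
		dataMessage = &mq_pb.DataMessage{}
		if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
			dataMessage = nil // not an envelope; downstream treats the payload as raw data
		}
	}
	if hms.isControlEntryWithDecoded(logEntry, dataMessage) {
		return nil, false, nil // control entry: skip, no record produced
	}
	record, _, err := hms.convertLogEntryToRecordValueWithDecoded(logEntry, dataMessage)
	if err != nil {
		return nil, false, err
	}
	return record, true, nil
}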
@@ -543,12 +543,8 @@ func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Contex
if err != nil {
// Don't fail the query if broker scanning fails, but provide clear warning to user
// This ensures users are aware that results may not include the most recent data
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to scan unflushed data from broker: %v\n", err)
- } else {
- fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
- fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
- }
+ fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
+ fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
} else if unflushedStats != nil {
stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried
stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages
@@ -652,35 +648,114 @@ func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (i
// Based on MQ system analysis, control entries are:
// 1. DataMessages with populated Ctrl field (publisher close signals)
// 2. Entries with empty keys (as filtered by subscriber)
-// 3. Entries with no data
+// NOTE: Messages with empty data but valid keys (like NOOP messages) are NOT control entries
func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool {
- // Skip entries with no data
- if len(logEntry.Data) == 0 {
- return true
+ // Pre-decode DataMessage if needed
+ var dataMessage *mq_pb.DataMessage
+ if len(logEntry.Data) > 0 {
+ dataMessage = &mq_pb.DataMessage{}
+ if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
+ dataMessage = nil // Failed to decode, treat as raw data
+ }
}
+ return hms.isControlEntryWithDecoded(logEntry, dataMessage)
+}
+// isControlEntryWithDecoded checks if a log entry is a control entry using pre-decoded DataMessage
+// This avoids duplicate protobuf unmarshaling when the DataMessage is already decoded
+func (hms *HybridMessageScanner) isControlEntryWithDecoded(logEntry *filer_pb.LogEntry, dataMessage *mq_pb.DataMessage) bool {
// Skip entries with empty keys (same logic as subscriber)
if len(logEntry.Key) == 0 {
return true
}
// Check if this is a DataMessage with control field populated
- dataMessage := &mq_pb.DataMessage{}
- if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil {
- // If it has a control field, it's a control message
- if dataMessage.Ctrl != nil {
- return true
- }
+ if dataMessage != nil && dataMessage.Ctrl != nil {
+ return true
}
+ // Messages with valid keys (even if data is empty) are legitimate messages
+ // Examples: NOOP messages from Schema Registry
return false
}
+// isNullOrEmpty checks if a schema_pb.Value is null or empty
+func isNullOrEmpty(value *schema_pb.Value) bool {
+ if value == nil {
+ return true
+ }
+
+ switch v := value.Kind.(type) {
+ case *schema_pb.Value_StringValue:
+ return v.StringValue == ""
+ case *schema_pb.Value_BytesValue:
+ return len(v.BytesValue) == 0
+ case *schema_pb.Value_ListValue:
+ return v.ListValue == nil || len(v.ListValue.Values) == 0
+ case nil:
+ return true // No kind set means null
+ default:
+ return false
+ }
+}
+
+// isSchemaless checks if the scanner is configured for a schema-less topic
+// Schema-less topics only have system fields: _ts_ns, _key, and _value
+func (hms *HybridMessageScanner) isSchemaless() bool {
+ // Schema-less topics only have system fields: _ts_ns, _key, and _value
+ // System topics like _schemas are NOT schema-less - they have structured data
+ // We just need to map their fields during read
+
+ if hms.recordSchema == nil {
+ return false
+ }
+
+ // Count only non-system data fields (exclude _ts_ns and _key which are always present)
+ // Schema-less topics should only have _value as the data field
+ hasValue := false
+ dataFieldCount := 0
+
+ for _, field := range hms.recordSchema.Fields {
+ switch field.Name {
+ case SW_COLUMN_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY:
+ // System fields - ignore
+ continue
+ case SW_COLUMN_NAME_VALUE:
+ hasValue = true
+ dataFieldCount++
+ default:
+ // Any other field means it's not schema-less
+ dataFieldCount++
+ }
+ }
+
+ // Schema-less = only has _value field as the data field (plus system fields)
+ return hasValue && dataFieldCount == 1
+}
+
// convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue
// This handles both:
// 1. Live log entries (raw message format)
// 2. Parquet entries (already in schema_pb.RecordValue format)
+// 3. Schema-less topics (raw bytes in _value field)
func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
+ // For schema-less topics, put raw data directly into _value field
+ if hms.isSchemaless() {
+ recordValue := &schema_pb.RecordValue{
+ Fields: make(map[string]*schema_pb.Value),
+ }
+ recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
+ }
+ return recordValue, "live_log", nil
+ }
+
// Try to unmarshal as RecordValue first (Parquet format)
recordValue := &schema_pb.RecordValue{}
if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
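The isSchemaless check above only reports true when _value is the sole non-system data field. A short sketch of the field sets involved, built with the same schema builder this diff uses (hypothetical field names; assumes the engine package):

package engine

import "github.com/seaweedfs/seaweedfs/weed/mq/schema"

// Examples of how isSchemaless classifies record types (sketch):
//
//	_ts_ns, _key, _value          -> schema-less: raw payloads land in _value
//	_ts_ns, _key, id, user_id     -> NOT schema-less: structured data fields present
var (
	rawTopicSchema = schema.RecordTypeBegin().
		WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
		WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
		WithField(SW_COLUMN_NAME_VALUE, schema.TypeBytes).
		RecordTypeEnd() // isSchemaless() == true for a scanner carrying this schema

	structuredTopicSchema = schema.RecordTypeBegin().
		WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
		WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
		WithField("id", schema.TypeInt64).
		WithField("user_id", schema.TypeInt64).
		RecordTypeEnd() // isSchemaless() == false: real data fields besides _value
)

The first record type is also what NewHybridMessageScanner and NewParquetScanner fall back to when the broker reports no fields, which is why reads of such topics always surface _ts_ns, _key, and _value.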
@@ -705,6 +780,14 @@ func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb
return hms.parseRawMessageWithSchema(logEntry)
}
+// min returns the minimum of two integers
+func min(a, b int) int {
+ if a < b {
+ return a
+ }
+ return b
+}
+
// parseRawMessageWithSchema parses raw live message data using the topic's schema
// This provides proper type conversion and field mapping instead of treating everything as strings
func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
@@ -722,51 +805,136 @@ func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.Lo
// Parse message data based on schema
if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 {
- // Fallback: No schema available, treat as single "data" field
- recordValue.Fields["data"] = &schema_pb.Value{
- Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
+ // Fallback: No schema available, use "_value" for schema-less topics only
+ if hms.isSchemaless() {
+ recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
+ }
}
return recordValue, "live_log", nil
}
- // Attempt schema-aware parsing
- // Strategy 1: Try JSON parsing first (most common for live messages)
- if parsedRecord, err := hms.parseJSONMessage(logEntry.Data); err == nil {
- // Successfully parsed as JSON, merge with system columns
- for fieldName, fieldValue := range parsedRecord.Fields {
- recordValue.Fields[fieldName] = fieldValue
+ // Use schema format to directly choose the right decoder
+ // This avoids trying multiple decoders and improves performance
+ var parsedRecord *schema_pb.RecordValue
+ var err error
+
+ switch hms.schemaFormat {
+ case "AVRO":
+ // AVRO format - use Avro decoder
+ // Note: Avro decoding requires schema registry integration
+ // For now, fall through to JSON as many Avro messages are also valid JSON
+ parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
+ case "PROTOBUF":
+ // PROTOBUF format - use protobuf decoder
+ parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
+ case "JSON_SCHEMA", "":
+ // JSON_SCHEMA format or empty (default to JSON)
+ // JSON is the most common format for schema registry
+ parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
+ if err != nil {
+ // Try protobuf as fallback
+ parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
+ }
+ default:
+ // Unknown format - try JSON first, then protobuf as fallback
+ parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
+ if err != nil {
+ parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
}
- return recordValue, "live_log", nil
}
- // Strategy 2: Try protobuf parsing (binary messages)
- if parsedRecord, err := hms.parseProtobufMessage(logEntry.Data); err == nil {
- // Successfully parsed as protobuf, merge with system columns
+ if err == nil && parsedRecord != nil {
+ // Successfully parsed, merge with system columns
for fieldName, fieldValue := range parsedRecord.Fields {
recordValue.Fields[fieldName] = fieldValue
}
return recordValue, "live_log", nil
}
- // Strategy 3: Fallback to single field with raw data
- // If schema has a single field, map the raw data to it with type conversion
+ // Fallback: If schema has a single field, map the raw data to it with type conversion
if len(hms.recordSchema.Fields) == 1 {
field := hms.recordSchema.Fields[0]
- convertedValue, err := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
- if err == nil {
+ convertedValue, convErr := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
+ if convErr == nil {
recordValue.Fields[field.Name] = convertedValue
return recordValue, "live_log", nil
}
}
- // Final fallback: treat as string data field
- recordValue.Fields["data"] = &schema_pb.Value{
- Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
+ // Final fallback: treat as bytes field for schema-less topics only
+ if hms.isSchemaless() {
+ recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
+ }
}
return recordValue, "live_log", nil
}
+// convertLogEntryToRecordValueWithDecoded converts a filer_pb.LogEntry to schema_pb.RecordValue
+// using a pre-decoded DataMessage to avoid duplicate protobuf unmarshaling
+func (hms *HybridMessageScanner) convertLogEntryToRecordValueWithDecoded(logEntry *filer_pb.LogEntry, dataMessage *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
+ // IMPORTANT: Check for schema-less topics FIRST
+ // Schema-less topics (like _schemas) should store raw data directly in _value field
+ if hms.isSchemaless() {
+ recordValue := &schema_pb.RecordValue{
+ Fields: make(map[string]*schema_pb.Value),
+ }
+ recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
+ }
+ return recordValue, "live_log", nil
+ }
+
+ // CRITICAL: The broker stores DataMessage.Value directly in LogEntry.Data
+ // So we need to try unmarshaling LogEntry.Data as RecordValue first
+ var recordValueBytes []byte
+
+ if dataMessage != nil && len(dataMessage.Value) > 0 {
+ // DataMessage has a Value field - use it
+ recordValueBytes = dataMessage.Value
+ } else {
+ // DataMessage doesn't have Value, use LogEntry.Data directly
+ // This is the normal case when broker stores messages
+ recordValueBytes = logEntry.Data
+ }
+
+ // Try to unmarshal as RecordValue
+ if len(recordValueBytes) > 0 {
+ recordValue := &schema_pb.RecordValue{}
+ if err := proto.Unmarshal(recordValueBytes, recordValue); err == nil {
+ // Successfully unmarshaled as RecordValue
+
+ // Ensure Fields map exists
+ if recordValue.Fields == nil {
+ recordValue.Fields = make(map[string]*schema_pb.Value)
+ }
+
+ // Add system columns from LogEntry
+ recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+ }
+
+ return recordValue, "live_log", nil
+ }
+ // If unmarshaling as RecordValue fails, fall back to schema-aware parsing
+ }
+
+ // For cases where protobuf unmarshaling failed or data is empty,
+ // attempt schema-aware parsing to try JSON, protobuf, and other formats
+ return hms.parseRawMessageWithSchema(logEntry)
+}
+
// parseJSONMessage attempts to parse raw data as JSON and map to schema fields
func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) {
// Try to parse as JSON
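parseRawMessageWithSchema above now dispatches on the stored schema format instead of probing every decoder per message. A condensed sketch of that dispatch, assuming the engine package (decodeByFormat is a hypothetical helper; the Avro case intentionally mirrors the hunk's current behavior of routing through the JSON parser):

package engine

import "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"

// decodeByFormat picks a decoder from the topic's declared schema format,
// avoiding the old try-JSON-then-protobuf probing on every message.
func (hms *HybridMessageScanner) decodeByFormat(data []byte) (*schema_pb.RecordValue, error) {
	switch hms.schemaFormat {
	case "AVRO":
		// Avro decoding still goes through the JSON parser here, matching the hunk above.
		return hms.parseJSONMessage(data)
	case "PROTOBUF":
		return hms.parseProtobufMessage(data)
	default: // "JSON_SCHEMA", "" (unset), or any unknown format
		record, err := hms.parseJSONMessage(data)
		if err != nil {
			record, err = hms.parseProtobufMessage(data)
		}
		return record, err
	}
}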
@@ -950,6 +1118,11 @@ func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult,
for columnName := range columnSet {
columns = append(columns, columnName)
}
+
+ // If no data columns were found, include system columns so we have something to display
+ if len(columns) == 0 {
+ columns = []string{SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY}
+ }
}
// Convert to SQL rows
@@ -1037,6 +1210,11 @@ func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []Hy
columns = append(columns, col)
}
+ // If no data columns were found and no explicit columns specified, include system columns
+ if len(columns) == 0 {
+ columns = []string{SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY}
+ }
+
// Convert to SQL rows
rows := make([][]sqltypes.Value, len(results))
for i, result := range results {
@@ -1123,10 +1301,10 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo
}
// Populate optional min/max from filer extended attributes (writer stores ns timestamps)
if entry != nil && entry.Extended != nil {
- if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 {
+ if minBytes, ok := entry.Extended[mq.ExtendedAttrTimestampMin]; ok && len(minBytes) == 8 {
fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes))
}
- if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 {
+ if maxBytes, ok := entry.Extended[mq.ExtendedAttrTimestampMax]; ok && len(maxBytes) == 8 {
fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes))
}
}
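The hunk above switches extractParquetFileStats from literal "min"/"max" keys to the shared mq.ExtendedAttr* constants. A sketch of the read side of that convention, where the writer stores nanosecond timestamps as 8-byte big-endian values in the filer entry's extended attributes (timestampRangeFromEntry is a hypothetical helper name):

package engine

import (
	"encoding/binary"

	"github.com/seaweedfs/seaweedfs/weed/mq"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// timestampRangeFromEntry reads the optional min/max timestamp metadata stored
// on a parquet file entry; ok is false when either attribute is missing or malformed.
func timestampRangeFromEntry(entry *filer_pb.Entry) (minNs, maxNs int64, ok bool) {
	if entry == nil || entry.Extended == nil {
		return 0, 0, false
	}
	minBytes, hasMin := entry.Extended[mq.ExtendedAttrTimestampMin]
	maxBytes, hasMax := entry.Extended[mq.ExtendedAttrTimestampMax]
	if !hasMin || !hasMax || len(minBytes) != 8 || len(maxBytes) != 8 {
		return 0, 0, false
	}
	return int64(binary.BigEndian.Uint64(minBytes)), int64(binary.BigEndian.Uint64(maxBytes)), true
}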
@@ -1538,13 +1716,22 @@ func (s *StreamingFlushedDataSource) startStreaming() {
// Message processing function
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
+ // Pre-decode DataMessage for reuse in both control check and conversion
+ var dataMessage *mq_pb.DataMessage
+ if len(logEntry.Data) > 0 {
+ dataMessage = &mq_pb.DataMessage{}
+ if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
+ dataMessage = nil // Failed to decode, treat as raw data
+ }
+ }
+
// Skip control entries without actual data
- if s.hms.isControlEntry(logEntry) {
+ if s.hms.isControlEntryWithDecoded(logEntry, dataMessage) {
return false, nil // Skip this entry
}
// Convert log entry to schema_pb.RecordValue for consistent processing
- recordValue, source, convertErr := s.hms.convertLogEntryToRecordValue(logEntry)
+ recordValue, source, convertErr := s.hms.convertLogEntryToRecordValueWithDecoded(logEntry, dataMessage)
if convertErr != nil {
return false, fmt.Errorf("failed to convert log entry: %v", convertErr)
}
diff --git a/weed/query/engine/mock_test.go b/weed/query/engine/mock_test.go
index d00ec1761..697c98494 100644
--- a/weed/query/engine/mock_test.go
+++ b/weed/query/engine/mock_test.go
@@ -27,13 +27,16 @@ func TestMockBrokerClient_BasicFunctionality(t *testing.T) {
}
// Test GetTopicSchema
- schema, err := mockBroker.GetTopicSchema(context.Background(), "default", "user_events")
+ schema, keyColumns, _, err := mockBroker.GetTopicSchema(context.Background(), "default", "user_events")
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
if len(schema.Fields) != 3 {
t.Errorf("Expected 3 fields in user_events schema, got %d", len(schema.Fields))
}
+ if len(keyColumns) == 0 {
+ t.Error("Expected at least one key column")
+ }
}
func TestMockBrokerClient_FailureScenarios(t *testing.T) {
@@ -53,7 +56,7 @@ func TestMockBrokerClient_FailureScenarios(t *testing.T) {
t.Error("Expected error when mock is configured to fail")
}
- _, err = mockBroker.GetTopicSchema(context.Background(), "default", "user_events")
+ _, _, _, err = mockBroker.GetTopicSchema(context.Background(), "default", "user_events")
if err == nil {
t.Error("Expected error when mock is configured to fail")
}
@@ -81,7 +84,7 @@ func TestMockBrokerClient_TopicManagement(t *testing.T) {
mockBroker := NewMockBrokerClient()
// Test ConfigureTopic (add a new topic)
- err := mockBroker.ConfigureTopic(context.Background(), "test", "new-topic", 1, nil)
+ err := mockBroker.ConfigureTopic(context.Background(), "test", "new-topic", 1, nil, []string{})
if err != nil {
t.Fatalf("Expected no error, got %v", err)
}
diff --git a/weed/query/engine/mocks_test.go b/weed/query/engine/mocks_test.go
index 733d99af7..2f72ed9ed 100644
--- a/weed/query/engine/mocks_test.go
+++ b/weed/query/engine/mocks_test.go
@@ -879,17 +879,51 @@ func (m *MockBrokerClient) ListTopics(ctx context.Context, namespace string) ([]
return []string{}, nil
}
-// GetTopicSchema returns the mock schema for a topic
-func (m *MockBrokerClient) GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error) {
+// GetTopicSchema returns flat schema and key columns for a topic
+func (m *MockBrokerClient) GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, []string, string, error) {
if m.shouldFail {
- return nil, fmt.Errorf("mock broker failure: %s", m.failMessage)
+ return nil, nil, "", fmt.Errorf("mock broker failure: %s", m.failMessage)
}
key := fmt.Sprintf("%s.%s", namespace, topic)
if schema, exists := m.schemas[key]; exists {
- return schema, nil
+ // For testing, assume first field is key column
+ var keyColumns []string
+ if len(schema.Fields) > 0 {
+ keyColumns = []string{schema.Fields[0].Name}
+ }
+ return schema, keyColumns, "", nil // Schema format empty for mocks
+ }
+ return nil, nil, "", fmt.Errorf("topic %s not found", key)
+}
+
+// ConfigureTopic creates or modifies a topic using flat schema format
+func (m *MockBrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, flatSchema *schema_pb.RecordType, keyColumns []string) error {
+ if m.shouldFail {
+ return fmt.Errorf("mock broker failure: %s", m.failMessage)
+ }
+
+ // Store the schema for future retrieval
+ key := fmt.Sprintf("%s.%s", namespace, topicName)
+ m.schemas[key] = flatSchema
+
+ // Add topic to namespace if it doesn't exist
+ if topics, exists := m.topics[namespace]; exists {
+ found := false
+ for _, t := range topics {
+ if t == topicName {
+ found = true
+ break
+ }
+ }
+ if !found {
+ m.topics[namespace] = append(topics, topicName)
+ }
+ } else {
+ m.topics[namespace] = []string{topicName}
}
- return nil, fmt.Errorf("topic %s not found", key)
+
+ return nil
}
// GetFilerClient returns a mock filer client
@@ -960,31 +994,6 @@ func (t *TestHybridMessageScanner) ScanMessages(ctx context.Context, options Hyb
return generateSampleHybridData(t.topicName, options), nil
}
-// ConfigureTopic creates or updates a topic configuration (mock implementation)
-func (m *MockBrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error {
- if m.shouldFail {
- return fmt.Errorf("mock broker failure: %s", m.failMessage)
- }
-
- // Store the schema in our mock data
- key := fmt.Sprintf("%s.%s", namespace, topicName)
- m.schemas[key] = recordType
-
- // Add to topics list if not already present
- if topics, exists := m.topics[namespace]; exists {
- for _, topic := range topics {
- if topic == topicName {
- return nil // Already exists
- }
- }
- m.topics[namespace] = append(topics, topicName)
- } else {
- m.topics[namespace] = []string{topicName}
- }
-
- return nil
-}
-
// DeleteTopic removes a topic and all its data (mock implementation)
func (m *MockBrokerClient) DeleteTopic(ctx context.Context, namespace, topicName string) error {
if m.shouldFail {
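With the two mock methods above sharing the flat-schema signature, a round trip through the mock also exercises key-column propagation. A test-style sketch under the engine test package (hypothetical test name and topic; the mock derives key columns from the first schema field and reports an empty format):

package engine

import (
	"context"
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
)

// TestMockBrokerClient_SchemaRoundTrip sketches the new flat-schema round trip:
// ConfigureTopic stores the schema, GetTopicSchema returns it plus key columns.
func TestMockBrokerClient_SchemaRoundTrip(t *testing.T) {
	mockBroker := NewMockBrokerClient()

	flatSchema := schema.RecordTypeBegin().
		WithField("order_id", schema.TypeInt64).
		WithField("payload", schema.TypeBytes).
		RecordTypeEnd()

	if err := mockBroker.ConfigureTopic(context.Background(), "test", "orders", 1, flatSchema, []string{"order_id"}); err != nil {
		t.Fatalf("ConfigureTopic failed: %v", err)
	}

	got, keyColumns, format, err := mockBroker.GetTopicSchema(context.Background(), "test", "orders")
	if err != nil {
		t.Fatalf("GetTopicSchema failed: %v", err)
	}
	if len(got.Fields) != 2 {
		t.Errorf("expected 2 fields, got %d", len(got.Fields))
	}
	// The mock takes the first field as the key column and leaves the format empty.
	if len(keyColumns) != 1 || keyColumns[0] != "order_id" || format != "" {
		t.Errorf("unexpected key columns %v or format %q", keyColumns, format)
	}
}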
diff --git a/weed/query/engine/parquet_scanner.go b/weed/query/engine/parquet_scanner.go
index 113cd814a..e4b5252c7 100644
--- a/weed/query/engine/parquet_scanner.go
+++ b/weed/query/engine/parquet_scanner.go
@@ -21,7 +21,7 @@ import (
// Assumptions:
// 1. All MQ messages are stored in Parquet format in topic partitions
// 2. Each partition directory contains dated Parquet files
-// 3. System columns (_timestamp_ns, _key) are added to user schema
+// 3. System columns (_ts_ns, _key) are added to user schema
// 4. Predicate pushdown is used for efficient scanning
type ParquetScanner struct {
filerClient filer_pb.FilerClient
@@ -55,17 +55,28 @@ func NewParquetScanner(filerClient filer_pb.FilerClient, namespace, topicName st
return nil, fmt.Errorf("failed to read topic config: %v", err)
}
- // Build complete schema with system columns
- recordType := topicConf.GetRecordType()
- if recordType == nil {
- return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
+ // Build complete schema with system columns - prefer flat schema if available
+ var recordType *schema_pb.RecordType
+
+ if topicConf.GetMessageRecordType() != nil {
+ // New flat schema format - use directly
+ recordType = topicConf.GetMessageRecordType()
}
- // Add system columns that MQ adds to all records
- recordType = schema.NewRecordTypeBuilder(recordType).
- WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
- WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
- RecordTypeEnd()
+ if recordType == nil || len(recordType.Fields) == 0 {
+ // For topics without schema, create a minimal schema with system fields and _value
+ recordType = schema.RecordTypeBegin().
+ WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
+ WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ WithField(SW_COLUMN_NAME_VALUE, schema.TypeBytes). // Raw message value
+ RecordTypeEnd()
+ } else {
+ // Add system columns that MQ adds to all records
+ recordType = schema.NewRecordTypeBuilder(recordType).
+ WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
+ WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ RecordTypeEnd()
+ }
// Convert to Parquet levels for efficient reading
parquetLevels, err := schema.ToParquetLevels(recordType)
diff --git a/weed/query/engine/parsing_debug_test.go b/weed/query/engine/parsing_debug_test.go
index 3fa9be17b..6177b0aa6 100644
--- a/weed/query/engine/parsing_debug_test.go
+++ b/weed/query/engine/parsing_debug_test.go
@@ -36,7 +36,7 @@ func TestBasicParsing(t *testing.T) {
if selectStmt.Where != nil {
t.Logf(" WHERE expression type: %T", selectStmt.Where.Expr)
} else {
- t.Logf(" ❌ WHERE clause is NIL - this is the bug!")
+ t.Logf(" WHERE clause is NIL - this is the bug!")
}
} else {
t.Errorf("Expected SelectStatement, got %T", stmt)
@@ -62,10 +62,10 @@ func TestCockroachParserDirectly(t *testing.T) {
if selectStmt, ok := stmt.(*SelectStatement); ok {
if selectStmt.Where == nil {
- t.Errorf("❌ Our ParseSQL is not extracting WHERE clauses!")
+ t.Errorf("Our ParseSQL is not extracting WHERE clauses!")
t.Errorf("This means the issue is in our CockroachDB AST conversion")
} else {
- t.Logf("✅ Our ParseSQL extracted WHERE clause: %T", selectStmt.Where.Expr)
+ t.Logf("Our ParseSQL extracted WHERE clause: %T", selectStmt.Where.Expr)
}
}
}
diff --git a/weed/query/engine/postgresql_only_test.go b/weed/query/engine/postgresql_only_test.go
index d98cab9f0..d40e81b11 100644
--- a/weed/query/engine/postgresql_only_test.go
+++ b/weed/query/engine/postgresql_only_test.go
@@ -67,7 +67,7 @@ func TestPostgreSQLOnlySupport(t *testing.T) {
if tc.shouldError {
// We expect this query to fail
if err == nil && result.Error == nil {
- t.Errorf("❌ Expected error for %s, but query succeeded", tc.desc)
+ t.Errorf("Expected error for %s, but query succeeded", tc.desc)
return
}
@@ -81,7 +81,7 @@ func TestPostgreSQLOnlySupport(t *testing.T) {
}
if !strings.Contains(errorText, tc.errorMsg) {
- t.Errorf("❌ Expected error containing '%s', got: %s", tc.errorMsg, errorText)
+ t.Errorf("Expected error containing '%s', got: %s", tc.errorMsg, errorText)
return
}
}
diff --git a/weed/query/engine/sql_alias_support_test.go b/weed/query/engine/sql_alias_support_test.go
index a081d7183..dbe91f821 100644
--- a/weed/query/engine/sql_alias_support_test.go
+++ b/weed/query/engine/sql_alias_support_test.go
@@ -17,7 +17,7 @@ func TestSQLAliasResolution(t *testing.T) {
// Create SELECT expressions with aliases
selectExprs := []SelectExpr{
&AliasedExpr{
- Expr: &ColName{Name: stringValue("_timestamp_ns")},
+ Expr: &ColName{Name: stringValue("_ts_ns")},
As: aliasValue("ts"),
},
&AliasedExpr{
@@ -28,7 +28,7 @@ func TestSQLAliasResolution(t *testing.T) {
// Test alias resolution
resolved := engine.resolveColumnAlias("ts", selectExprs)
- assert.Equal(t, "_timestamp_ns", resolved, "Should resolve 'ts' alias to '_timestamp_ns'")
+ assert.Equal(t, "_ts_ns", resolved, "Should resolve 'ts' alias to '_ts_ns'")
resolved = engine.resolveColumnAlias("record_id", selectExprs)
assert.Equal(t, "id", resolved, "Should resolve 'record_id' alias to 'id'")
@@ -42,13 +42,13 @@ func TestSQLAliasResolution(t *testing.T) {
// Test using a single alias in WHERE clause
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}},
},
}
// Parse SQL with alias in WHERE
- sql := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = 1756947416566456262"
+ sql := "SELECT _ts_ns AS ts, id FROM test WHERE ts = 1756947416566456262"
stmt, err := ParseSQL(sql)
assert.NoError(t, err, "Should parse SQL with alias in WHERE")
@@ -60,10 +60,10 @@ func TestSQLAliasResolution(t *testing.T) {
// Test the predicate
result := predicate(testRecord)
- assert.True(t, result, "Predicate should match using alias 'ts' for '_timestamp_ns'")
+ assert.True(t, result, "Predicate should match using alias 'ts' for '_ts_ns'")
// Test with non-matching value
- sql2 := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = 999999"
+ sql2 := "SELECT _ts_ns AS ts, id FROM test WHERE ts = 999999"
stmt2, err := ParseSQL(sql2)
assert.NoError(t, err)
selectStmt2 := stmt2.(*SelectStatement)
@@ -79,13 +79,13 @@ func TestSQLAliasResolution(t *testing.T) {
// Test using multiple aliases in WHERE clause
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}},
},
}
// Parse SQL with multiple aliases in WHERE
- sql := "SELECT _timestamp_ns AS ts, id AS record_id FROM test WHERE ts = 1756947416566456262 AND record_id = 82460"
+ sql := "SELECT _ts_ns AS ts, id AS record_id FROM test WHERE ts = 1756947416566456262 AND record_id = 82460"
stmt, err := ParseSQL(sql)
assert.NoError(t, err, "Should parse SQL with multiple aliases")
@@ -102,8 +102,8 @@ func TestSQLAliasResolution(t *testing.T) {
// Test with one condition not matching
testRecord2 := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 99999}}, // Different ID
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 99999}}, // Different ID
},
}
@@ -116,23 +116,23 @@ func TestSQLAliasResolution(t *testing.T) {
testRecords := []*schema_pb.RecordValue{
{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456260}}, // Below range
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456260}}, // Below range
},
},
{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, // In range
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, // In range
},
},
{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456265}}, // Above range
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456265}}, // Above range
},
},
}
// Test range query with alias
- sql := "SELECT _timestamp_ns AS ts FROM test WHERE ts > 1756947416566456261 AND ts < 1756947416566456264"
+ sql := "SELECT _ts_ns AS ts FROM test WHERE ts > 1756947416566456261 AND ts < 1756947416566456264"
stmt, err := ParseSQL(sql)
assert.NoError(t, err, "Should parse range query with alias")
@@ -150,14 +150,14 @@ func TestSQLAliasResolution(t *testing.T) {
// Test mixing aliased and non-aliased columns in WHERE
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}},
- "status": {Kind: &schema_pb.Value_StringValue{StringValue: "active"}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}},
+ "status": {Kind: &schema_pb.Value_StringValue{StringValue: "active"}},
},
}
// Use alias for one column, direct name for another
- sql := "SELECT _timestamp_ns AS ts, id, status FROM test WHERE ts = 1756947416566456262 AND status = 'active'"
+ sql := "SELECT _ts_ns AS ts, id, status FROM test WHERE ts = 1756947416566456262 AND status = 'active'"
stmt, err := ParseSQL(sql)
assert.NoError(t, err, "Should parse mixed alias/direct query")
@@ -175,13 +175,13 @@ func TestSQLAliasResolution(t *testing.T) {
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
},
}
// Test that large timestamp precision is maintained with aliases
- sql := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = 1756947416566456262"
+ sql := "SELECT _ts_ns AS ts, id FROM test WHERE ts = 1756947416566456262"
stmt, err := ParseSQL(sql)
assert.NoError(t, err)
@@ -193,7 +193,7 @@ func TestSQLAliasResolution(t *testing.T) {
assert.True(t, result, "Large timestamp precision should be maintained with aliases")
// Test precision with off-by-one (should not match)
- sql2 := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = 1756947416566456263" // +1
+ sql2 := "SELECT _ts_ns AS ts, id FROM test WHERE ts = 1756947416566456263" // +1
stmt2, err := ParseSQL(sql2)
assert.NoError(t, err)
selectStmt2 := stmt2.(*SelectStatement)
@@ -229,7 +229,7 @@ func TestSQLAliasResolution(t *testing.T) {
// Test all comparison operators work with aliases
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1000}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1000}},
},
}
@@ -252,7 +252,7 @@ func TestSQLAliasResolution(t *testing.T) {
for _, test := range operators {
t.Run(test.op+"_"+test.value, func(t *testing.T) {
- sql := "SELECT _timestamp_ns AS ts FROM test WHERE ts " + test.op + " " + test.value
+ sql := "SELECT _ts_ns AS ts FROM test WHERE ts " + test.op + " " + test.value
stmt, err := ParseSQL(sql)
assert.NoError(t, err, "Should parse operator: %s", test.op)
@@ -270,13 +270,13 @@ func TestSQLAliasResolution(t *testing.T) {
// Ensure non-alias queries still work exactly as before
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}},
},
}
// Test traditional query (no aliases)
- sql := "SELECT _timestamp_ns, id FROM test WHERE _timestamp_ns = 1756947416566456262"
+ sql := "SELECT _ts_ns, id FROM test WHERE _ts_ns = 1756947416566456262"
stmt, err := ParseSQL(sql)
assert.NoError(t, err)
@@ -307,13 +307,13 @@ func TestAliasIntegrationWithProductionScenarios(t *testing.T) {
// Test the exact query pattern that was originally failing
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756913789829292386}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756913789829292386}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}},
},
}
// This was the original failing pattern
- sql := "SELECT id, _timestamp_ns AS ts FROM ecommerce.user_events WHERE ts = 1756913789829292386"
+ sql := "SELECT id, _ts_ns AS ts FROM ecommerce.user_events WHERE ts = 1756913789829292386"
stmt, err := ParseSQL(sql)
assert.NoError(t, err, "Should parse the originally failing query pattern")
@@ -329,16 +329,16 @@ func TestAliasIntegrationWithProductionScenarios(t *testing.T) {
// Test a more complex production-like query
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
- "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user123"}},
- "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "click"}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
+ "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user123"}},
+ "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "click"}},
},
}
sql := `SELECT
id AS event_id,
- _timestamp_ns AS event_time,
+ _ts_ns AS event_time,
user_id AS uid,
event_type AS action
FROM ecommerce.user_events
@@ -359,10 +359,10 @@ func TestAliasIntegrationWithProductionScenarios(t *testing.T) {
// Test partial match failure
testRecord2 := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
- "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user999"}}, // Different user
- "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "click"}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
+ "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user999"}}, // Different user
+ "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "click"}},
},
}
@@ -374,13 +374,13 @@ func TestAliasIntegrationWithProductionScenarios(t *testing.T) {
// Ensure alias resolution doesn't significantly impact performance
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
},
}
// Build predicates for comparison
- sqlWithAlias := "SELECT _timestamp_ns AS ts FROM test WHERE ts = 1756947416566456262"
- sqlWithoutAlias := "SELECT _timestamp_ns FROM test WHERE _timestamp_ns = 1756947416566456262"
+ sqlWithAlias := "SELECT _ts_ns AS ts FROM test WHERE ts = 1756947416566456262"
+ sqlWithoutAlias := "SELECT _ts_ns FROM test WHERE _ts_ns = 1756947416566456262"
stmtWithAlias, err := ParseSQL(sqlWithAlias)
assert.NoError(t, err)
diff --git a/weed/query/engine/sql_feature_diagnostic_test.go b/weed/query/engine/sql_feature_diagnostic_test.go
index bbe775615..f578539fc 100644
--- a/weed/query/engine/sql_feature_diagnostic_test.go
+++ b/weed/query/engine/sql_feature_diagnostic_test.go
@@ -109,12 +109,12 @@ func TestSQLFeatureDiagnostic(t *testing.T) {
// Summary
t.Log("\n" + strings.Repeat("=", 80))
t.Log("FEATURE SUMMARY:")
- t.Log(" ✅ LIMIT: FULLY WORKING - Correctly limits result rows")
- t.Log(" ✅ OFFSET: FULLY WORKING - Correctly skips rows")
- t.Log(" ✅ WHERE: FULLY WORKING - All comparison operators working")
- t.Log(" ✅ SELECT: WORKING - Supports *, columns, functions, arithmetic")
- t.Log(" ✅ Functions: WORKING - String and datetime functions work")
- t.Log(" ✅ Arithmetic: WORKING - +, -, *, / operations work")
+ t.Log(" LIMIT: FULLY WORKING - Correctly limits result rows")
+ t.Log(" OFFSET: FULLY WORKING - Correctly skips rows")
+ t.Log(" WHERE: FULLY WORKING - All comparison operators working")
+ t.Log(" SELECT: WORKING - Supports *, columns, functions, arithmetic")
+ t.Log(" Functions: WORKING - String and datetime functions work")
+ t.Log(" Arithmetic: WORKING - +, -, *, / operations work")
t.Log(strings.Repeat("=", 80))
}
@@ -144,12 +144,12 @@ func TestSQLWhereClauseIssue(t *testing.T) {
t.Logf("WHERE id = %s returned %d rows", firstId, actualCount)
if actualCount == allCount {
- t.Log("❌ CONFIRMED: WHERE clause is completely ignored")
+ t.Log("CONFIRMED: WHERE clause is completely ignored")
t.Log(" - Query parsed successfully")
t.Log(" - No errors returned")
t.Log(" - But filtering logic not implemented in execution")
} else if actualCount == 1 {
- t.Log("✅ WHERE clause working correctly")
+ t.Log("WHERE clause working correctly")
} else {
t.Logf("❓ Unexpected result: got %d rows instead of 1 or %d", actualCount, allCount)
}
@@ -162,8 +162,8 @@ func TestSQLWhereClauseIssue(t *testing.T) {
t.Logf("WHERE 1 = 0 returned %d rows", impossibleCount)
if impossibleCount == allCount {
- t.Log("❌ CONFIRMED: Even impossible WHERE conditions are ignored")
+ t.Log("CONFIRMED: Even impossible WHERE conditions are ignored")
} else if impossibleCount == 0 {
- t.Log("✅ Impossible WHERE condition correctly returns no rows")
+ t.Log("Impossible WHERE condition correctly returns no rows")
}
}
diff --git a/weed/query/engine/string_concatenation_test.go b/weed/query/engine/string_concatenation_test.go
index c4843bef6..a2f869c10 100644
--- a/weed/query/engine/string_concatenation_test.go
+++ b/weed/query/engine/string_concatenation_test.go
@@ -177,7 +177,7 @@ func TestSQLEngine_StringConcatenationBugReproduction(t *testing.T) {
}
}
- t.Logf("✅ SUCCESS: Complex string concatenation works correctly!")
+ t.Logf("SUCCESS: Complex string concatenation works correctly!")
t.Logf("Query: %s", query)
for i, row := range result.Rows {
diff --git a/weed/query/engine/string_literal_function_test.go b/weed/query/engine/string_literal_function_test.go
index 828d8c9ed..787c86c08 100644
--- a/weed/query/engine/string_literal_function_test.go
+++ b/weed/query/engine/string_literal_function_test.go
@@ -183,7 +183,7 @@ func TestSQLEngine_StringFunctionErrorHandling(t *testing.T) {
t.Fatalf("UPPER function should work, got query error: %v", result.Error)
}
- t.Logf("✅ UPPER function works correctly")
+ t.Logf("UPPER function works correctly")
// This should now work (previously would error as "unsupported aggregation function")
result2, err2 := engine.ExecuteSQL(context.Background(), "SELECT LENGTH(action) FROM user_events LIMIT 1")
@@ -194,5 +194,5 @@ func TestSQLEngine_StringFunctionErrorHandling(t *testing.T) {
t.Fatalf("LENGTH function should work, got query error: %v", result2.Error)
}
- t.Logf("✅ LENGTH function works correctly")
+ t.Logf("LENGTH function works correctly")
}
diff --git a/weed/query/engine/system_columns.go b/weed/query/engine/system_columns.go
index 12757d4eb..a982416ed 100644
--- a/weed/query/engine/system_columns.go
+++ b/weed/query/engine/system_columns.go
@@ -9,18 +9,19 @@ import (
// System column constants used throughout the SQL engine
const (
- SW_COLUMN_NAME_TIMESTAMP = "_timestamp_ns" // Message timestamp in nanoseconds (internal)
- SW_COLUMN_NAME_KEY = "_key" // Message key
- SW_COLUMN_NAME_SOURCE = "_source" // Data source (live_log, parquet_archive, etc.)
+ SW_COLUMN_NAME_TIMESTAMP = "_ts_ns" // Message timestamp in nanoseconds (internal)
+ SW_COLUMN_NAME_KEY = "_key" // Message key
+ SW_COLUMN_NAME_SOURCE = "_source" // Data source (live_log, parquet_archive, etc.)
+ SW_COLUMN_NAME_VALUE = "_value" // Raw message value (for schema-less topics)
)
// System column display names (what users see)
const (
SW_DISPLAY_NAME_TIMESTAMP = "_ts" // User-facing timestamp column name
- // Note: _key and _source keep the same names, only _timestamp_ns changes to _ts
+ // Note: _key and _source keep the same names, only _ts_ns changes to _ts
)
-// isSystemColumn checks if a column is a system column (_timestamp_ns, _key, _source)
+// isSystemColumn checks if a column is a system column (_ts_ns, _key, _source)
func (e *SQLEngine) isSystemColumn(columnName string) bool {
lowerName := strings.ToLower(columnName)
return lowerName == SW_COLUMN_NAME_TIMESTAMP ||
@@ -91,7 +92,7 @@ func (e *SQLEngine) getSystemColumnGlobalMin(columnName string, allFileStats map
switch lowerName {
case SW_COLUMN_NAME_TIMESTAMP:
// For timestamps, find the earliest timestamp across all files
- // This should match what's in the Extended["min"] metadata
+ // This should match what's in the Extended[mq.ExtendedAttrTimestampMin] metadata
var minTimestamp *int64
for _, fileStats := range allFileStats {
for _, fileStat := range fileStats {
@@ -128,7 +129,7 @@ func (e *SQLEngine) getSystemColumnGlobalMax(columnName string, allFileStats map
switch lowerName {
case SW_COLUMN_NAME_TIMESTAMP:
// For timestamps, find the latest timestamp across all files
- // This should match what's in the Extended["max"] metadata
+ // This should match what's in the Extended[mq.ExtendedAttrTimestampMax] metadata
var maxTimestamp *int64
for _, fileStats := range allFileStats {
for _, fileStat := range fileStats {
diff --git a/weed/query/engine/timestamp_integration_test.go b/weed/query/engine/timestamp_integration_test.go
index 2f53e6d6e..cb156103c 100644
--- a/weed/query/engine/timestamp_integration_test.go
+++ b/weed/query/engine/timestamp_integration_test.go
@@ -29,13 +29,13 @@ func TestTimestampIntegrationScenarios(t *testing.T) {
// Create a test record
record := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.timestamp}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.id}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.timestamp}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.id}},
},
}
// Build SQL query
- sql := "SELECT id, _timestamp_ns FROM test WHERE _timestamp_ns = " + strconv.FormatInt(ts.timestamp, 10)
+ sql := "SELECT id, _ts_ns FROM test WHERE _ts_ns = " + strconv.FormatInt(ts.timestamp, 10)
stmt, err := ParseSQL(sql)
assert.NoError(t, err)
@@ -57,8 +57,8 @@ func TestTimestampIntegrationScenarios(t *testing.T) {
// Test that close but different timestamps don't match
closeRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.timestamp + 1}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.id}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.timestamp + 1}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.id}},
},
}
result = predicate(closeRecord)
@@ -76,17 +76,17 @@ func TestTimestampIntegrationScenarios(t *testing.T) {
}{
{
name: "RangeWithDifferentBounds",
- sql: "SELECT * FROM test WHERE _timestamp_ns >= 1756913789829292386 AND _timestamp_ns <= 1756947416566456262",
+ sql: "SELECT * FROM test WHERE _ts_ns >= 1756913789829292386 AND _ts_ns <= 1756947416566456262",
shouldSet: struct{ start, stop bool }{true, true},
},
{
name: "RangeWithSameBounds",
- sql: "SELECT * FROM test WHERE _timestamp_ns >= 1756913789829292386 AND _timestamp_ns <= 1756913789829292386",
+ sql: "SELECT * FROM test WHERE _ts_ns >= 1756913789829292386 AND _ts_ns <= 1756913789829292386",
shouldSet: struct{ start, stop bool }{true, false}, // Fix #4: equal bounds should not set stop
},
{
name: "OpenEndedRange",
- sql: "SELECT * FROM test WHERE _timestamp_ns >= 1756913789829292386",
+ sql: "SELECT * FROM test WHERE _ts_ns >= 1756913789829292386",
shouldSet: struct{ start, stop bool }{true, false},
},
}
@@ -117,8 +117,8 @@ func TestTimestampIntegrationScenarios(t *testing.T) {
t.Run("ProductionScenarioReproduction", func(t *testing.T) {
// This test reproduces the exact production scenario that was failing
- // Original failing query: WHERE _timestamp_ns = 1756947416566456262
- sql := "SELECT id, _timestamp_ns FROM ecommerce.user_events WHERE _timestamp_ns = 1756947416566456262"
+ // Original failing query: WHERE _ts_ns = 1756947416566456262
+ sql := "SELECT id, _ts_ns FROM ecommerce.user_events WHERE _ts_ns = 1756947416566456262"
stmt, err := ParseSQL(sql)
assert.NoError(t, err, "Should parse the production query that was failing")
@@ -136,8 +136,8 @@ func TestTimestampIntegrationScenarios(t *testing.T) {
// Test with the actual record that exists in production
productionRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
},
}
@@ -147,8 +147,8 @@ func TestTimestampIntegrationScenarios(t *testing.T) {
// Verify precision - test that a timestamp differing by just 1 nanosecond doesn't match
slightlyDifferentRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456263}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456263}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
},
}
@@ -167,11 +167,11 @@ func TestRegressionPrevention(t *testing.T) {
record := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: smallTimestamp}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: smallTimestamp}},
},
}
- result := engine.valuesEqual(record.Fields["_timestamp_ns"], smallTimestamp)
+ result := engine.valuesEqual(record.Fields["_ts_ns"], smallTimestamp)
assert.True(t, result, "Small timestamps should continue to work")
})
diff --git a/weed/query/engine/timestamp_query_fixes_test.go b/weed/query/engine/timestamp_query_fixes_test.go
index 633738a00..2f5f08cbd 100644
--- a/weed/query/engine/timestamp_query_fixes_test.go
+++ b/weed/query/engine/timestamp_query_fixes_test.go
@@ -21,31 +21,31 @@ func TestTimestampQueryFixes(t *testing.T) {
// Test that large int64 timestamps don't lose precision in comparisons
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp1}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp1}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}},
},
}
// Test equality comparison
- result := engine.valuesEqual(testRecord.Fields["_timestamp_ns"], largeTimestamp1)
+ result := engine.valuesEqual(testRecord.Fields["_ts_ns"], largeTimestamp1)
assert.True(t, result, "Large timestamp equality should work without precision loss")
// Test inequality comparison
- result = engine.valuesEqual(testRecord.Fields["_timestamp_ns"], largeTimestamp1+1)
+ result = engine.valuesEqual(testRecord.Fields["_ts_ns"], largeTimestamp1+1)
assert.False(t, result, "Large timestamp inequality should be detected accurately")
// Test less than comparison
- result = engine.valueLessThan(testRecord.Fields["_timestamp_ns"], largeTimestamp1+1)
+ result = engine.valueLessThan(testRecord.Fields["_ts_ns"], largeTimestamp1+1)
assert.True(t, result, "Large timestamp less-than should work without precision loss")
// Test greater than comparison
- result = engine.valueGreaterThan(testRecord.Fields["_timestamp_ns"], largeTimestamp1-1)
+ result = engine.valueGreaterThan(testRecord.Fields["_ts_ns"], largeTimestamp1-1)
assert.True(t, result, "Large timestamp greater-than should work without precision loss")
})
t.Run("Fix2_TimeFilterExtraction", func(t *testing.T) {
// Test that equality queries don't set stopTimeNs (which causes premature termination)
- equalitySQL := "SELECT * FROM test WHERE _timestamp_ns = " + strconv.FormatInt(largeTimestamp2, 10)
+ equalitySQL := "SELECT * FROM test WHERE _ts_ns = " + strconv.FormatInt(largeTimestamp2, 10)
stmt, err := ParseSQL(equalitySQL)
assert.NoError(t, err)
@@ -58,8 +58,8 @@ func TestTimestampQueryFixes(t *testing.T) {
t.Run("Fix3_RangeBoundaryFix", func(t *testing.T) {
// Test that range queries with equal boundaries don't cause premature termination
- rangeSQL := "SELECT * FROM test WHERE _timestamp_ns >= " + strconv.FormatInt(largeTimestamp3, 10) +
- " AND _timestamp_ns <= " + strconv.FormatInt(largeTimestamp3, 10)
+ rangeSQL := "SELECT * FROM test WHERE _ts_ns >= " + strconv.FormatInt(largeTimestamp3, 10) +
+ " AND _ts_ns <= " + strconv.FormatInt(largeTimestamp3, 10)
stmt, err := ParseSQL(rangeSQL)
assert.NoError(t, err)
@@ -73,8 +73,8 @@ func TestTimestampQueryFixes(t *testing.T) {
t.Run("Fix4_DifferentRangeBoundaries", func(t *testing.T) {
// Test that normal range queries still work correctly
- rangeSQL := "SELECT * FROM test WHERE _timestamp_ns >= " + strconv.FormatInt(largeTimestamp1, 10) +
- " AND _timestamp_ns <= " + strconv.FormatInt(largeTimestamp2, 10)
+ rangeSQL := "SELECT * FROM test WHERE _ts_ns >= " + strconv.FormatInt(largeTimestamp1, 10) +
+ " AND _ts_ns <= " + strconv.FormatInt(largeTimestamp2, 10)
stmt, err := ParseSQL(rangeSQL)
assert.NoError(t, err)
@@ -87,7 +87,7 @@ func TestTimestampQueryFixes(t *testing.T) {
t.Run("Fix5_PredicateAccuracy", func(t *testing.T) {
// Test that predicates correctly evaluate large timestamp equality
- equalitySQL := "SELECT * FROM test WHERE _timestamp_ns = " + strconv.FormatInt(largeTimestamp1, 10)
+ equalitySQL := "SELECT * FROM test WHERE _ts_ns = " + strconv.FormatInt(largeTimestamp1, 10)
stmt, err := ParseSQL(equalitySQL)
assert.NoError(t, err)
@@ -98,8 +98,8 @@ func TestTimestampQueryFixes(t *testing.T) {
// Test with matching record
matchingRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp1}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp1}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}},
},
}
@@ -109,8 +109,8 @@ func TestTimestampQueryFixes(t *testing.T) {
// Test with non-matching record
nonMatchingRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp1 + 1}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp1 + 1}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}},
},
}
@@ -122,7 +122,7 @@ func TestTimestampQueryFixes(t *testing.T) {
// Test all comparison operators work correctly with large timestamps
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp2}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp2}},
},
}
@@ -130,16 +130,16 @@ func TestTimestampQueryFixes(t *testing.T) {
sql string
expected bool
}{
- {"_timestamp_ns = " + strconv.FormatInt(largeTimestamp2, 10), true},
- {"_timestamp_ns = " + strconv.FormatInt(largeTimestamp2+1, 10), false},
- {"_timestamp_ns > " + strconv.FormatInt(largeTimestamp2-1, 10), true},
- {"_timestamp_ns > " + strconv.FormatInt(largeTimestamp2, 10), false},
- {"_timestamp_ns >= " + strconv.FormatInt(largeTimestamp2, 10), true},
- {"_timestamp_ns >= " + strconv.FormatInt(largeTimestamp2+1, 10), false},
- {"_timestamp_ns < " + strconv.FormatInt(largeTimestamp2+1, 10), true},
- {"_timestamp_ns < " + strconv.FormatInt(largeTimestamp2, 10), false},
- {"_timestamp_ns <= " + strconv.FormatInt(largeTimestamp2, 10), true},
- {"_timestamp_ns <= " + strconv.FormatInt(largeTimestamp2-1, 10), false},
+ {"_ts_ns = " + strconv.FormatInt(largeTimestamp2, 10), true},
+ {"_ts_ns = " + strconv.FormatInt(largeTimestamp2+1, 10), false},
+ {"_ts_ns > " + strconv.FormatInt(largeTimestamp2-1, 10), true},
+ {"_ts_ns > " + strconv.FormatInt(largeTimestamp2, 10), false},
+ {"_ts_ns >= " + strconv.FormatInt(largeTimestamp2, 10), true},
+ {"_ts_ns >= " + strconv.FormatInt(largeTimestamp2+1, 10), false},
+ {"_ts_ns < " + strconv.FormatInt(largeTimestamp2+1, 10), true},
+ {"_ts_ns < " + strconv.FormatInt(largeTimestamp2, 10), false},
+ {"_ts_ns <= " + strconv.FormatInt(largeTimestamp2, 10), true},
+ {"_ts_ns <= " + strconv.FormatInt(largeTimestamp2-1, 10), false},
}
for _, op := range operators {
@@ -163,22 +163,22 @@ func TestTimestampQueryFixes(t *testing.T) {
maxInt64 := int64(9223372036854775807)
testRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: maxInt64}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: maxInt64}},
},
}
// Test equality with maximum int64
- result := engine.valuesEqual(testRecord.Fields["_timestamp_ns"], maxInt64)
+ result := engine.valuesEqual(testRecord.Fields["_ts_ns"], maxInt64)
assert.True(t, result, "Should handle maximum int64 value correctly")
// Test with zero timestamp
zeroRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 0}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 0}},
},
}
- result = engine.valuesEqual(zeroRecord.Fields["_timestamp_ns"], int64(0))
+ result = engine.valuesEqual(zeroRecord.Fields["_ts_ns"], int64(0))
assert.True(t, result, "Should handle zero timestamp correctly")
})
}
@@ -195,19 +195,19 @@ func TestOriginalFailingQueries(t *testing.T) {
}{
{
name: "OriginalQuery1",
- sql: "select id, _timestamp_ns from ecommerce.user_events where _timestamp_ns = 1756947416566456262",
+ sql: "select id, _ts_ns from ecommerce.user_events where _ts_ns = 1756947416566456262",
timestamp: 1756947416566456262,
id: 897795,
},
{
name: "OriginalQuery2",
- sql: "select id, _timestamp_ns from ecommerce.user_events where _timestamp_ns = 1756947416566439304",
+ sql: "select id, _ts_ns from ecommerce.user_events where _ts_ns = 1756947416566439304",
timestamp: 1756947416566439304,
id: 715356,
},
{
name: "CurrentDataQuery",
- sql: "select id, _timestamp_ns from ecommerce.user_events where _timestamp_ns = 1756913789829292386",
+ sql: "select id, _ts_ns from ecommerce.user_events where _ts_ns = 1756913789829292386",
timestamp: 1756913789829292386,
id: 82460,
},
@@ -233,8 +233,8 @@ func TestOriginalFailingQueries(t *testing.T) {
// Test with matching record
matchingRecord := &schema_pb.RecordValue{
Fields: map[string]*schema_pb.Value{
- "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: query.timestamp}},
- "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: query.id}},
+ "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: query.timestamp}},
+ "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: query.id}},
},
}
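Fix2 and Fix3 above assert that an equality (or equal-boundary range) filter on _ts_ns must not install a stop time that terminates the scan before the matching row is reached. A minimal sketch of the extraction rule the tests imply follows; extractTimeBounds is an illustrative name, not the engine's actual API, and the exclusive-stop convention is an assumption of this sketch:

package main

import "fmt"

// extractTimeBounds derives scan bounds from a single comparison against
// the _ts_ns column. For "=" (and ">"), only the start bound is set and the
// exact check is left to the row-level predicate, so the scan is not cut
// off prematurely at stopTimeNs. Illustrative only; not the engine's API.
func extractTimeBounds(op string, value int64) (startTimeNs, stopTimeNs int64) {
	switch op {
	case "=", ">=", ">":
		return value, 0 // 0 means "no stop bound"
	case "<":
		return 0, value // stop bound treated as exclusive in this sketch
	case "<=":
		return 0, value + 1
	default:
		return 0, 0 // unknown operator: scan everything, filter per row
	}
}

func main() {
	start, stop := extractTimeBounds("=", 1756947416566439304)
	fmt.Println(start, stop) // 1756947416566439304 0 -> no premature termination
}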
diff --git a/weed/query/engine/where_clause_debug_test.go b/weed/query/engine/where_clause_debug_test.go
index 0907524bb..382da4594 100644
--- a/weed/query/engine/where_clause_debug_test.go
+++ b/weed/query/engine/where_clause_debug_test.go
@@ -203,11 +203,11 @@ func TestWhereClauseEndToEnd(t *testing.T) {
// CRITICAL TEST: This should detect the WHERE clause bug
if impossibleCount == baselineCount {
- t.Errorf("❌ WHERE CLAUSE BUG CONFIRMED:")
+ t.Errorf("WHERE CLAUSE BUG CONFIRMED:")
t.Errorf(" Impossible condition returned same row count as no WHERE clause")
t.Errorf(" This proves WHERE filtering is not being applied")
} else if impossibleCount == 0 {
- t.Logf("✅ Impossible WHERE condition correctly returns 0 rows")
+ t.Logf("Impossible WHERE condition correctly returns 0 rows")
}
// Test 3: Specific ID filtering
@@ -222,11 +222,11 @@ func TestWhereClauseEndToEnd(t *testing.T) {
t.Logf("WHERE id = %s: %d rows", firstId, specificCount)
if specificCount == baselineCount {
- t.Errorf("❌ WHERE clause bug: Specific ID filter returned all rows")
+ t.Errorf("WHERE clause bug: Specific ID filter returned all rows")
} else if specificCount == 1 {
- t.Logf("✅ Specific ID WHERE clause working correctly")
+ t.Logf("Specific ID WHERE clause working correctly")
} else {
- t.Logf("❓ Unexpected: Specific ID returned %d rows", specificCount)
+ t.Logf("Unexpected: Specific ID returned %d rows", specificCount)
}
}
@@ -250,10 +250,10 @@ func TestWhereClauseEndToEnd(t *testing.T) {
}
if nonMatchingCount > 0 {
- t.Errorf("❌ WHERE clause bug: %d rows have id <= 10,000,000 but should be filtered out", nonMatchingCount)
+ t.Errorf("WHERE clause bug: %d rows have id <= 10,000,000 but should be filtered out", nonMatchingCount)
t.Errorf(" Sample IDs that should be filtered: %v", getSampleIds(rangeResult, 3))
} else {
- t.Logf("✅ WHERE id > 10000000 correctly filtered results")
+ t.Logf("WHERE id > 10000000 correctly filtered results")
}
}
@@ -317,14 +317,14 @@ func TestSpecificWhereClauseBug(t *testing.T) {
t.Logf("Row %d: id = %d", i+1, idVal)
if idVal <= 10000000 {
bugDetected = true
- t.Errorf("❌ BUG: id %d should be filtered out (≤ 10,000,000)", idVal)
+ t.Errorf("BUG: id %d should be filtered out (<= 10,000,000)", idVal)
}
}
}
if !bugDetected {
- t.Log("✅ WHERE clause working correctly - all IDs > 10,000,000")
+ t.Log("WHERE clause working correctly - all IDs > 10,000,000")
} else {
- t.Error("❌ WHERE clause bug confirmed: Returned IDs that should be filtered out")
+ t.Error("WHERE clause bug confirmed: Returned IDs that should be filtered out")
}
}
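The end-to-end checks above fail exactly when LIMIT/OFFSET is applied to unfiltered rows, i.e. when the WHERE predicate is never evaluated. The sketch below shows the intended ordering, with hypothetical row/predicate types standing in for the engine's internal representations:

package main

import "fmt"

// row and predicate are stand-ins for the engine's internal types.
type row struct{ id int64 }

type predicate func(row) bool

// selectRows applies the WHERE predicate first and only then OFFSET and
// LIMIT. Applying limit/offset to unfiltered rows is the bug the
// "impossible condition returns the baseline row count" test detects.
func selectRows(rows []row, where predicate, limit, offset int) []row {
	var filtered []row
	for _, r := range rows {
		if where == nil || where(r) {
			filtered = append(filtered, r)
		}
	}
	if offset > len(filtered) {
		offset = len(filtered)
	}
	filtered = filtered[offset:]
	if limit >= 0 && limit < len(filtered) {
		filtered = filtered[:limit]
	}
	return filtered
}

func main() {
	rows := []row{{id: 5}, {id: 20000000}, {id: 30000000}}
	out := selectRows(rows, func(r row) bool { return r.id > 10000000 }, 10, 0)
	fmt.Println(len(out)) // 2 -- the id <= 10,000,000 row is filtered out
}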
diff --git a/weed/query/engine/where_validation_test.go b/weed/query/engine/where_validation_test.go
index 4c2d8b903..4ba7d1c70 100644
--- a/weed/query/engine/where_validation_test.go
+++ b/weed/query/engine/where_validation_test.go
@@ -37,9 +37,9 @@ func TestWhereClauseValidation(t *testing.T) {
t.Logf("WHERE id = %s: %d rows", firstId, len(specificResult.Rows))
if len(specificResult.Rows) == 1 {
- t.Logf("✅ Specific ID filtering works correctly")
+ t.Logf("Specific ID filtering works correctly")
} else {
- t.Errorf("❌ Expected 1 row, got %d rows", len(specificResult.Rows))
+ t.Errorf("Expected 1 row, got %d rows", len(specificResult.Rows))
}
// Test 3: Range filtering (find actual data ranges)
@@ -73,16 +73,16 @@ func TestWhereClauseValidation(t *testing.T) {
for _, row := range rangeResult.Rows {
if idVal, err := strconv.ParseInt(row[0].ToString(), 10, 64); err == nil {
if idVal <= threshold {
- t.Errorf("❌ Found ID %d which should be filtered out (≤ %d)", idVal, threshold)
+ t.Errorf("Found ID %d which should be filtered out (<= %d)", idVal, threshold)
allCorrect = false
}
}
}
if allCorrect && len(rangeResult.Rows) > 0 {
- t.Logf("✅ Range filtering works correctly - all returned IDs > %d", threshold)
+ t.Logf("Range filtering works correctly - all returned IDs > %d", threshold)
} else if len(rangeResult.Rows) == 0 {
- t.Logf("✅ Range filtering works correctly - no IDs > %d in data", threshold)
+ t.Logf("Range filtering works correctly - no IDs > %d in data", threshold)
}
// Test 4: String filtering
@@ -98,17 +98,17 @@ func TestWhereClauseValidation(t *testing.T) {
statusCorrect := true
for _, row := range statusResult.Rows {
if len(row) > 1 && row[1].ToString() != "active" {
- t.Errorf("❌ Found status '%s' which should be filtered out", row[1].ToString())
+ t.Errorf("Found status '%s' which should be filtered out", row[1].ToString())
statusCorrect = false
}
}
if statusCorrect {
- t.Logf("✅ String filtering works correctly")
+ t.Logf("String filtering works correctly")
}
// Test 5: Comparison with actual real-world case
- t.Log("\n🎯 TESTING REAL-WORLD CASE:")
+ t.Log("\nTESTING REAL-WORLD CASE:")
realWorldResult, err := engine.ExecuteSQL(context.Background(),
"SELECT id FROM user_events WHERE id > 10000000 LIMIT 10 OFFSET 5")
if err != nil {
@@ -128,9 +128,9 @@ func TestWhereClauseValidation(t *testing.T) {
}
if violationCount == 0 {
- t.Logf("✅ Real-world case FIXED: No violations found")
+ t.Logf("Real-world case FIXED: No violations found")
} else {
- t.Errorf("❌ Real-world case FAILED: %d violations found", violationCount)
+ t.Errorf("Real-world case FAILED: %d violations found", violationCount)
}
}
@@ -168,7 +168,7 @@ func TestWhereClauseComparisonOperators(t *testing.T) {
result, err := engine.ExecuteSQL(context.Background(), sql)
if err != nil {
- t.Errorf("❌ Operator %s failed: %v", op.op, err)
+ t.Errorf("Operator %s failed: %v", op.op, err)
continue
}
@@ -176,7 +176,7 @@ func TestWhereClauseComparisonOperators(t *testing.T) {
// Basic validation - should not return more rows than baseline
if len(result.Rows) > len(baselineResult.Rows) {
- t.Errorf("❌ Operator %s returned more rows than baseline", op.op)
+ t.Errorf("Operator %s returned more rows than baseline", op.op)
}
}
}
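The operator matrix exercised by TestWhereClauseComparisonOperators reduces, for integer columns, to plain int64 comparisons once the literal has been parsed without a float64 round-trip. A self-contained sketch of that dispatch (the engine's real evaluation works on schema_pb.Value fields and is not shown in this diff):

package main

import "fmt"

// compareInt64 dispatches the comparison operators used in the tests above.
// Keeping both operands as int64 avoids the float64 precision loss that
// broke equality on large _ts_ns values.
func compareInt64(left int64, op string, right int64) (bool, error) {
	switch op {
	case "=":
		return left == right, nil
	case "!=", "<>":
		return left != right, nil
	case "<":
		return left < right, nil
	case "<=":
		return left <= right, nil
	case ">":
		return left > right, nil
	case ">=":
		return left >= right, nil
	default:
		return false, fmt.Errorf("unsupported operator %q", op)
	}
}

func main() {
	ok, _ := compareInt64(1756947416566456262, ">", 10000000)
	fmt.Println(ok) // true
}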