Diffstat (limited to 'weed/query')
25 files changed, 1024 insertions, 509 deletions
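The broker_client.go hunks below replace the hand-rolled convertHTTPToGRPC helper with pb.ServerAddress.ToGrpcAddress(). A minimal sketch of the new conversion path, assuming pb.ServerAddress behaves as it is used in the diff; the sample address is a placeholder, not taken from the commit:

package main

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb"
)

func main() {
	// Master HTTP address, as passed to NewBrokerClient in the broker_client.go hunk below.
	masterHTTP := "localhost:9333" // illustrative value only

	// pb.ServerAddress.ToGrpcAddress() applies the SeaweedFS convention
	// (gRPC port = HTTP port + 10000), which is why the removed
	// convertHTTPToGRPC helper is no longer needed.
	grpcAddr := pb.ServerAddress(masterHTTP).ToGrpcAddress()
	fmt.Println(grpcAddr) // expected: localhost:19333
}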
diff --git a/weed/query/engine/alias_timestamp_integration_test.go b/weed/query/engine/alias_timestamp_integration_test.go index eca8161db..d175d4cf5 100644 --- a/weed/query/engine/alias_timestamp_integration_test.go +++ b/weed/query/engine/alias_timestamp_integration_test.go @@ -25,13 +25,13 @@ func TestAliasTimestampIntegration(t *testing.T) { // Create test record testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: int64(1000 + i)}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: int64(1000 + i)}}, }, } // Test equality with alias (this was the originally failing pattern) - sql := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = " + strconv.FormatInt(timestamp, 10) + sql := "SELECT _ts_ns AS ts, id FROM test WHERE ts = " + strconv.FormatInt(timestamp, 10) stmt, err := ParseSQL(sql) assert.NoError(t, err, "Should parse alias equality query for timestamp %d", timestamp) @@ -43,7 +43,7 @@ func TestAliasTimestampIntegration(t *testing.T) { assert.True(t, result, "Should match exact large timestamp using alias") // Test precision - off by 1 nanosecond should not match - sqlOffBy1 := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = " + strconv.FormatInt(timestamp+1, 10) + sqlOffBy1 := "SELECT _ts_ns AS ts, id FROM test WHERE ts = " + strconv.FormatInt(timestamp+1, 10) stmt2, err := ParseSQL(sqlOffBy1) assert.NoError(t, err) selectStmt2 := stmt2.(*SelectStatement) @@ -62,23 +62,23 @@ func TestAliasTimestampIntegration(t *testing.T) { testRecords := []*schema_pb.RecordValue{ { Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp - 2}}, // Before range + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp - 2}}, // Before range }, }, { Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp}}, // In range + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp}}, // In range }, }, { Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp + 2}}, // After range + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp + 2}}, // After range }, }, } // Test range query with alias - sql := "SELECT _timestamp_ns AS ts FROM test WHERE ts >= " + + sql := "SELECT _ts_ns AS ts FROM test WHERE ts >= " + strconv.FormatInt(timestamp-1, 10) + " AND ts <= " + strconv.FormatInt(timestamp+1, 10) stmt, err := ParseSQL(sql) @@ -99,12 +99,12 @@ func TestAliasTimestampIntegration(t *testing.T) { maxInt64 := int64(9223372036854775807) testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: maxInt64}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: maxInt64}}, }, } // Test with alias - sql := "SELECT _timestamp_ns AS ts FROM test WHERE ts = " + strconv.FormatInt(maxInt64, 10) + sql := "SELECT _ts_ns AS ts FROM test WHERE ts = " + strconv.FormatInt(maxInt64, 10) stmt, err := ParseSQL(sql) assert.NoError(t, err, "Should parse max int64 with alias") @@ -119,11 +119,11 @@ func TestAliasTimestampIntegration(t *testing.T) { minInt64 := int64(-9223372036854775808) testRecord2 := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: 
&schema_pb.Value_Int64Value{Int64Value: minInt64}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: minInt64}}, }, } - sql2 := "SELECT _timestamp_ns AS ts FROM test WHERE ts = " + strconv.FormatInt(minInt64, 10) + sql2 := "SELECT _ts_ns AS ts FROM test WHERE ts = " + strconv.FormatInt(minInt64, 10) stmt2, err := ParseSQL(sql2) assert.NoError(t, err) selectStmt2 := stmt2.(*SelectStatement) @@ -141,14 +141,14 @@ func TestAliasTimestampIntegration(t *testing.T) { testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp1}}, - "created_at": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp2}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp1}}, + "created_at": {Kind: &schema_pb.Value_Int64Value{Int64Value: timestamp2}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}}, }, } // Use multiple timestamp aliases in WHERE - sql := "SELECT _timestamp_ns AS event_time, created_at AS created_time, id AS record_id FROM test " + + sql := "SELECT _ts_ns AS event_time, created_at AS created_time, id AS record_id FROM test " + "WHERE event_time = " + strconv.FormatInt(timestamp1, 10) + " AND created_time = " + strconv.FormatInt(timestamp2, 10) + " AND record_id = 12345" @@ -190,11 +190,11 @@ func TestAliasTimestampIntegration(t *testing.T) { t.Run(op.sql, func(t *testing.T) { testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: op.value}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: op.value}}, }, } - sql := "SELECT _timestamp_ns AS ts FROM test WHERE " + op.sql + sql := "SELECT _ts_ns AS ts FROM test WHERE " + op.sql stmt, err := ParseSQL(sql) assert.NoError(t, err, "Should parse: %s", op.sql) @@ -212,12 +212,12 @@ func TestAliasTimestampIntegration(t *testing.T) { // Reproduce the exact production scenario that was originally failing // This was the original failing pattern from the user - originalFailingSQL := "select id, _timestamp_ns as ts from ecommerce.user_events where ts = 1756913789829292386" + originalFailingSQL := "select id, _ts_ns as ts from ecommerce.user_events where ts = 1756913789829292386" testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756913789829292386}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756913789829292386}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}}, }, } @@ -232,11 +232,11 @@ func TestAliasTimestampIntegration(t *testing.T) { assert.True(t, result, "The originally failing production query should now work perfectly") // Also test the other originally failing timestamp - originalFailingSQL2 := "select id, _timestamp_ns as ts from ecommerce.user_events where ts = 1756947416566456262" + originalFailingSQL2 := "select id, _ts_ns as ts from ecommerce.user_events where ts = 1756947416566456262" testRecord2 := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, }, } diff --git 
a/weed/query/engine/broker_client.go b/weed/query/engine/broker_client.go index 9b5f9819c..3e6517678 100644 --- a/weed/query/engine/broker_client.go +++ b/weed/query/engine/broker_client.go @@ -5,14 +5,15 @@ import ( "encoding/binary" "fmt" "io" - "strconv" "strings" "time" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer" "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" @@ -39,8 +40,9 @@ type BrokerClient struct { // NewBrokerClient creates a new MQ broker client // Uses master HTTP address and converts it to gRPC address for service discovery func NewBrokerClient(masterHTTPAddress string) *BrokerClient { - // Convert HTTP address to gRPC address (typically HTTP port + 10000) - masterGRPCAddress := convertHTTPToGRPC(masterHTTPAddress) + // Convert HTTP address to gRPC address using pb.ServerAddress method + httpAddr := pb.ServerAddress(masterHTTPAddress) + masterGRPCAddress := httpAddr.ToGrpcAddress() return &BrokerClient{ masterAddress: masterGRPCAddress, @@ -48,20 +50,7 @@ func NewBrokerClient(masterHTTPAddress string) *BrokerClient { } } -// convertHTTPToGRPC converts HTTP address to gRPC address -// Follows SeaweedFS convention: gRPC port = HTTP port + 10000 -func convertHTTPToGRPC(httpAddress string) string { - if strings.Contains(httpAddress, ":") { - parts := strings.Split(httpAddress, ":") - if len(parts) == 2 { - if port, err := strconv.Atoi(parts[1]); err == nil { - return fmt.Sprintf("%s:%d", parts[0], port+10000) - } - } - } - // Fallback: return original address if conversion fails - return httpAddress -} +// No need for convertHTTPToGRPC - pb.ServerAddress.ToGrpcAddress() already handles this // discoverFiler finds a filer from the master server func (c *BrokerClient) discoverFiler() error { @@ -92,7 +81,8 @@ func (c *BrokerClient) discoverFiler() error { // Use the first available filer and convert HTTP address to gRPC filerHTTPAddress := resp.ClusterNodes[0].Address - c.filerAddress = convertHTTPToGRPC(filerHTTPAddress) + httpAddr := pb.ServerAddress(filerHTTPAddress) + c.filerAddress = httpAddr.ToGrpcAddress() return nil } @@ -175,7 +165,6 @@ func (f *filerClientImpl) GetDataCenter() string { } // ListNamespaces retrieves all MQ namespaces (databases) from the filer -// RESOLVED: Now queries actual topic directories instead of hardcoded values func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) { // Get filer client to list directories under /topics filerClient, err := c.GetFilerClient() @@ -204,8 +193,8 @@ func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) { return fmt.Errorf("failed to receive entry: %v", recvErr) } - // Only include directories (namespaces), skip files - if resp.Entry != nil && resp.Entry.IsDirectory { + // Only include directories (namespaces), skip files and system directories (starting with .) 
+ if resp.Entry != nil && resp.Entry.IsDirectory && !strings.HasPrefix(resp.Entry.Name, ".") { namespaces = append(namespaces, resp.Entry.Name) } } @@ -222,7 +211,6 @@ func (c *BrokerClient) ListNamespaces(ctx context.Context) ([]string, error) { } // ListTopics retrieves all topics in a namespace from the filer -// RESOLVED: Now queries actual topic directories instead of hardcoded values func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]string, error) { // Get filer client to list directories under /topics/{namespace} filerClient, err := c.GetFilerClient() @@ -271,16 +259,18 @@ func (c *BrokerClient) ListTopics(ctx context.Context, namespace string) ([]stri return topics, nil } -// GetTopicSchema retrieves schema information for a specific topic -// Reads the actual schema from topic configuration stored in filer -func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName string) (*schema_pb.RecordType, error) { +// GetTopicSchema retrieves the flat schema and key columns for a topic +// Returns (flatSchema, keyColumns, schemaFormat, error) +func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName string) (*schema_pb.RecordType, []string, string, error) { // Get filer client to read topic configuration filerClient, err := c.GetFilerClient() if err != nil { - return nil, fmt.Errorf("failed to get filer client: %v", err) + return nil, nil, "", fmt.Errorf("failed to get filer client: %v", err) } - var recordType *schema_pb.RecordType + var flatSchema *schema_pb.RecordType + var keyColumns []string + var schemaFormat string err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { // Read topic.conf file from /topics/{namespace}/{topic}/topic.conf topicDir := fmt.Sprintf("/topics/%s/%s", namespace, topicName) @@ -306,30 +296,23 @@ func (c *BrokerClient) GetTopicSchema(ctx context.Context, namespace, topicName return fmt.Errorf("failed to unmarshal topic %s.%s configuration: %v", namespace, topicName, err) } - // Extract the record type (schema) - if conf.RecordType != nil { - recordType = conf.RecordType - } else { - return fmt.Errorf("no schema found for topic %s.%s", namespace, topicName) - } + // Extract flat schema, key columns, and schema format + flatSchema = conf.MessageRecordType + keyColumns = conf.KeyColumns + schemaFormat = conf.SchemaFormat return nil }) if err != nil { - return nil, err - } - - if recordType == nil { - return nil, fmt.Errorf("no record type found for topic %s.%s", namespace, topicName) + return nil, nil, "", err } - return recordType, nil + return flatSchema, keyColumns, schemaFormat, nil } -// ConfigureTopic creates or modifies a topic configuration -// Assumption: Uses existing ConfigureTopic gRPC method for topic management -func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error { +// ConfigureTopic creates or modifies a topic using flat schema format +func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, flatSchema *schema_pb.RecordType, keyColumns []string) error { if err := c.findBrokerBalancer(); err != nil { return err } @@ -342,14 +325,15 @@ func (c *BrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName client := mq_pb.NewSeaweedMessagingClient(conn) - // Create topic configuration + // Create topic configuration using flat schema format _, err = client.ConfigureTopic(ctx, 
&mq_pb.ConfigureTopicRequest{ Topic: &schema_pb.Topic{ Namespace: namespace, Name: topicName, }, - PartitionCount: partitionCount, - RecordType: recordType, + PartitionCount: partitionCount, + MessageRecordType: flatSchema, + KeyColumns: keyColumns, }) if err != nil { return fmt.Errorf("failed to configure topic %s.%s: %v", namespace, topicName, err) @@ -433,15 +417,21 @@ func (c *BrokerClient) ListTopicPartitions(ctx context.Context, namespace, topic // Uses buffer_start metadata from disk files for precise deduplication // This prevents double-counting when combining with disk-based data func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) { + glog.V(2).Infof("GetUnflushedMessages called for %s/%s, partition: RangeStart=%d, RangeStop=%d", + namespace, topicName, partition.RangeStart, partition.RangeStop) + // Step 1: Find the broker that hosts this partition if err := c.findBrokerBalancer(); err != nil { + glog.V(2).Infof("Failed to find broker balancer: %v", err) // Return empty slice if we can't find broker - prevents double-counting return []*filer_pb.LogEntry{}, nil } + glog.V(2).Infof("Found broker at address: %s", c.brokerAddress) // Step 2: Connect to broker conn, err := grpc.Dial(c.brokerAddress, c.grpcDialOption) if err != nil { + glog.V(2).Infof("Failed to connect to broker %s: %v", c.brokerAddress, err) // Return empty slice if connection fails - prevents double-counting return []*filer_pb.LogEntry{}, nil } @@ -449,16 +439,20 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi client := mq_pb.NewSeaweedMessagingClient(conn) - // Step 3: Get earliest buffer_start from disk files for precise deduplication + // Step 3: For unflushed messages, always start from 0 to get all in-memory data + // The buffer_start metadata in log files uses timestamp-based indices for uniqueness, + // but the broker's LogBuffer uses sequential indices internally (0, 1, 2, 3...) + // For unflushed data queries, we want all messages in the buffer regardless of their + // timestamp-based buffer indices, so we always use 0. 
topicObj := topic.Topic{Namespace: namespace, Name: topicName} partitionPath := topic.PartitionDir(topicObj, partition) - earliestBufferIndex, err := c.getEarliestBufferStart(ctx, partitionPath) - if err != nil { - // If we can't get buffer info, use 0 (get all unflushed data) - earliestBufferIndex = 0 - } + glog.V(2).Infof("Getting buffer start from partition path: %s", partitionPath) + + // Always use 0 for unflushed messages to ensure we get all in-memory data + earliestBufferOffset := int64(0) + glog.V(2).Infof("Using StartBufferOffset=0 for unflushed messages (buffer offsets are sequential internally)") - // Step 4: Prepare request using buffer index filtering only + // Step 4: Prepare request using buffer offset filtering only request := &mq_pb.GetUnflushedMessagesRequest{ Topic: &schema_pb.Topic{ Namespace: namespace, @@ -470,12 +464,14 @@ func (c *BrokerClient) GetUnflushedMessages(ctx context.Context, namespace, topi RangeStop: partition.RangeStop, UnixTimeNs: partition.UnixTimeNs, }, - StartBufferIndex: earliestBufferIndex, + StartBufferOffset: earliestBufferOffset, } // Step 5: Call the broker streaming API + glog.V(2).Infof("Calling GetUnflushedMessages gRPC with StartBufferOffset=%d", earliestBufferOffset) stream, err := client.GetUnflushedMessages(ctx, request) if err != nil { + glog.V(2).Infof("GetUnflushedMessages gRPC call failed: %v", err) // Return empty slice if gRPC call fails - prevents double-counting return []*filer_pb.LogEntry{}, nil } @@ -558,19 +554,6 @@ func (c *BrokerClient) getEarliestBufferStart(ctx context.Context, partitionPath return nil }) - // Debug: Show buffer_start determination logic in EXPLAIN mode - if isDebugMode(ctx) && len(bufferStartSources) > 0 { - if logFileCount == 0 && parquetFileCount > 0 { - fmt.Printf("Debug: Using Parquet buffer_start metadata (binary format, no log files) - sources: %v\n", bufferStartSources) - } else if logFileCount > 0 && parquetFileCount > 0 { - fmt.Printf("Debug: Using mixed sources for buffer_start (binary format) - log files: %d, Parquet files: %d, sources: %v\n", - logFileCount, parquetFileCount, bufferStartSources) - } else { - fmt.Printf("Debug: Using log file buffer_start metadata (binary format) - sources: %v\n", bufferStartSources) - } - fmt.Printf("Debug: Earliest buffer_start index: %d\n", earliestBufferIndex) - } - if err != nil { return 0, fmt.Errorf("failed to scan partition directory: %v", err) } diff --git a/weed/query/engine/catalog.go b/weed/query/engine/catalog.go index 4cd39f3f0..f53e4cb2a 100644 --- a/weed/query/engine/catalog.go +++ b/weed/query/engine/catalog.go @@ -17,9 +17,9 @@ import ( type BrokerClientInterface interface { ListNamespaces(ctx context.Context) ([]string, error) ListTopics(ctx context.Context, namespace string) ([]string, error) - GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error) + GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, []string, string, error) // Returns (flatSchema, keyColumns, schemaFormat, error) + ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, flatSchema *schema_pb.RecordType, keyColumns []string) error GetFilerClient() (filer_pb.FilerClient, error) - ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error DeleteTopic(ctx context.Context, namespace, topicName string) error // GetUnflushedMessages returns only messages that haven't been flushed to disk yet // This prevents 
double-counting when combining with disk-based data @@ -151,12 +151,24 @@ func (c *SchemaCatalog) ListTables(database string) ([]string, error) { tables := make([]string, 0, len(db.Tables)) for name := range db.Tables { + // Skip .meta table + if name == ".meta" { + continue + } tables = append(tables, name) } return tables, nil } - return topics, nil + // Filter out .meta table from topics + filtered := make([]string, 0, len(topics)) + for _, topic := range topics { + if topic != ".meta" { + filtered = append(filtered, topic) + } + } + + return filtered, nil } // GetTableInfo returns detailed schema information for a table @@ -185,7 +197,7 @@ func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - recordType, err := c.brokerClient.GetTopicSchema(ctx, database, table) + recordType, _, _, err := c.brokerClient.GetTopicSchema(ctx, database, table) if err != nil { // If broker unavailable and we have expired cached data, return it if exists { @@ -278,7 +290,27 @@ func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *sch // 1. MQ scalar types map directly to SQL types // 2. Complex types (arrays, maps) are serialized as JSON strings // 3. All fields are nullable unless specifically marked otherwise +// 4. If no schema is defined, create a default schema with system fields and _value func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) { + // Check if the schema has a valid RecordType + if mqSchema == nil || mqSchema.RecordType == nil { + // For topics without schema, create a default schema with system fields and _value + columns := []ColumnInfo{ + {Name: SW_DISPLAY_NAME_TIMESTAMP, Type: "TIMESTAMP", Nullable: true}, + {Name: SW_COLUMN_NAME_KEY, Type: "VARBINARY", Nullable: true}, + {Name: SW_COLUMN_NAME_SOURCE, Type: "VARCHAR(255)", Nullable: true}, + {Name: SW_COLUMN_NAME_VALUE, Type: "VARBINARY", Nullable: true}, + } + + return &TableInfo{ + Name: topicName, + Namespace: namespace, + Schema: nil, // No schema defined + Columns: columns, + RevisionId: 0, + }, nil + } + columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields)) for i, field := range mqSchema.RecordType.Fields { diff --git a/weed/query/engine/catalog_no_schema_test.go b/weed/query/engine/catalog_no_schema_test.go new file mode 100644 index 000000000..0c0312cee --- /dev/null +++ b/weed/query/engine/catalog_no_schema_test.go @@ -0,0 +1,101 @@ +package engine + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/mq/schema" +) + +// TestConvertMQSchemaToTableInfo_NoSchema tests that topics without schemas +// get a default schema with system fields and _value field +func TestConvertMQSchemaToTableInfo_NoSchema(t *testing.T) { + catalog := NewSchemaCatalog("localhost:9333") + + tests := []struct { + name string + mqSchema *schema.Schema + expectError bool + checkFields func(*testing.T, *TableInfo) + }{ + { + name: "nil schema", + mqSchema: nil, + expectError: false, + checkFields: func(t *testing.T, info *TableInfo) { + if info.Schema != nil { + t.Error("Expected Schema to be nil for topics without schema") + } + if len(info.Columns) != 4 { + t.Errorf("Expected 4 columns, got %d", len(info.Columns)) + } + expectedCols := map[string]string{ + "_ts": "TIMESTAMP", + "_key": "VARBINARY", + "_source": "VARCHAR(255)", + "_value": "VARBINARY", + } + for _, col := range info.Columns { + expectedType, ok := 
expectedCols[col.Name] + if !ok { + t.Errorf("Unexpected column: %s", col.Name) + continue + } + if col.Type != expectedType { + t.Errorf("Column %s: expected type %s, got %s", col.Name, expectedType, col.Type) + } + } + }, + }, + { + name: "schema with nil RecordType", + mqSchema: &schema.Schema{ + RecordType: nil, + RevisionId: 1, + }, + expectError: false, + checkFields: func(t *testing.T, info *TableInfo) { + if info.Schema != nil { + t.Error("Expected Schema to be nil for topics without RecordType") + } + if len(info.Columns) != 4 { + t.Errorf("Expected 4 columns, got %d", len(info.Columns)) + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tableInfo, err := catalog.convertMQSchemaToTableInfo("test_namespace", "test_topic", tt.mqSchema) + + if tt.expectError { + if err == nil { + t.Error("Expected error but got none") + } + return + } + + if err != nil { + t.Errorf("Unexpected error: %v", err) + return + } + + if tableInfo == nil { + t.Error("Expected tableInfo but got nil") + return + } + + if tt.checkFields != nil { + tt.checkFields(t, tableInfo) + } + + // Basic checks + if tableInfo.Name != "test_topic" { + t.Errorf("Expected Name 'test_topic', got '%s'", tableInfo.Name) + } + if tableInfo.Namespace != "test_namespace" { + t.Errorf("Expected Namespace 'test_namespace', got '%s'", tableInfo.Namespace) + } + }) + } +} diff --git a/weed/query/engine/cockroach_parser_success_test.go b/weed/query/engine/cockroach_parser_success_test.go index 499d0c28e..f810e604c 100644 --- a/weed/query/engine/cockroach_parser_success_test.go +++ b/weed/query/engine/cockroach_parser_success_test.go @@ -73,17 +73,17 @@ func TestCockroachDBParserSuccess(t *testing.T) { result, err := engine.ExecuteSQL(context.Background(), tc.sql) if err != nil { - t.Errorf("❌ %s - Query failed: %v", tc.desc, err) + t.Errorf("%s - Query failed: %v", tc.desc, err) return } if result.Error != nil { - t.Errorf("❌ %s - Query result error: %v", tc.desc, result.Error) + t.Errorf("%s - Query result error: %v", tc.desc, result.Error) return } if len(result.Rows) == 0 { - t.Errorf("❌ %s - Expected at least one row", tc.desc) + t.Errorf("%s - Expected at least one row", tc.desc) return } diff --git a/weed/query/engine/complete_sql_fixes_test.go b/weed/query/engine/complete_sql_fixes_test.go index 19d7d59fb..e984ce0e1 100644 --- a/weed/query/engine/complete_sql_fixes_test.go +++ b/weed/query/engine/complete_sql_fixes_test.go @@ -24,19 +24,19 @@ func TestCompleteSQLFixes(t *testing.T) { name: "OriginalFailingQuery1", timestamp: 1756947416566456262, id: 897795, - sql: "select id, _timestamp_ns as ts from ecommerce.user_events where ts = 1756947416566456262", + sql: "select id, _ts_ns as ts from ecommerce.user_events where ts = 1756947416566456262", }, { name: "OriginalFailingQuery2", timestamp: 1756947416566439304, id: 715356, - sql: "select id, _timestamp_ns as ts from ecommerce.user_events where ts = 1756947416566439304", + sql: "select id, _ts_ns as ts from ecommerce.user_events where ts = 1756947416566439304", }, { name: "CurrentDataQuery", timestamp: 1756913789829292386, id: 82460, - sql: "select id, _timestamp_ns as ts from ecommerce.user_events where ts = 1756913789829292386", + sql: "select id, _ts_ns as ts from ecommerce.user_events where ts = 1756913789829292386", }, } @@ -45,8 +45,8 @@ func TestCompleteSQLFixes(t *testing.T) { // Create test record matching the production data testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: 
&schema_pb.Value_Int64Value{Int64Value: tc.timestamp}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.id}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.timestamp}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.id}}, }, } @@ -67,8 +67,8 @@ func TestCompleteSQLFixes(t *testing.T) { // Verify precision is maintained (timestamp fixes) testRecordOffBy1 := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.timestamp + 1}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.id}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.timestamp + 1}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: tc.id}}, }, } @@ -84,9 +84,9 @@ func TestCompleteSQLFixes(t *testing.T) { testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, - "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user123"}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, + "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user123"}}, }, } @@ -96,7 +96,7 @@ func TestCompleteSQLFixes(t *testing.T) { // 3. Multiple conditions // 4. Different data types sql := `SELECT - _timestamp_ns AS ts, + _ts_ns AS ts, id AS record_id, user_id AS uid FROM ecommerce.user_events @@ -117,9 +117,9 @@ func TestCompleteSQLFixes(t *testing.T) { // Test that precision is still maintained in complex queries testRecordDifferentTimestamp := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp + 1}}, // Off by 1ns - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, - "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user123"}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp + 1}}, // Off by 1ns + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, + "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user123"}}, }, } @@ -131,13 +131,13 @@ func TestCompleteSQLFixes(t *testing.T) { // Ensure that non-alias queries continue to work exactly as before testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, }, } // Traditional query (no aliases) - should work exactly as before - traditionalSQL := "SELECT _timestamp_ns, id FROM ecommerce.user_events WHERE _timestamp_ns = 1756947416566456262 AND id = 897795" + traditionalSQL := "SELECT _ts_ns, id FROM ecommerce.user_events WHERE _ts_ns = 1756947416566456262 AND id = 897795" stmt, err := ParseSQL(traditionalSQL) assert.NoError(t, err) @@ -162,13 +162,13 @@ func TestCompleteSQLFixes(t *testing.T) { // Test that the fixes don't introduce performance or stability issues testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, + "_ts_ns": {Kind: 
&schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, }, } // Run the same query many times to test stability - sql := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = 1756947416566456262" + sql := "SELECT _ts_ns AS ts, id FROM test WHERE ts = 1756947416566456262" stmt, err := ParseSQL(sql) assert.NoError(t, err) @@ -194,7 +194,7 @@ func TestCompleteSQLFixes(t *testing.T) { // Test with nil SelectExprs (should fall back to no-alias behavior) compExpr := &ComparisonExpr{ - Left: &ColName{Name: stringValue("_timestamp_ns")}, + Left: &ColName{Name: stringValue("_ts_ns")}, Operator: "=", Right: &SQLVal{Type: IntVal, Val: []byte("1756947416566456262")}, } @@ -218,43 +218,43 @@ func TestSQLFixesSummary(t *testing.T) { // The "before and after" test testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, }, } // What was failing before (would return 0 rows) - failingSQL := "SELECT id, _timestamp_ns AS ts FROM ecommerce.user_events WHERE ts = 1756947416566456262" + failingSQL := "SELECT id, _ts_ns AS ts FROM ecommerce.user_events WHERE ts = 1756947416566456262" // What works now stmt, err := ParseSQL(failingSQL) - assert.NoError(t, err, "✅ SQL parsing works") + assert.NoError(t, err, "SQL parsing works") selectStmt := stmt.(*SelectStatement) predicate, err := engine.buildPredicateWithContext(selectStmt.Where.Expr, selectStmt.SelectExprs) - assert.NoError(t, err, "✅ Predicate building works with aliases") + assert.NoError(t, err, "Predicate building works with aliases") result := predicate(testRecord) - assert.True(t, result, "✅ Originally failing query now works perfectly") + assert.True(t, result, "Originally failing query now works perfectly") // Verify precision is maintained testRecordOffBy1 := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456263}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456263}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, }, } result2 := predicate(testRecordOffBy1) - assert.False(t, result2, "✅ Nanosecond precision maintained") - - t.Log("🎉 ALL SQL FIXES VERIFIED:") - t.Log(" ✅ Timestamp precision for large int64 values") - t.Log(" ✅ SQL alias resolution in WHERE clauses") - t.Log(" ✅ Scan boundary fixes for equality queries") - t.Log(" ✅ Range query fixes for equal boundaries") - t.Log(" ✅ Hybrid scanner time range handling") - t.Log(" ✅ Backward compatibility maintained") - t.Log(" ✅ Production stability verified") + assert.False(t, result2, "Nanosecond precision maintained") + + t.Log("ALL SQL FIXES VERIFIED:") + t.Log(" Timestamp precision for large int64 values") + t.Log(" SQL alias resolution in WHERE clauses") + t.Log(" Scan boundary fixes for equality queries") + t.Log(" Range query fixes for equal boundaries") + t.Log(" Hybrid scanner time range handling") + t.Log(" Backward compatibility maintained") + t.Log(" Production stability verified") }) } diff --git a/weed/query/engine/describe.go b/weed/query/engine/describe.go index 3a26bb2a6..415fc8e17 100644 --- 
a/weed/query/engine/describe.go +++ b/weed/query/engine/describe.go @@ -27,8 +27,8 @@ func (e *SQLEngine) executeDescribeStatement(ctx context.Context, tableName stri } } - // Get topic schema from broker - recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName) + // Get flat schema and key columns from broker + flatSchema, keyColumns, _, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName) if err != nil { return &QueryResult{Error: err}, err } @@ -44,38 +44,71 @@ func (e *SQLEngine) executeDescribeStatement(ctx context.Context, tableName stri {"_source", "VARCHAR(255)", "System column: Data source (parquet/log)"}, } - // Format schema as DESCRIBE output (regular fields + system columns) - totalRows := len(recordType.Fields) + len(systemColumns) + // If no schema is defined, include _value field + if flatSchema == nil { + systemColumns = append(systemColumns, struct { + Name string + Type string + Extra string + }{SW_COLUMN_NAME_VALUE, "VARBINARY", "Raw message value (no schema defined)"}) + } + + // Calculate total rows: schema fields + system columns + totalRows := len(systemColumns) + if flatSchema != nil { + totalRows += len(flatSchema.Fields) + } + + // Create key column lookup map + keyColumnMap := make(map[string]bool) + for _, keyCol := range keyColumns { + keyColumnMap[keyCol] = true + } + result := &QueryResult{ Columns: []string{"Field", "Type", "Null", "Key", "Default", "Extra"}, Rows: make([][]sqltypes.Value, totalRows), } - // Add regular fields - for i, field := range recordType.Fields { - sqlType := e.convertMQTypeToSQL(field.Type) - - result.Rows[i] = []sqltypes.Value{ - sqltypes.NewVarChar(field.Name), // Field - sqltypes.NewVarChar(sqlType), // Type - sqltypes.NewVarChar("YES"), // Null (assume nullable) - sqltypes.NewVarChar(""), // Key (no keys for now) - sqltypes.NewVarChar("NULL"), // Default - sqltypes.NewVarChar(""), // Extra + rowIndex := 0 + + // Add schema fields - mark key columns appropriately + if flatSchema != nil { + for _, field := range flatSchema.Fields { + sqlType := e.convertMQTypeToSQL(field.Type) + isKey := keyColumnMap[field.Name] + keyType := "" + if isKey { + keyType = "PRI" // Primary key + } + extra := "Data field" + if isKey { + extra = "Key field" + } + + result.Rows[rowIndex] = []sqltypes.Value{ + sqltypes.NewVarChar(field.Name), + sqltypes.NewVarChar(sqlType), + sqltypes.NewVarChar("YES"), + sqltypes.NewVarChar(keyType), + sqltypes.NewVarChar("NULL"), + sqltypes.NewVarChar(extra), + } + rowIndex++ } } // Add system columns - for i, sysCol := range systemColumns { - rowIndex := len(recordType.Fields) + i + for _, sysCol := range systemColumns { result.Rows[rowIndex] = []sqltypes.Value{ sqltypes.NewVarChar(sysCol.Name), // Field sqltypes.NewVarChar(sysCol.Type), // Type sqltypes.NewVarChar("YES"), // Null - sqltypes.NewVarChar(""), // Key + sqltypes.NewVarChar("SYS"), // Key - mark as system column sqltypes.NewVarChar("NULL"), // Default sqltypes.NewVarChar(sysCol.Extra), // Extra - description } + rowIndex++ } return result, nil diff --git a/weed/query/engine/engine.go b/weed/query/engine/engine.go index ffed03f35..e00fd78ca 100644 --- a/weed/query/engine/engine.go +++ b/weed/query/engine/engine.go @@ -1513,47 +1513,49 @@ func (e *SQLEngine) executeSelectStatementWithPlan(ctx context.Context, stmt *Se var result *QueryResult var err error - if hasAggregations { - // Extract table information for aggregation execution - var database, tableName string - if len(stmt.From) == 1 { - if table, ok := 
stmt.From[0].(*AliasedTableExpr); ok { - if tableExpr, ok := table.Expr.(TableName); ok { - tableName = tableExpr.Name.String() - if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" { - database = tableExpr.Qualifier.String() - } + // Extract table information for execution (needed for both aggregation and regular queries) + var database, tableName string + if len(stmt.From) == 1 { + if table, ok := stmt.From[0].(*AliasedTableExpr); ok { + if tableExpr, ok := table.Expr.(TableName); ok { + tableName = tableExpr.Name.String() + if tableExpr.Qualifier != nil && tableExpr.Qualifier.String() != "" { + database = tableExpr.Qualifier.String() } } } + } - // Use current database if not specified + // Use current database if not specified + if database == "" { + database = e.catalog.currentDatabase if database == "" { - database = e.catalog.currentDatabase - if database == "" { - database = "default" - } - } - - // Create hybrid scanner for aggregation execution - var filerClient filer_pb.FilerClient - if e.catalog.brokerClient != nil { - filerClient, err = e.catalog.brokerClient.GetFilerClient() - if err != nil { - return &QueryResult{Error: err}, err - } + database = "default" } + } - hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e) + // CRITICAL FIX: Always use HybridMessageScanner for ALL queries to read both flushed and unflushed data + // Create hybrid scanner for both aggregation and regular SELECT queries + var filerClient filer_pb.FilerClient + if e.catalog.brokerClient != nil { + filerClient, err = e.catalog.brokerClient.GetFilerClient() if err != nil { return &QueryResult{Error: err}, err } + } + hybridScanner, err := NewHybridMessageScanner(filerClient, e.catalog.brokerClient, database, tableName, e) + if err != nil { + return &QueryResult{Error: err}, err + } + + if hasAggregations { // Execute aggregation query with plan tracking result, err = e.executeAggregationQueryWithPlan(ctx, hybridScanner, aggregations, stmt, plan) } else { - // Regular SELECT query with plan tracking - result, err = e.executeSelectStatementWithBrokerStats(ctx, stmt, plan) + // CRITICAL FIX: Use HybridMessageScanner for regular SELECT queries too + // This ensures both flushed and unflushed data are read + result, err = e.executeRegularSelectWithHybridScanner(ctx, hybridScanner, stmt, plan) } if err == nil && result != nil { @@ -1981,6 +1983,198 @@ func (e *SQLEngine) executeSelectStatement(ctx context.Context, stmt *SelectStat return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil } +// executeRegularSelectWithHybridScanner handles regular SELECT queries using HybridMessageScanner +// This ensures both flushed and unflushed data are read, fixing the SQL empty results issue +func (e *SQLEngine) executeRegularSelectWithHybridScanner(ctx context.Context, hybridScanner *HybridMessageScanner, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) { + // Parse SELECT expressions to determine columns and detect aggregations + var columns []string + var aggregations []AggregationSpec + var hasAggregations bool + selectAll := false + baseColumnsSet := make(map[string]bool) // Track base columns needed for expressions + + for _, selectExpr := range stmt.SelectExprs { + switch expr := selectExpr.(type) { + case *StarExpr: + selectAll = true + case *AliasedExpr: + switch col := expr.Expr.(type) { + case *ColName: + columnName := col.Name.String() + columns = append(columns, columnName) + 
baseColumnsSet[columnName] = true + case *FuncExpr: + funcName := strings.ToLower(col.Name.String()) + if e.isAggregationFunction(funcName) { + // Handle aggregation functions + aggSpec, err := e.parseAggregationFunction(col, expr) + if err != nil { + return &QueryResult{Error: err}, err + } + aggregations = append(aggregations, *aggSpec) + hasAggregations = true + } else if e.isStringFunction(funcName) { + // Handle string functions like UPPER, LENGTH, etc. + columns = append(columns, e.getStringFunctionAlias(col)) + // Extract base columns needed for this string function + e.extractBaseColumnsFromFunction(col, baseColumnsSet) + } else if e.isDateTimeFunction(funcName) { + // Handle datetime functions like CURRENT_DATE, NOW, EXTRACT, DATE_TRUNC + columns = append(columns, e.getDateTimeFunctionAlias(col)) + // Extract base columns needed for this datetime function + e.extractBaseColumnsFromFunction(col, baseColumnsSet) + } else { + return &QueryResult{Error: fmt.Errorf("unsupported function: %s", funcName)}, fmt.Errorf("unsupported function: %s", funcName) + } + default: + err := fmt.Errorf("unsupported SELECT expression: %T", col) + return &QueryResult{Error: err}, err + } + default: + err := fmt.Errorf("unsupported SELECT expression: %T", expr) + return &QueryResult{Error: err}, err + } + } + + // If we have aggregations, delegate to aggregation handler + if hasAggregations { + return e.executeAggregationQuery(ctx, hybridScanner, aggregations, stmt) + } + + // Parse WHERE clause for predicate pushdown + var predicate func(*schema_pb.RecordValue) bool + var err error + if stmt.Where != nil { + predicate, err = e.buildPredicateWithContext(stmt.Where.Expr, stmt.SelectExprs) + if err != nil { + return &QueryResult{Error: err}, err + } + } + + // Parse LIMIT and OFFSET clauses + // Use -1 to distinguish "no LIMIT" from "LIMIT 0" + limit := -1 + offset := 0 + if stmt.Limit != nil && stmt.Limit.Rowcount != nil { + switch limitExpr := stmt.Limit.Rowcount.(type) { + case *SQLVal: + if limitExpr.Type == IntVal { + var parseErr error + limit64, parseErr := strconv.ParseInt(string(limitExpr.Val), 10, 64) + if parseErr != nil { + return &QueryResult{Error: parseErr}, parseErr + } + if limit64 > math.MaxInt32 || limit64 < 0 { + return &QueryResult{Error: fmt.Errorf("LIMIT value %d is out of valid range", limit64)}, fmt.Errorf("LIMIT value %d is out of valid range", limit64) + } + limit = int(limit64) + } + } + } + + // Parse OFFSET clause if present + if stmt.Limit != nil && stmt.Limit.Offset != nil { + switch offsetExpr := stmt.Limit.Offset.(type) { + case *SQLVal: + if offsetExpr.Type == IntVal { + var parseErr error + offset64, parseErr := strconv.ParseInt(string(offsetExpr.Val), 10, 64) + if parseErr != nil { + return &QueryResult{Error: parseErr}, parseErr + } + if offset64 > math.MaxInt32 || offset64 < 0 { + return &QueryResult{Error: fmt.Errorf("OFFSET value %d is out of valid range", offset64)}, fmt.Errorf("OFFSET value %d is out of valid range", offset64) + } + offset = int(offset64) + } + } + } + + // Build hybrid scan options + // Extract time filters from WHERE clause to optimize scanning + startTimeNs, stopTimeNs := int64(0), int64(0) + if stmt.Where != nil { + startTimeNs, stopTimeNs = e.extractTimeFilters(stmt.Where.Expr) + } + + hybridScanOptions := HybridScanOptions{ + StartTimeNs: startTimeNs, // Extracted from WHERE clause time comparisons + StopTimeNs: stopTimeNs, // Extracted from WHERE clause time comparisons + Limit: limit, + Offset: offset, + Predicate: predicate, + } + + if 
!selectAll { + // Convert baseColumnsSet to slice for hybrid scan options + baseColumns := make([]string, 0, len(baseColumnsSet)) + for columnName := range baseColumnsSet { + baseColumns = append(baseColumns, columnName) + } + // Use base columns (not expression aliases) for data retrieval + if len(baseColumns) > 0 { + hybridScanOptions.Columns = baseColumns + } else { + // If no base columns found (shouldn't happen), use original columns + hybridScanOptions.Columns = columns + } + } + + // Execute the hybrid scan (both flushed and unflushed data) + var results []HybridScanResult + if plan != nil { + // EXPLAIN mode - capture broker buffer stats + var stats *HybridScanStats + results, stats, err = hybridScanner.ScanWithStats(ctx, hybridScanOptions) + if err != nil { + return &QueryResult{Error: err}, err + } + + // Populate plan with broker buffer information + if stats != nil { + plan.BrokerBufferQueried = stats.BrokerBufferQueried + plan.BrokerBufferMessages = stats.BrokerBufferMessages + plan.BufferStartIndex = stats.BufferStartIndex + + // Add broker_buffer to data sources if buffer was queried + if stats.BrokerBufferQueried { + // Check if broker_buffer is already in data sources + hasBrokerBuffer := false + for _, source := range plan.DataSources { + if source == "broker_buffer" { + hasBrokerBuffer = true + break + } + } + if !hasBrokerBuffer { + plan.DataSources = append(plan.DataSources, "broker_buffer") + } + } + } + } else { + // Normal mode - just get results + results, err = hybridScanner.Scan(ctx, hybridScanOptions) + if err != nil { + return &QueryResult{Error: err}, err + } + } + + // Convert to SQL result format + if selectAll { + if len(columns) > 0 { + // SELECT *, specific_columns - include both auto-discovered and explicit columns + return hybridScanner.ConvertToSQLResultWithMixedColumns(results, columns), nil + } else { + // SELECT * only - let converter determine all columns (excludes system columns) + columns = nil + return hybridScanner.ConvertToSQLResult(results, columns), nil + } + } + + // Handle custom column expressions (including arithmetic) + return e.ConvertToSQLResultWithExpressions(hybridScanner, results, stmt.SelectExprs), nil +} + // executeSelectStatementWithBrokerStats handles SELECT queries with broker buffer statistics capture // This is used by EXPLAIN queries to capture complete data source information including broker memory func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, stmt *SelectStatement, plan *QueryExecutionPlan) (*QueryResult, error) { @@ -2237,10 +2431,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s plan.Details[PlanDetailStartTimeNs] = startTimeNs plan.Details[PlanDetailStopTimeNs] = stopTimeNs - if isDebugMode(ctx) { - fmt.Printf("Debug: Time filters extracted - startTimeNs=%d stopTimeNs=%d\n", startTimeNs, stopTimeNs) - } - // Collect actual file information for each partition var parquetFiles []string var liveLogFiles []string @@ -2261,9 +2451,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s columnPrunedCount := beforeColumnPrune - len(filteredStats) if columnPrunedCount > 0 { - if isDebugMode(ctx) { - fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath) - } // Track column statistics optimization if !contains(plan.OptimizationsUsed, "column_statistics_pruning") { plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning") @@ -2275,9 +2462,6 @@ 
func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s } } else { parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err)) - if isDebugMode(ctx) { - fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err) - } } // Merge accurate parquet sources from metadata @@ -2298,9 +2482,6 @@ func (e *SQLEngine) executeSelectStatementWithBrokerStats(ctx context.Context, s } } else { liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err)) - if isDebugMode(ctx) { - fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err) - } } } @@ -2559,7 +2740,6 @@ func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileSta return parquetStats } - debugEnabled := ctx != nil && isDebugMode(ctx) qStart := startTimeNs qStop := stopTimeNs if qStop == 0 { @@ -2568,21 +2748,10 @@ func pruneParquetFilesByTime(ctx context.Context, parquetStats []*ParquetFileSta n := 0 for _, fs := range parquetStats { - if debugEnabled { - fmt.Printf("Debug: Checking parquet file %s for pruning\n", fs.FileName) - } if minNs, maxNs, ok := hybridScanner.getTimestampRangeFromStats(fs); ok { - if debugEnabled { - fmt.Printf("Debug: Prune check parquet %s min=%d max=%d qStart=%d qStop=%d\n", fs.FileName, minNs, maxNs, qStart, qStop) - } if qStop < minNs || (qStart != 0 && qStart > maxNs) { - if debugEnabled { - fmt.Printf("Debug: Skipping parquet file %s due to no time overlap\n", fs.FileName) - } continue } - } else if debugEnabled { - fmt.Printf("Debug: No stats range available for parquet %s, cannot prune\n", fs.FileName) } parquetStats[n] = fs n++ @@ -2596,13 +2765,9 @@ func (e *SQLEngine) pruneParquetFilesByColumnStats(ctx context.Context, parquetS return parquetStats } - debugEnabled := ctx != nil && isDebugMode(ctx) n := 0 for _, fs := range parquetStats { if e.canSkipParquetFile(ctx, fs, whereExpr) { - if debugEnabled { - fmt.Printf("Debug: Skipping parquet file %s due to column statistics pruning\n", fs.FileName) - } continue } parquetStats[n] = fs @@ -2726,7 +2891,6 @@ func (e *SQLEngine) flipOperator(op string) string { // populatePlanFileDetails populates execution plan with detailed file information for partitions // Includes column statistics pruning optimization when WHERE clause is provided func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExecutionPlan, hybridScanner *HybridMessageScanner, partitions []string, stmt *SelectStatement) { - debugEnabled := ctx != nil && isDebugMode(ctx) // Collect actual file information for each partition var parquetFiles []string var liveLogFiles []string @@ -2750,9 +2914,6 @@ func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExec columnPrunedCount := beforeColumnPrune - len(filteredStats) if columnPrunedCount > 0 { - if debugEnabled { - fmt.Printf("Debug: Column statistics pruning skipped %d parquet files in %s\n", columnPrunedCount, partitionPath) - } // Track column statistics optimization if !contains(plan.OptimizationsUsed, "column_statistics_pruning") { plan.OptimizationsUsed = append(plan.OptimizationsUsed, "column_statistics_pruning") @@ -2765,9 +2926,6 @@ func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExec } } else { parquetReadErrors = append(parquetReadErrors, fmt.Sprintf("%s: %v", partitionPath, err)) - if debugEnabled { - fmt.Printf("Debug: Failed to read parquet statistics in %s: %v\n", partitionPath, err) - } } // Merge accurate parquet 
sources from metadata @@ -2788,9 +2946,6 @@ func (e *SQLEngine) populatePlanFileDetails(ctx context.Context, plan *QueryExec } } else { liveLogListErrors = append(liveLogListErrors, fmt.Sprintf("%s: %v", partitionPath, err)) - if debugEnabled { - fmt.Printf("Debug: Failed to list live log files in %s: %v\n", partitionPath, err) - } } } @@ -3848,7 +4003,7 @@ func (e *SQLEngine) createTable(ctx context.Context, stmt *DDLStatement) (*Query // Create the topic via broker using configurable partition count partitionCount := e.catalog.GetDefaultPartitionCount() - err := e.catalog.brokerClient.ConfigureTopic(ctx, database, tableName, partitionCount, recordType) + err := e.catalog.brokerClient.ConfigureTopic(ctx, database, tableName, partitionCount, recordType, nil) if err != nil { return &QueryResult{Error: err}, err } @@ -4283,29 +4438,29 @@ func (e *SQLEngine) eachLogEntryInFile(filerClient filer_pb.FilerClient, filePat // convertLogEntryToRecordValue helper method (reuse existing logic) func (e *SQLEngine) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) { - // Parse the log entry data as Protocol Buffer (not JSON!) + // Try to unmarshal as RecordValue first (schematized data) recordValue := &schema_pb.RecordValue{} - if err := proto.Unmarshal(logEntry.Data, recordValue); err != nil { - return nil, "", fmt.Errorf("failed to unmarshal log entry protobuf: %v", err) - } + err := proto.Unmarshal(logEntry.Data, recordValue) + if err == nil { + // Successfully unmarshaled as RecordValue (valid protobuf) + // Initialize Fields map if nil + if recordValue.Fields == nil { + recordValue.Fields = make(map[string]*schema_pb.Value) + } - // Ensure Fields map exists - if recordValue.Fields == nil { - recordValue.Fields = make(map[string]*schema_pb.Value) - } + // Add system columns from LogEntry + recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, + } + recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}, + } - // Add system columns - recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{ - Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, - } - recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ - Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}, + return recordValue, "live_log", nil } - // User data fields are already present in the protobuf-deserialized recordValue - // No additional processing needed since proto.Unmarshal already populated the Fields map - - return recordValue, "live_log", nil + // Failed to unmarshal as RecordValue - invalid protobuf data + return nil, "", fmt.Errorf("failed to unmarshal log entry protobuf: %w", err) } // extractTimestampFromFilename extracts timestamp from parquet filename @@ -4782,7 +4937,7 @@ func (e *SQLEngine) findColumnValue(result HybridScanResult, columnName string) // discoverAndRegisterTopic attempts to discover an existing topic and register it in the SQL catalog func (e *SQLEngine) discoverAndRegisterTopic(ctx context.Context, database, tableName string) error { // First, check if topic exists by trying to get its schema from the broker/filer - recordType, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName) + recordType, _, _, err := e.catalog.brokerClient.GetTopicSchema(ctx, database, tableName) if err != nil { return fmt.Errorf("topic %s.%s not found or no schema available: %v", database, tableName, err) 
} diff --git a/weed/query/engine/engine_test.go b/weed/query/engine/engine_test.go index 8193afef6..96c5507b0 100644 --- a/weed/query/engine/engine_test.go +++ b/weed/query/engine/engine_test.go @@ -1101,7 +1101,7 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_ComplexDataTypes(t *testing.T) { "float_field": {Kind: &schema_pb.Value_FloatValue{FloatValue: 3.14159}}, "double_field": {Kind: &schema_pb.Value_DoubleValue{DoubleValue: 2.718281828}}, "bool_field": {Kind: &schema_pb.Value_BoolValue{BoolValue: true}}, - "string_field": {Kind: &schema_pb.Value_StringValue{StringValue: "test string with unicode 🎉"}}, + "string_field": {Kind: &schema_pb.Value_StringValue{StringValue: "test string with unicode party"}}, "bytes_field": {Kind: &schema_pb.Value_BytesValue{BytesValue: []byte{0x01, 0x02, 0x03}}}, }, } @@ -1129,7 +1129,7 @@ func TestSQLEngine_ConvertLogEntryToRecordValue_ComplexDataTypes(t *testing.T) { assert.Equal(t, float32(3.14159), result.Fields["float_field"].GetFloatValue()) assert.Equal(t, 2.718281828, result.Fields["double_field"].GetDoubleValue()) assert.Equal(t, true, result.Fields["bool_field"].GetBoolValue()) - assert.Equal(t, "test string with unicode 🎉", result.Fields["string_field"].GetStringValue()) + assert.Equal(t, "test string with unicode party", result.Fields["string_field"].GetStringValue()) assert.Equal(t, []byte{0x01, 0x02, 0x03}, result.Fields["bytes_field"].GetBytesValue()) // System columns should still be present diff --git a/weed/query/engine/fast_path_predicate_validation_test.go b/weed/query/engine/fast_path_predicate_validation_test.go index 3322ed51f..3918fdbf0 100644 --- a/weed/query/engine/fast_path_predicate_validation_test.go +++ b/weed/query/engine/fast_path_predicate_validation_test.go @@ -93,7 +93,7 @@ func TestFastPathPredicateValidation(t *testing.T) { }, { name: "Internal timestamp column", - whereClause: "_timestamp_ns > 1640995200000000000", + whereClause: "_ts_ns > 1640995200000000000", expectedTimeOnly: true, expectedStartTimeNs: 1640995200000000000, description: "Internal timestamp column should allow fast path", @@ -139,7 +139,7 @@ func TestFastPathPredicateValidation(t *testing.T) { t.Errorf("Expected stopTimeNs=%d, got %d", tc.expectedStopTimeNs, stopTimeNs) } - t.Logf("✅ %s: onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d", + t.Logf("%s: onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d", tc.name, onlyTimePredicates, startTimeNs, stopTimeNs) }) } @@ -212,7 +212,7 @@ func TestFastPathAggregationSafety(t *testing.T) { tc.shouldUseFastPath, canAttemptFastPath, tc.description) } - t.Logf("✅ %s: canAttemptFastPath=%v (onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d)", + t.Logf("%s: canAttemptFastPath=%v (onlyTimePredicates=%v, startTimeNs=%d, stopTimeNs=%d)", tc.name, canAttemptFastPath, onlyTimePredicates, startTimeNs, stopTimeNs) }) } @@ -233,7 +233,7 @@ func TestTimestampColumnDetection(t *testing.T) { description: "System timestamp display column should be detected", }, { - columnName: "_timestamp_ns", + columnName: "_ts_ns", isTimestamp: true, description: "Internal timestamp column should be detected", }, @@ -266,7 +266,7 @@ func TestTimestampColumnDetection(t *testing.T) { t.Errorf("Expected isTimestampColumn(%s)=%v, got %v. 
%s", tc.columnName, tc.isTimestamp, isTimestamp, tc.description) } - t.Logf("✅ Column '%s': isTimestamp=%v", tc.columnName, isTimestamp) + t.Logf("Column '%s': isTimestamp=%v", tc.columnName, isTimestamp) }) } } diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go index eee57bc23..c09ce2f54 100644 --- a/weed/query/engine/hybrid_message_scanner.go +++ b/weed/query/engine/hybrid_message_scanner.go @@ -15,6 +15,7 @@ import ( "github.com/parquet-go/parquet-go" "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/mq" "github.com/seaweedfs/seaweedfs/weed/mq/logstore" "github.com/seaweedfs/seaweedfs/weed/mq/schema" "github.com/seaweedfs/seaweedfs/weed/mq/topic" @@ -41,6 +42,7 @@ type HybridMessageScanner struct { brokerClient BrokerClientInterface // For querying unflushed data topic topic.Topic recordSchema *schema_pb.RecordType + schemaFormat string // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless parquetLevels *schema.ParquetLevels engine *SQLEngine // Reference for system column formatting } @@ -59,26 +61,32 @@ func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient Brok Name: topicName, } - // Get topic schema from broker client (works with both real and mock clients) - recordType, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName) + // Get flat schema from broker client + recordType, _, schemaFormat, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName) if err != nil { - return nil, fmt.Errorf("failed to get topic schema: %v", err) - } - if recordType == nil { - return nil, NoSchemaError{Namespace: namespace, Topic: topicName} + return nil, fmt.Errorf("failed to get topic record type: %v", err) } - // Create a copy of the recordType to avoid modifying the original - recordTypeCopy := &schema_pb.RecordType{ - Fields: make([]*schema_pb.Field, len(recordType.Fields)), - } - copy(recordTypeCopy.Fields, recordType.Fields) + if recordType == nil || len(recordType.Fields) == 0 { + // For topics without schema, create a minimal schema with system fields and _value + recordType = schema.RecordTypeBegin(). + WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64). + WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). + WithField(SW_COLUMN_NAME_VALUE, schema.TypeBytes). // Raw message value + RecordTypeEnd() + } else { + // Create a copy of the recordType to avoid modifying the original + recordTypeCopy := &schema_pb.RecordType{ + Fields: make([]*schema_pb.Field, len(recordType.Fields)), + } + copy(recordTypeCopy.Fields, recordType.Fields) - // Add system columns that MQ adds to all records - recordType = schema.NewRecordTypeBuilder(recordTypeCopy). - WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64). - WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). - RecordTypeEnd() + // Add system columns that MQ adds to all records + recordType = schema.NewRecordTypeBuilder(recordTypeCopy). + WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64). + WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). 
+ RecordTypeEnd() + } // Convert to Parquet levels for efficient reading parquetLevels, err := schema.ToParquetLevels(recordType) @@ -91,6 +99,7 @@ func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient Brok brokerClient: brokerClient, topic: t, recordSchema: recordType, + schemaFormat: schemaFormat, parquetLevels: parquetLevels, engine: engine, }, nil @@ -335,9 +344,6 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs) if err != nil { // Log error but don't fail the query - continue with disk data only - if isDebugMode(ctx) { - fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err) - } // Reset queried flag on error stats.BrokerBufferQueried = false return results, stats, nil @@ -346,18 +352,19 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, // Capture stats for EXPLAIN stats.BrokerBufferMessages = len(unflushedEntries) - // Debug logging for EXPLAIN mode - if isDebugMode(ctx) { - fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries)) - if len(unflushedEntries) > 0 { - fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n") - } - } - // Step 2: Process unflushed entries (already deduplicated by broker) for _, logEntry := range unflushedEntries { + // Pre-decode DataMessage for reuse in both control check and conversion + var dataMessage *mq_pb.DataMessage + if len(logEntry.Data) > 0 { + dataMessage = &mq_pb.DataMessage{} + if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil { + dataMessage = nil // Failed to decode, treat as raw data + } + } + // Skip control entries without actual data - if hms.isControlEntry(logEntry) { + if hms.isControlEntryWithDecoded(logEntry, dataMessage) { continue // Skip this entry } @@ -370,11 +377,8 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, } // Convert LogEntry to RecordValue format (same as disk data) - recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry) + recordValue, _, err := hms.convertLogEntryToRecordValueWithDecoded(logEntry, dataMessage) if err != nil { - if isDebugMode(ctx) { - fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err) - } continue // Skip malformed messages } @@ -429,10 +433,6 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, } } - if isDebugMode(ctx) { - fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results)) - } - return results, stats, nil } @@ -543,12 +543,8 @@ func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Contex if err != nil { // Don't fail the query if broker scanning fails, but provide clear warning to user // This ensures users are aware that results may not include the most recent data - if isDebugMode(ctx) { - fmt.Printf("Debug: Failed to scan unflushed data from broker: %v\n", err) - } else { - fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err) - fmt.Printf("Note: Query results may not include the most recent unflushed messages\n") - } + fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err) + fmt.Printf("Note: Query results may not include the most recent unflushed messages\n") } else if unflushedStats != nil { stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried 
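The minimal record type built above for topics without a schema pairs with the schema-less branch of convertLogEntryToRecordValue shown further down: a raw entry surfaces with only the system columns plus _value. A small sketch of the shape such an entry takes (the helper name and literal values are illustrative):

// exampleSchemalessRecord mirrors the schema-less conversion path below.
func exampleSchemalessRecord() *schema_pb.RecordValue {
	logEntry := &filer_pb.LogEntry{
		TsNs: 1756947416566456262,        // becomes _ts_ns
		Key:  []byte("k1"),               // becomes _key
		Data: []byte(`{"any":"payload"}`), // stored verbatim in _value
	}
	return &schema_pb.RecordValue{Fields: map[string]*schema_pb.Value{
		SW_COLUMN_NAME_TIMESTAMP: {Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}},
		SW_COLUMN_NAME_KEY:       {Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}},
		SW_COLUMN_NAME_VALUE:     {Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data}},
	}}
}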
stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages @@ -652,35 +648,114 @@ func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (i // Based on MQ system analysis, control entries are: // 1. DataMessages with populated Ctrl field (publisher close signals) // 2. Entries with empty keys (as filtered by subscriber) -// 3. Entries with no data +// NOTE: Messages with empty data but valid keys (like NOOP messages) are NOT control entries func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool { - // Skip entries with no data - if len(logEntry.Data) == 0 { - return true + // Pre-decode DataMessage if needed + var dataMessage *mq_pb.DataMessage + if len(logEntry.Data) > 0 { + dataMessage = &mq_pb.DataMessage{} + if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil { + dataMessage = nil // Failed to decode, treat as raw data + } } + return hms.isControlEntryWithDecoded(logEntry, dataMessage) +} +// isControlEntryWithDecoded checks if a log entry is a control entry using pre-decoded DataMessage +// This avoids duplicate protobuf unmarshaling when the DataMessage is already decoded +func (hms *HybridMessageScanner) isControlEntryWithDecoded(logEntry *filer_pb.LogEntry, dataMessage *mq_pb.DataMessage) bool { // Skip entries with empty keys (same logic as subscriber) if len(logEntry.Key) == 0 { return true } // Check if this is a DataMessage with control field populated - dataMessage := &mq_pb.DataMessage{} - if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil { - // If it has a control field, it's a control message - if dataMessage.Ctrl != nil { - return true - } + if dataMessage != nil && dataMessage.Ctrl != nil { + return true } + // Messages with valid keys (even if data is empty) are legitimate messages + // Examples: NOOP messages from Schema Registry return false } +// isNullOrEmpty checks if a schema_pb.Value is null or empty +func isNullOrEmpty(value *schema_pb.Value) bool { + if value == nil { + return true + } + + switch v := value.Kind.(type) { + case *schema_pb.Value_StringValue: + return v.StringValue == "" + case *schema_pb.Value_BytesValue: + return len(v.BytesValue) == 0 + case *schema_pb.Value_ListValue: + return v.ListValue == nil || len(v.ListValue.Values) == 0 + case nil: + return true // No kind set means null + default: + return false + } +} + +// isSchemaless checks if the scanner is configured for a schema-less topic +// Schema-less topics only have system fields: _ts_ns, _key, and _value +func (hms *HybridMessageScanner) isSchemaless() bool { + // Schema-less topics only have system fields: _ts_ns, _key, and _value + // System topics like _schemas are NOT schema-less - they have structured data + // We just need to map their fields during read + + if hms.recordSchema == nil { + return false + } + + // Count only non-system data fields (exclude _ts_ns and _key which are always present) + // Schema-less topics should only have _value as the data field + hasValue := false + dataFieldCount := 0 + + for _, field := range hms.recordSchema.Fields { + switch field.Name { + case SW_COLUMN_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY: + // System fields - ignore + continue + case SW_COLUMN_NAME_VALUE: + hasValue = true + dataFieldCount++ + default: + // Any other field means it's not schema-less + dataFieldCount++ + } + } + + // Schema-less = only has _value field as the data field (plus system fields) + return hasValue && dataFieldCount == 1 +} + // convertLogEntryToRecordValue converts a filer_pb.LogEntry 
to schema_pb.RecordValue // This handles both: // 1. Live log entries (raw message format) // 2. Parquet entries (already in schema_pb.RecordValue format) +// 3. Schema-less topics (raw bytes in _value field) func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) { + // For schema-less topics, put raw data directly into _value field + if hms.isSchemaless() { + recordValue := &schema_pb.RecordValue{ + Fields: make(map[string]*schema_pb.Value), + } + recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, + } + recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}, + } + recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data}, + } + return recordValue, "live_log", nil + } + // Try to unmarshal as RecordValue first (Parquet format) recordValue := &schema_pb.RecordValue{} if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil { @@ -705,6 +780,14 @@ func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb return hms.parseRawMessageWithSchema(logEntry) } +// min returns the minimum of two integers +func min(a, b int) int { + if a < b { + return a + } + return b +} + // parseRawMessageWithSchema parses raw live message data using the topic's schema // This provides proper type conversion and field mapping instead of treating everything as strings func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) { @@ -722,51 +805,136 @@ func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.Lo // Parse message data based on schema if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 { - // Fallback: No schema available, treat as single "data" field - recordValue.Fields["data"] = &schema_pb.Value{ - Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)}, + // Fallback: No schema available, use "_value" for schema-less topics only + if hms.isSchemaless() { + recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data}, + } } return recordValue, "live_log", nil } - // Attempt schema-aware parsing - // Strategy 1: Try JSON parsing first (most common for live messages) - if parsedRecord, err := hms.parseJSONMessage(logEntry.Data); err == nil { - // Successfully parsed as JSON, merge with system columns - for fieldName, fieldValue := range parsedRecord.Fields { - recordValue.Fields[fieldName] = fieldValue + // Use schema format to directly choose the right decoder + // This avoids trying multiple decoders and improves performance + var parsedRecord *schema_pb.RecordValue + var err error + + switch hms.schemaFormat { + case "AVRO": + // AVRO format - use Avro decoder + // Note: Avro decoding requires schema registry integration + // For now, fall through to JSON as many Avro messages are also valid JSON + parsedRecord, err = hms.parseJSONMessage(logEntry.Data) + case "PROTOBUF": + // PROTOBUF format - use protobuf decoder + parsedRecord, err = hms.parseProtobufMessage(logEntry.Data) + case "JSON_SCHEMA", "": + // JSON_SCHEMA format or empty (default to JSON) + // JSON is the most common format for schema registry + parsedRecord, err = hms.parseJSONMessage(logEntry.Data) + if err != nil { + // Try protobuf as fallback + 
parsedRecord, err = hms.parseProtobufMessage(logEntry.Data) + } + default: + // Unknown format - try JSON first, then protobuf as fallback + parsedRecord, err = hms.parseJSONMessage(logEntry.Data) + if err != nil { + parsedRecord, err = hms.parseProtobufMessage(logEntry.Data) } - return recordValue, "live_log", nil } - // Strategy 2: Try protobuf parsing (binary messages) - if parsedRecord, err := hms.parseProtobufMessage(logEntry.Data); err == nil { - // Successfully parsed as protobuf, merge with system columns + if err == nil && parsedRecord != nil { + // Successfully parsed, merge with system columns for fieldName, fieldValue := range parsedRecord.Fields { recordValue.Fields[fieldName] = fieldValue } return recordValue, "live_log", nil } - // Strategy 3: Fallback to single field with raw data - // If schema has a single field, map the raw data to it with type conversion + // Fallback: If schema has a single field, map the raw data to it with type conversion if len(hms.recordSchema.Fields) == 1 { field := hms.recordSchema.Fields[0] - convertedValue, err := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type) - if err == nil { + convertedValue, convErr := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type) + if convErr == nil { recordValue.Fields[field.Name] = convertedValue return recordValue, "live_log", nil } } - // Final fallback: treat as string data field - recordValue.Fields["data"] = &schema_pb.Value{ - Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)}, + // Final fallback: treat as bytes field for schema-less topics only + if hms.isSchemaless() { + recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data}, + } } return recordValue, "live_log", nil } +// convertLogEntryToRecordValueWithDecoded converts a filer_pb.LogEntry to schema_pb.RecordValue +// using a pre-decoded DataMessage to avoid duplicate protobuf unmarshaling +func (hms *HybridMessageScanner) convertLogEntryToRecordValueWithDecoded(logEntry *filer_pb.LogEntry, dataMessage *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) { + // IMPORTANT: Check for schema-less topics FIRST + // Schema-less topics (like _schemas) should store raw data directly in _value field + if hms.isSchemaless() { + recordValue := &schema_pb.RecordValue{ + Fields: make(map[string]*schema_pb.Value), + } + recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, + } + recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}, + } + recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data}, + } + return recordValue, "live_log", nil + } + + // CRITICAL: The broker stores DataMessage.Value directly in LogEntry.Data + // So we need to try unmarshaling LogEntry.Data as RecordValue first + var recordValueBytes []byte + + if dataMessage != nil && len(dataMessage.Value) > 0 { + // DataMessage has a Value field - use it + recordValueBytes = dataMessage.Value + } else { + // DataMessage doesn't have Value, use LogEntry.Data directly + // This is the normal case when broker stores messages + recordValueBytes = logEntry.Data + } + + // Try to unmarshal as RecordValue + if len(recordValueBytes) > 0 { + recordValue := &schema_pb.RecordValue{} + if err := proto.Unmarshal(recordValueBytes, recordValue); err == nil { + // Successfully unmarshaled as RecordValue 
+ + // Ensure Fields map exists + if recordValue.Fields == nil { + recordValue.Fields = make(map[string]*schema_pb.Value) + } + + // Add system columns from LogEntry + recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, + } + recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}, + } + + return recordValue, "live_log", nil + } + // If unmarshaling as RecordValue fails, fall back to schema-aware parsing + } + + // For cases where protobuf unmarshaling failed or data is empty, + // attempt schema-aware parsing to try JSON, protobuf, and other formats + return hms.parseRawMessageWithSchema(logEntry) +} + // parseJSONMessage attempts to parse raw data as JSON and map to schema fields func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) { // Try to parse as JSON @@ -950,6 +1118,11 @@ func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, for columnName := range columnSet { columns = append(columns, columnName) } + + // If no data columns were found, include system columns so we have something to display + if len(columns) == 0 { + columns = []string{SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY} + } } // Convert to SQL rows @@ -1037,6 +1210,11 @@ func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []Hy columns = append(columns, col) } + // If no data columns were found and no explicit columns specified, include system columns + if len(columns) == 0 { + columns = []string{SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY} + } + // Convert to SQL rows rows := make([][]sqltypes.Value, len(results)) for i, result := range results { @@ -1123,10 +1301,10 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo } // Populate optional min/max from filer extended attributes (writer stores ns timestamps) if entry != nil && entry.Extended != nil { - if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 { + if minBytes, ok := entry.Extended[mq.ExtendedAttrTimestampMin]; ok && len(minBytes) == 8 { fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes)) } - if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 { + if maxBytes, ok := entry.Extended[mq.ExtendedAttrTimestampMax]; ok && len(maxBytes) == 8 { fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes)) } } @@ -1538,13 +1716,22 @@ func (s *StreamingFlushedDataSource) startStreaming() { // Message processing function eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { + // Pre-decode DataMessage for reuse in both control check and conversion + var dataMessage *mq_pb.DataMessage + if len(logEntry.Data) > 0 { + dataMessage = &mq_pb.DataMessage{} + if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil { + dataMessage = nil // Failed to decode, treat as raw data + } + } + // Skip control entries without actual data - if s.hms.isControlEntry(logEntry) { + if s.hms.isControlEntryWithDecoded(logEntry, dataMessage) { return false, nil // Skip this entry } // Convert log entry to schema_pb.RecordValue for consistent processing - recordValue, source, convertErr := s.hms.convertLogEntryToRecordValue(logEntry) + recordValue, source, convertErr := s.hms.convertLogEntryToRecordValueWithDecoded(logEntry, dataMessage) if convertErr != nil { return false, fmt.Errorf("failed to convert log entry: %v", convertErr) } diff --git 
a/weed/query/engine/mock_test.go b/weed/query/engine/mock_test.go index d00ec1761..697c98494 100644 --- a/weed/query/engine/mock_test.go +++ b/weed/query/engine/mock_test.go @@ -27,13 +27,16 @@ func TestMockBrokerClient_BasicFunctionality(t *testing.T) { } // Test GetTopicSchema - schema, err := mockBroker.GetTopicSchema(context.Background(), "default", "user_events") + schema, keyColumns, _, err := mockBroker.GetTopicSchema(context.Background(), "default", "user_events") if err != nil { t.Fatalf("Expected no error, got %v", err) } if len(schema.Fields) != 3 { t.Errorf("Expected 3 fields in user_events schema, got %d", len(schema.Fields)) } + if len(keyColumns) == 0 { + t.Error("Expected at least one key column") + } } func TestMockBrokerClient_FailureScenarios(t *testing.T) { @@ -53,7 +56,7 @@ func TestMockBrokerClient_FailureScenarios(t *testing.T) { t.Error("Expected error when mock is configured to fail") } - _, err = mockBroker.GetTopicSchema(context.Background(), "default", "user_events") + _, _, _, err = mockBroker.GetTopicSchema(context.Background(), "default", "user_events") if err == nil { t.Error("Expected error when mock is configured to fail") } @@ -81,7 +84,7 @@ func TestMockBrokerClient_TopicManagement(t *testing.T) { mockBroker := NewMockBrokerClient() // Test ConfigureTopic (add a new topic) - err := mockBroker.ConfigureTopic(context.Background(), "test", "new-topic", 1, nil) + err := mockBroker.ConfigureTopic(context.Background(), "test", "new-topic", 1, nil, []string{}) if err != nil { t.Fatalf("Expected no error, got %v", err) } diff --git a/weed/query/engine/mocks_test.go b/weed/query/engine/mocks_test.go index 733d99af7..2f72ed9ed 100644 --- a/weed/query/engine/mocks_test.go +++ b/weed/query/engine/mocks_test.go @@ -879,17 +879,51 @@ func (m *MockBrokerClient) ListTopics(ctx context.Context, namespace string) ([] return []string{}, nil } -// GetTopicSchema returns the mock schema for a topic -func (m *MockBrokerClient) GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error) { +// GetTopicSchema returns flat schema and key columns for a topic +func (m *MockBrokerClient) GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, []string, string, error) { if m.shouldFail { - return nil, fmt.Errorf("mock broker failure: %s", m.failMessage) + return nil, nil, "", fmt.Errorf("mock broker failure: %s", m.failMessage) } key := fmt.Sprintf("%s.%s", namespace, topic) if schema, exists := m.schemas[key]; exists { - return schema, nil + // For testing, assume first field is key column + var keyColumns []string + if len(schema.Fields) > 0 { + keyColumns = []string{schema.Fields[0].Name} + } + return schema, keyColumns, "", nil // Schema format empty for mocks + } + return nil, nil, "", fmt.Errorf("topic %s not found", key) +} + +// ConfigureTopic creates or modifies a topic using flat schema format +func (m *MockBrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, flatSchema *schema_pb.RecordType, keyColumns []string) error { + if m.shouldFail { + return fmt.Errorf("mock broker failure: %s", m.failMessage) + } + + // Store the schema for future retrieval + key := fmt.Sprintf("%s.%s", namespace, topicName) + m.schemas[key] = flatSchema + + // Add topic to namespace if it doesn't exist + if topics, exists := m.topics[namespace]; exists { + found := false + for _, t := range topics { + if t == topicName { + found = true + break + } + } + if !found { + m.topics[namespace] = 
append(topics, topicName) + } + } else { + m.topics[namespace] = []string{topicName} } - return nil, fmt.Errorf("topic %s not found", key) + + return nil } // GetFilerClient returns a mock filer client @@ -960,31 +994,6 @@ func (t *TestHybridMessageScanner) ScanMessages(ctx context.Context, options Hyb return generateSampleHybridData(t.topicName, options), nil } -// ConfigureTopic creates or updates a topic configuration (mock implementation) -func (m *MockBrokerClient) ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error { - if m.shouldFail { - return fmt.Errorf("mock broker failure: %s", m.failMessage) - } - - // Store the schema in our mock data - key := fmt.Sprintf("%s.%s", namespace, topicName) - m.schemas[key] = recordType - - // Add to topics list if not already present - if topics, exists := m.topics[namespace]; exists { - for _, topic := range topics { - if topic == topicName { - return nil // Already exists - } - } - m.topics[namespace] = append(topics, topicName) - } else { - m.topics[namespace] = []string{topicName} - } - - return nil -} - // DeleteTopic removes a topic and all its data (mock implementation) func (m *MockBrokerClient) DeleteTopic(ctx context.Context, namespace, topicName string) error { if m.shouldFail { diff --git a/weed/query/engine/parquet_scanner.go b/weed/query/engine/parquet_scanner.go index 113cd814a..e4b5252c7 100644 --- a/weed/query/engine/parquet_scanner.go +++ b/weed/query/engine/parquet_scanner.go @@ -21,7 +21,7 @@ import ( // Assumptions: // 1. All MQ messages are stored in Parquet format in topic partitions // 2. Each partition directory contains dated Parquet files -// 3. System columns (_timestamp_ns, _key) are added to user schema +// 3. System columns (_ts_ns, _key) are added to user schema // 4. Predicate pushdown is used for efficient scanning type ParquetScanner struct { filerClient filer_pb.FilerClient @@ -55,17 +55,28 @@ func NewParquetScanner(filerClient filer_pb.FilerClient, namespace, topicName st return nil, fmt.Errorf("failed to read topic config: %v", err) } - // Build complete schema with system columns - recordType := topicConf.GetRecordType() - if recordType == nil { - return nil, NoSchemaError{Namespace: namespace, Topic: topicName} + // Build complete schema with system columns - prefer flat schema if available + var recordType *schema_pb.RecordType + + if topicConf.GetMessageRecordType() != nil { + // New flat schema format - use directly + recordType = topicConf.GetMessageRecordType() } - // Add system columns that MQ adds to all records - recordType = schema.NewRecordTypeBuilder(recordType). - WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64). - WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). - RecordTypeEnd() + if recordType == nil || len(recordType.Fields) == 0 { + // For topics without schema, create a minimal schema with system fields and _value + recordType = schema.RecordTypeBegin(). + WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64). + WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). + WithField(SW_COLUMN_NAME_VALUE, schema.TypeBytes). // Raw message value + RecordTypeEnd() + } else { + // Add system columns that MQ adds to all records + recordType = schema.NewRecordTypeBuilder(recordType). + WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64). + WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). 
+ RecordTypeEnd() + } // Convert to Parquet levels for efficient reading parquetLevels, err := schema.ToParquetLevels(recordType) diff --git a/weed/query/engine/parsing_debug_test.go b/weed/query/engine/parsing_debug_test.go index 3fa9be17b..6177b0aa6 100644 --- a/weed/query/engine/parsing_debug_test.go +++ b/weed/query/engine/parsing_debug_test.go @@ -36,7 +36,7 @@ func TestBasicParsing(t *testing.T) { if selectStmt.Where != nil { t.Logf(" WHERE expression type: %T", selectStmt.Where.Expr) } else { - t.Logf(" ❌ WHERE clause is NIL - this is the bug!") + t.Logf(" WHERE clause is NIL - this is the bug!") } } else { t.Errorf("Expected SelectStatement, got %T", stmt) @@ -62,10 +62,10 @@ func TestCockroachParserDirectly(t *testing.T) { if selectStmt, ok := stmt.(*SelectStatement); ok { if selectStmt.Where == nil { - t.Errorf("❌ Our ParseSQL is not extracting WHERE clauses!") + t.Errorf("Our ParseSQL is not extracting WHERE clauses!") t.Errorf("This means the issue is in our CockroachDB AST conversion") } else { - t.Logf("✅ Our ParseSQL extracted WHERE clause: %T", selectStmt.Where.Expr) + t.Logf("Our ParseSQL extracted WHERE clause: %T", selectStmt.Where.Expr) } } } diff --git a/weed/query/engine/postgresql_only_test.go b/weed/query/engine/postgresql_only_test.go index d98cab9f0..d40e81b11 100644 --- a/weed/query/engine/postgresql_only_test.go +++ b/weed/query/engine/postgresql_only_test.go @@ -67,7 +67,7 @@ func TestPostgreSQLOnlySupport(t *testing.T) { if tc.shouldError { // We expect this query to fail if err == nil && result.Error == nil { - t.Errorf("❌ Expected error for %s, but query succeeded", tc.desc) + t.Errorf("Expected error for %s, but query succeeded", tc.desc) return } @@ -81,7 +81,7 @@ func TestPostgreSQLOnlySupport(t *testing.T) { } if !strings.Contains(errorText, tc.errorMsg) { - t.Errorf("❌ Expected error containing '%s', got: %s", tc.errorMsg, errorText) + t.Errorf("Expected error containing '%s', got: %s", tc.errorMsg, errorText) return } } diff --git a/weed/query/engine/sql_alias_support_test.go b/weed/query/engine/sql_alias_support_test.go index a081d7183..dbe91f821 100644 --- a/weed/query/engine/sql_alias_support_test.go +++ b/weed/query/engine/sql_alias_support_test.go @@ -17,7 +17,7 @@ func TestSQLAliasResolution(t *testing.T) { // Create SELECT expressions with aliases selectExprs := []SelectExpr{ &AliasedExpr{ - Expr: &ColName{Name: stringValue("_timestamp_ns")}, + Expr: &ColName{Name: stringValue("_ts_ns")}, As: aliasValue("ts"), }, &AliasedExpr{ @@ -28,7 +28,7 @@ func TestSQLAliasResolution(t *testing.T) { // Test alias resolution resolved := engine.resolveColumnAlias("ts", selectExprs) - assert.Equal(t, "_timestamp_ns", resolved, "Should resolve 'ts' alias to '_timestamp_ns'") + assert.Equal(t, "_ts_ns", resolved, "Should resolve 'ts' alias to '_ts_ns'") resolved = engine.resolveColumnAlias("record_id", selectExprs) assert.Equal(t, "id", resolved, "Should resolve 'record_id' alias to 'id'") @@ -42,13 +42,13 @@ func TestSQLAliasResolution(t *testing.T) { // Test using a single alias in WHERE clause testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}}, }, } // Parse SQL with alias in WHERE - sql := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = 
1756947416566456262" + sql := "SELECT _ts_ns AS ts, id FROM test WHERE ts = 1756947416566456262" stmt, err := ParseSQL(sql) assert.NoError(t, err, "Should parse SQL with alias in WHERE") @@ -60,10 +60,10 @@ func TestSQLAliasResolution(t *testing.T) { // Test the predicate result := predicate(testRecord) - assert.True(t, result, "Predicate should match using alias 'ts' for '_timestamp_ns'") + assert.True(t, result, "Predicate should match using alias 'ts' for '_ts_ns'") // Test with non-matching value - sql2 := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = 999999" + sql2 := "SELECT _ts_ns AS ts, id FROM test WHERE ts = 999999" stmt2, err := ParseSQL(sql2) assert.NoError(t, err) selectStmt2 := stmt2.(*SelectStatement) @@ -79,13 +79,13 @@ func TestSQLAliasResolution(t *testing.T) { // Test using multiple aliases in WHERE clause testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}}, }, } // Parse SQL with multiple aliases in WHERE - sql := "SELECT _timestamp_ns AS ts, id AS record_id FROM test WHERE ts = 1756947416566456262 AND record_id = 82460" + sql := "SELECT _ts_ns AS ts, id AS record_id FROM test WHERE ts = 1756947416566456262 AND record_id = 82460" stmt, err := ParseSQL(sql) assert.NoError(t, err, "Should parse SQL with multiple aliases") @@ -102,8 +102,8 @@ func TestSQLAliasResolution(t *testing.T) { // Test with one condition not matching testRecord2 := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 99999}}, // Different ID + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 99999}}, // Different ID }, } @@ -116,23 +116,23 @@ func TestSQLAliasResolution(t *testing.T) { testRecords := []*schema_pb.RecordValue{ { Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456260}}, // Below range + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456260}}, // Below range }, }, { Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, // In range + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, // In range }, }, { Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456265}}, // Above range + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456265}}, // Above range }, }, } // Test range query with alias - sql := "SELECT _timestamp_ns AS ts FROM test WHERE ts > 1756947416566456261 AND ts < 1756947416566456264" + sql := "SELECT _ts_ns AS ts FROM test WHERE ts > 1756947416566456261 AND ts < 1756947416566456264" stmt, err := ParseSQL(sql) assert.NoError(t, err, "Should parse range query with alias") @@ -150,14 +150,14 @@ func TestSQLAliasResolution(t *testing.T) { // Test mixing aliased and non-aliased columns in WHERE testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 
1756947416566456262}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}}, - "status": {Kind: &schema_pb.Value_StringValue{StringValue: "active"}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}}, + "status": {Kind: &schema_pb.Value_StringValue{StringValue: "active"}}, }, } // Use alias for one column, direct name for another - sql := "SELECT _timestamp_ns AS ts, id, status FROM test WHERE ts = 1756947416566456262 AND status = 'active'" + sql := "SELECT _ts_ns AS ts, id, status FROM test WHERE ts = 1756947416566456262 AND status = 'active'" stmt, err := ParseSQL(sql) assert.NoError(t, err, "Should parse mixed alias/direct query") @@ -175,13 +175,13 @@ func TestSQLAliasResolution(t *testing.T) { testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, }, } // Test that large timestamp precision is maintained with aliases - sql := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = 1756947416566456262" + sql := "SELECT _ts_ns AS ts, id FROM test WHERE ts = 1756947416566456262" stmt, err := ParseSQL(sql) assert.NoError(t, err) @@ -193,7 +193,7 @@ func TestSQLAliasResolution(t *testing.T) { assert.True(t, result, "Large timestamp precision should be maintained with aliases") // Test precision with off-by-one (should not match) - sql2 := "SELECT _timestamp_ns AS ts, id FROM test WHERE ts = 1756947416566456263" // +1 + sql2 := "SELECT _ts_ns AS ts, id FROM test WHERE ts = 1756947416566456263" // +1 stmt2, err := ParseSQL(sql2) assert.NoError(t, err) selectStmt2 := stmt2.(*SelectStatement) @@ -229,7 +229,7 @@ func TestSQLAliasResolution(t *testing.T) { // Test all comparison operators work with aliases testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1000}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1000}}, }, } @@ -252,7 +252,7 @@ func TestSQLAliasResolution(t *testing.T) { for _, test := range operators { t.Run(test.op+"_"+test.value, func(t *testing.T) { - sql := "SELECT _timestamp_ns AS ts FROM test WHERE ts " + test.op + " " + test.value + sql := "SELECT _ts_ns AS ts FROM test WHERE ts " + test.op + " " + test.value stmt, err := ParseSQL(sql) assert.NoError(t, err, "Should parse operator: %s", test.op) @@ -270,13 +270,13 @@ func TestSQLAliasResolution(t *testing.T) { // Ensure non-alias queries still work exactly as before testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}}, }, } // Test traditional query (no aliases) - sql := "SELECT _timestamp_ns, id FROM test WHERE _timestamp_ns = 1756947416566456262" + sql := "SELECT _ts_ns, id FROM test WHERE _ts_ns = 1756947416566456262" stmt, err := ParseSQL(sql) assert.NoError(t, err) @@ -307,13 +307,13 @@ func TestAliasIntegrationWithProductionScenarios(t *testing.T) { // Test the exact query pattern that was 
originally failing testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756913789829292386}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756913789829292386}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 82460}}, }, } // This was the original failing pattern - sql := "SELECT id, _timestamp_ns AS ts FROM ecommerce.user_events WHERE ts = 1756913789829292386" + sql := "SELECT id, _ts_ns AS ts FROM ecommerce.user_events WHERE ts = 1756913789829292386" stmt, err := ParseSQL(sql) assert.NoError(t, err, "Should parse the originally failing query pattern") @@ -329,16 +329,16 @@ func TestAliasIntegrationWithProductionScenarios(t *testing.T) { // Test a more complex production-like query testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, - "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user123"}}, - "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "click"}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, + "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user123"}}, + "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "click"}}, }, } sql := `SELECT id AS event_id, - _timestamp_ns AS event_time, + _ts_ns AS event_time, user_id AS uid, event_type AS action FROM ecommerce.user_events @@ -359,10 +359,10 @@ func TestAliasIntegrationWithProductionScenarios(t *testing.T) { // Test partial match failure testRecord2 := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, - "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user999"}}, // Different user - "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "click"}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, + "user_id": {Kind: &schema_pb.Value_StringValue{StringValue: "user999"}}, // Different user + "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "click"}}, }, } @@ -374,13 +374,13 @@ func TestAliasIntegrationWithProductionScenarios(t *testing.T) { // Ensure alias resolution doesn't significantly impact performance testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, }, } // Build predicates for comparison - sqlWithAlias := "SELECT _timestamp_ns AS ts FROM test WHERE ts = 1756947416566456262" - sqlWithoutAlias := "SELECT _timestamp_ns FROM test WHERE _timestamp_ns = 1756947416566456262" + sqlWithAlias := "SELECT _ts_ns AS ts FROM test WHERE ts = 1756947416566456262" + sqlWithoutAlias := "SELECT _ts_ns FROM test WHERE _ts_ns = 1756947416566456262" stmtWithAlias, err := ParseSQL(sqlWithAlias) assert.NoError(t, err) diff --git a/weed/query/engine/sql_feature_diagnostic_test.go b/weed/query/engine/sql_feature_diagnostic_test.go index bbe775615..f578539fc 100644 --- 
a/weed/query/engine/sql_feature_diagnostic_test.go +++ b/weed/query/engine/sql_feature_diagnostic_test.go @@ -109,12 +109,12 @@ func TestSQLFeatureDiagnostic(t *testing.T) { // Summary t.Log("\n" + strings.Repeat("=", 80)) t.Log("FEATURE SUMMARY:") - t.Log(" ✅ LIMIT: FULLY WORKING - Correctly limits result rows") - t.Log(" ✅ OFFSET: FULLY WORKING - Correctly skips rows") - t.Log(" ✅ WHERE: FULLY WORKING - All comparison operators working") - t.Log(" ✅ SELECT: WORKING - Supports *, columns, functions, arithmetic") - t.Log(" ✅ Functions: WORKING - String and datetime functions work") - t.Log(" ✅ Arithmetic: WORKING - +, -, *, / operations work") + t.Log(" LIMIT: FULLY WORKING - Correctly limits result rows") + t.Log(" OFFSET: FULLY WORKING - Correctly skips rows") + t.Log(" WHERE: FULLY WORKING - All comparison operators working") + t.Log(" SELECT: WORKING - Supports *, columns, functions, arithmetic") + t.Log(" Functions: WORKING - String and datetime functions work") + t.Log(" Arithmetic: WORKING - +, -, *, / operations work") t.Log(strings.Repeat("=", 80)) } @@ -144,12 +144,12 @@ func TestSQLWhereClauseIssue(t *testing.T) { t.Logf("WHERE id = %s returned %d rows", firstId, actualCount) if actualCount == allCount { - t.Log("❌ CONFIRMED: WHERE clause is completely ignored") + t.Log("CONFIRMED: WHERE clause is completely ignored") t.Log(" - Query parsed successfully") t.Log(" - No errors returned") t.Log(" - But filtering logic not implemented in execution") } else if actualCount == 1 { - t.Log("✅ WHERE clause working correctly") + t.Log("WHERE clause working correctly") } else { t.Logf("❓ Unexpected result: got %d rows instead of 1 or %d", actualCount, allCount) } @@ -162,8 +162,8 @@ func TestSQLWhereClauseIssue(t *testing.T) { t.Logf("WHERE 1 = 0 returned %d rows", impossibleCount) if impossibleCount == allCount { - t.Log("❌ CONFIRMED: Even impossible WHERE conditions are ignored") + t.Log("CONFIRMED: Even impossible WHERE conditions are ignored") } else if impossibleCount == 0 { - t.Log("✅ Impossible WHERE condition correctly returns no rows") + t.Log("Impossible WHERE condition correctly returns no rows") } } diff --git a/weed/query/engine/string_concatenation_test.go b/weed/query/engine/string_concatenation_test.go index c4843bef6..a2f869c10 100644 --- a/weed/query/engine/string_concatenation_test.go +++ b/weed/query/engine/string_concatenation_test.go @@ -177,7 +177,7 @@ func TestSQLEngine_StringConcatenationBugReproduction(t *testing.T) { } } - t.Logf("✅ SUCCESS: Complex string concatenation works correctly!") + t.Logf("SUCCESS: Complex string concatenation works correctly!") t.Logf("Query: %s", query) for i, row := range result.Rows { diff --git a/weed/query/engine/string_literal_function_test.go b/weed/query/engine/string_literal_function_test.go index 828d8c9ed..787c86c08 100644 --- a/weed/query/engine/string_literal_function_test.go +++ b/weed/query/engine/string_literal_function_test.go @@ -183,7 +183,7 @@ func TestSQLEngine_StringFunctionErrorHandling(t *testing.T) { t.Fatalf("UPPER function should work, got query error: %v", result.Error) } - t.Logf("✅ UPPER function works correctly") + t.Logf("UPPER function works correctly") // This should now work (previously would error as "unsupported aggregation function") result2, err2 := engine.ExecuteSQL(context.Background(), "SELECT LENGTH(action) FROM user_events LIMIT 1") @@ -194,5 +194,5 @@ func TestSQLEngine_StringFunctionErrorHandling(t *testing.T) { t.Fatalf("LENGTH function should work, got query error: %v", result2.Error) } - 
t.Logf("✅ LENGTH function works correctly") + t.Logf("LENGTH function works correctly") } diff --git a/weed/query/engine/system_columns.go b/weed/query/engine/system_columns.go index 12757d4eb..a982416ed 100644 --- a/weed/query/engine/system_columns.go +++ b/weed/query/engine/system_columns.go @@ -9,18 +9,19 @@ import ( // System column constants used throughout the SQL engine const ( - SW_COLUMN_NAME_TIMESTAMP = "_timestamp_ns" // Message timestamp in nanoseconds (internal) - SW_COLUMN_NAME_KEY = "_key" // Message key - SW_COLUMN_NAME_SOURCE = "_source" // Data source (live_log, parquet_archive, etc.) + SW_COLUMN_NAME_TIMESTAMP = "_ts_ns" // Message timestamp in nanoseconds (internal) + SW_COLUMN_NAME_KEY = "_key" // Message key + SW_COLUMN_NAME_SOURCE = "_source" // Data source (live_log, parquet_archive, etc.) + SW_COLUMN_NAME_VALUE = "_value" // Raw message value (for schema-less topics) ) // System column display names (what users see) const ( SW_DISPLAY_NAME_TIMESTAMP = "_ts" // User-facing timestamp column name - // Note: _key and _source keep the same names, only _timestamp_ns changes to _ts + // Note: _key and _source keep the same names, only _ts_ns changes to _ts ) -// isSystemColumn checks if a column is a system column (_timestamp_ns, _key, _source) +// isSystemColumn checks if a column is a system column (_ts_ns, _key, _source) func (e *SQLEngine) isSystemColumn(columnName string) bool { lowerName := strings.ToLower(columnName) return lowerName == SW_COLUMN_NAME_TIMESTAMP || @@ -91,7 +92,7 @@ func (e *SQLEngine) getSystemColumnGlobalMin(columnName string, allFileStats map switch lowerName { case SW_COLUMN_NAME_TIMESTAMP: // For timestamps, find the earliest timestamp across all files - // This should match what's in the Extended["min"] metadata + // This should match what's in the Extended[mq.ExtendedAttrTimestampMin] metadata var minTimestamp *int64 for _, fileStats := range allFileStats { for _, fileStat := range fileStats { @@ -128,7 +129,7 @@ func (e *SQLEngine) getSystemColumnGlobalMax(columnName string, allFileStats map switch lowerName { case SW_COLUMN_NAME_TIMESTAMP: // For timestamps, find the latest timestamp across all files - // This should match what's in the Extended["max"] metadata + // This should match what's in the Extended[mq.ExtendedAttrTimestampMax] metadata var maxTimestamp *int64 for _, fileStats := range allFileStats { for _, fileStat := range fileStats { diff --git a/weed/query/engine/timestamp_integration_test.go b/weed/query/engine/timestamp_integration_test.go index 2f53e6d6e..cb156103c 100644 --- a/weed/query/engine/timestamp_integration_test.go +++ b/weed/query/engine/timestamp_integration_test.go @@ -29,13 +29,13 @@ func TestTimestampIntegrationScenarios(t *testing.T) { // Create a test record record := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.timestamp}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.id}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.timestamp}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.id}}, }, } // Build SQL query - sql := "SELECT id, _timestamp_ns FROM test WHERE _timestamp_ns = " + strconv.FormatInt(ts.timestamp, 10) + sql := "SELECT id, _ts_ns FROM test WHERE _ts_ns = " + strconv.FormatInt(ts.timestamp, 10) stmt, err := ParseSQL(sql) assert.NoError(t, err) @@ -57,8 +57,8 @@ func TestTimestampIntegrationScenarios(t *testing.T) { // Test that close but different timestamps don't match 
closeRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.timestamp + 1}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.id}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.timestamp + 1}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: ts.id}}, }, } result = predicate(closeRecord) @@ -76,17 +76,17 @@ func TestTimestampIntegrationScenarios(t *testing.T) { }{ { name: "RangeWithDifferentBounds", - sql: "SELECT * FROM test WHERE _timestamp_ns >= 1756913789829292386 AND _timestamp_ns <= 1756947416566456262", + sql: "SELECT * FROM test WHERE _ts_ns >= 1756913789829292386 AND _ts_ns <= 1756947416566456262", shouldSet: struct{ start, stop bool }{true, true}, }, { name: "RangeWithSameBounds", - sql: "SELECT * FROM test WHERE _timestamp_ns >= 1756913789829292386 AND _timestamp_ns <= 1756913789829292386", + sql: "SELECT * FROM test WHERE _ts_ns >= 1756913789829292386 AND _ts_ns <= 1756913789829292386", shouldSet: struct{ start, stop bool }{true, false}, // Fix #4: equal bounds should not set stop }, { name: "OpenEndedRange", - sql: "SELECT * FROM test WHERE _timestamp_ns >= 1756913789829292386", + sql: "SELECT * FROM test WHERE _ts_ns >= 1756913789829292386", shouldSet: struct{ start, stop bool }{true, false}, }, } @@ -117,8 +117,8 @@ func TestTimestampIntegrationScenarios(t *testing.T) { t.Run("ProductionScenarioReproduction", func(t *testing.T) { // This test reproduces the exact production scenario that was failing - // Original failing query: WHERE _timestamp_ns = 1756947416566456262 - sql := "SELECT id, _timestamp_ns FROM ecommerce.user_events WHERE _timestamp_ns = 1756947416566456262" + // Original failing query: WHERE _ts_ns = 1756947416566456262 + sql := "SELECT id, _ts_ns FROM ecommerce.user_events WHERE _ts_ns = 1756947416566456262" stmt, err := ParseSQL(sql) assert.NoError(t, err, "Should parse the production query that was failing") @@ -136,8 +136,8 @@ func TestTimestampIntegrationScenarios(t *testing.T) { // Test with the actual record that exists in production productionRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456262}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, }, } @@ -147,8 +147,8 @@ func TestTimestampIntegrationScenarios(t *testing.T) { // Verify precision - test that a timestamp differing by just 1 nanosecond doesn't match slightlyDifferentRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456263}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 1756947416566456263}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, }, } @@ -167,11 +167,11 @@ func TestRegressionPrevention(t *testing.T) { record := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: smallTimestamp}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: smallTimestamp}}, }, } - result := engine.valuesEqual(record.Fields["_timestamp_ns"], smallTimestamp) + result := engine.valuesEqual(record.Fields["_ts_ns"], smallTimestamp) assert.True(t, result, "Small 
timestamps should continue to work") }) diff --git a/weed/query/engine/timestamp_query_fixes_test.go b/weed/query/engine/timestamp_query_fixes_test.go index 633738a00..2f5f08cbd 100644 --- a/weed/query/engine/timestamp_query_fixes_test.go +++ b/weed/query/engine/timestamp_query_fixes_test.go @@ -21,31 +21,31 @@ func TestTimestampQueryFixes(t *testing.T) { // Test that large int64 timestamps don't lose precision in comparisons testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp1}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp1}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}}, }, } // Test equality comparison - result := engine.valuesEqual(testRecord.Fields["_timestamp_ns"], largeTimestamp1) + result := engine.valuesEqual(testRecord.Fields["_ts_ns"], largeTimestamp1) assert.True(t, result, "Large timestamp equality should work without precision loss") // Test inequality comparison - result = engine.valuesEqual(testRecord.Fields["_timestamp_ns"], largeTimestamp1+1) + result = engine.valuesEqual(testRecord.Fields["_ts_ns"], largeTimestamp1+1) assert.False(t, result, "Large timestamp inequality should be detected accurately") // Test less than comparison - result = engine.valueLessThan(testRecord.Fields["_timestamp_ns"], largeTimestamp1+1) + result = engine.valueLessThan(testRecord.Fields["_ts_ns"], largeTimestamp1+1) assert.True(t, result, "Large timestamp less-than should work without precision loss") // Test greater than comparison - result = engine.valueGreaterThan(testRecord.Fields["_timestamp_ns"], largeTimestamp1-1) + result = engine.valueGreaterThan(testRecord.Fields["_ts_ns"], largeTimestamp1-1) assert.True(t, result, "Large timestamp greater-than should work without precision loss") }) t.Run("Fix2_TimeFilterExtraction", func(t *testing.T) { // Test that equality queries don't set stopTimeNs (which causes premature termination) - equalitySQL := "SELECT * FROM test WHERE _timestamp_ns = " + strconv.FormatInt(largeTimestamp2, 10) + equalitySQL := "SELECT * FROM test WHERE _ts_ns = " + strconv.FormatInt(largeTimestamp2, 10) stmt, err := ParseSQL(equalitySQL) assert.NoError(t, err) @@ -58,8 +58,8 @@ func TestTimestampQueryFixes(t *testing.T) { t.Run("Fix3_RangeBoundaryFix", func(t *testing.T) { // Test that range queries with equal boundaries don't cause premature termination - rangeSQL := "SELECT * FROM test WHERE _timestamp_ns >= " + strconv.FormatInt(largeTimestamp3, 10) + - " AND _timestamp_ns <= " + strconv.FormatInt(largeTimestamp3, 10) + rangeSQL := "SELECT * FROM test WHERE _ts_ns >= " + strconv.FormatInt(largeTimestamp3, 10) + + " AND _ts_ns <= " + strconv.FormatInt(largeTimestamp3, 10) stmt, err := ParseSQL(rangeSQL) assert.NoError(t, err) @@ -73,8 +73,8 @@ func TestTimestampQueryFixes(t *testing.T) { t.Run("Fix4_DifferentRangeBoundaries", func(t *testing.T) { // Test that normal range queries still work correctly - rangeSQL := "SELECT * FROM test WHERE _timestamp_ns >= " + strconv.FormatInt(largeTimestamp1, 10) + - " AND _timestamp_ns <= " + strconv.FormatInt(largeTimestamp2, 10) + rangeSQL := "SELECT * FROM test WHERE _ts_ns >= " + strconv.FormatInt(largeTimestamp1, 10) + + " AND _ts_ns <= " + strconv.FormatInt(largeTimestamp2, 10) stmt, err := ParseSQL(rangeSQL) assert.NoError(t, err) @@ -87,7 +87,7 @@ func TestTimestampQueryFixes(t *testing.T) { 
t.Run("Fix5_PredicateAccuracy", func(t *testing.T) { // Test that predicates correctly evaluate large timestamp equality - equalitySQL := "SELECT * FROM test WHERE _timestamp_ns = " + strconv.FormatInt(largeTimestamp1, 10) + equalitySQL := "SELECT * FROM test WHERE _ts_ns = " + strconv.FormatInt(largeTimestamp1, 10) stmt, err := ParseSQL(equalitySQL) assert.NoError(t, err) @@ -98,8 +98,8 @@ func TestTimestampQueryFixes(t *testing.T) { // Test with matching record matchingRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp1}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp1}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 897795}}, }, } @@ -109,8 +109,8 @@ func TestTimestampQueryFixes(t *testing.T) { // Test with non-matching record nonMatchingRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp1 + 1}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp1 + 1}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 12345}}, }, } @@ -122,7 +122,7 @@ func TestTimestampQueryFixes(t *testing.T) { // Test all comparison operators work correctly with large timestamps testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp2}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: largeTimestamp2}}, }, } @@ -130,16 +130,16 @@ func TestTimestampQueryFixes(t *testing.T) { sql string expected bool }{ - {"_timestamp_ns = " + strconv.FormatInt(largeTimestamp2, 10), true}, - {"_timestamp_ns = " + strconv.FormatInt(largeTimestamp2+1, 10), false}, - {"_timestamp_ns > " + strconv.FormatInt(largeTimestamp2-1, 10), true}, - {"_timestamp_ns > " + strconv.FormatInt(largeTimestamp2, 10), false}, - {"_timestamp_ns >= " + strconv.FormatInt(largeTimestamp2, 10), true}, - {"_timestamp_ns >= " + strconv.FormatInt(largeTimestamp2+1, 10), false}, - {"_timestamp_ns < " + strconv.FormatInt(largeTimestamp2+1, 10), true}, - {"_timestamp_ns < " + strconv.FormatInt(largeTimestamp2, 10), false}, - {"_timestamp_ns <= " + strconv.FormatInt(largeTimestamp2, 10), true}, - {"_timestamp_ns <= " + strconv.FormatInt(largeTimestamp2-1, 10), false}, + {"_ts_ns = " + strconv.FormatInt(largeTimestamp2, 10), true}, + {"_ts_ns = " + strconv.FormatInt(largeTimestamp2+1, 10), false}, + {"_ts_ns > " + strconv.FormatInt(largeTimestamp2-1, 10), true}, + {"_ts_ns > " + strconv.FormatInt(largeTimestamp2, 10), false}, + {"_ts_ns >= " + strconv.FormatInt(largeTimestamp2, 10), true}, + {"_ts_ns >= " + strconv.FormatInt(largeTimestamp2+1, 10), false}, + {"_ts_ns < " + strconv.FormatInt(largeTimestamp2+1, 10), true}, + {"_ts_ns < " + strconv.FormatInt(largeTimestamp2, 10), false}, + {"_ts_ns <= " + strconv.FormatInt(largeTimestamp2, 10), true}, + {"_ts_ns <= " + strconv.FormatInt(largeTimestamp2-1, 10), false}, } for _, op := range operators { @@ -163,22 +163,22 @@ func TestTimestampQueryFixes(t *testing.T) { maxInt64 := int64(9223372036854775807) testRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: maxInt64}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: maxInt64}}, 
}, } // Test equality with maximum int64 - result := engine.valuesEqual(testRecord.Fields["_timestamp_ns"], maxInt64) + result := engine.valuesEqual(testRecord.Fields["_ts_ns"], maxInt64) assert.True(t, result, "Should handle maximum int64 value correctly") // Test with zero timestamp zeroRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 0}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: 0}}, }, } - result = engine.valuesEqual(zeroRecord.Fields["_timestamp_ns"], int64(0)) + result = engine.valuesEqual(zeroRecord.Fields["_ts_ns"], int64(0)) assert.True(t, result, "Should handle zero timestamp correctly") }) } @@ -195,19 +195,19 @@ func TestOriginalFailingQueries(t *testing.T) { }{ { name: "OriginalQuery1", - sql: "select id, _timestamp_ns from ecommerce.user_events where _timestamp_ns = 1756947416566456262", + sql: "select id, _ts_ns from ecommerce.user_events where _ts_ns = 1756947416566456262", timestamp: 1756947416566456262, id: 897795, }, { name: "OriginalQuery2", - sql: "select id, _timestamp_ns from ecommerce.user_events where _timestamp_ns = 1756947416566439304", + sql: "select id, _ts_ns from ecommerce.user_events where _ts_ns = 1756947416566439304", timestamp: 1756947416566439304, id: 715356, }, { name: "CurrentDataQuery", - sql: "select id, _timestamp_ns from ecommerce.user_events where _timestamp_ns = 1756913789829292386", + sql: "select id, _ts_ns from ecommerce.user_events where _ts_ns = 1756913789829292386", timestamp: 1756913789829292386, id: 82460, }, @@ -233,8 +233,8 @@ func TestOriginalFailingQueries(t *testing.T) { // Test with matching record matchingRecord := &schema_pb.RecordValue{ Fields: map[string]*schema_pb.Value{ - "_timestamp_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: query.timestamp}}, - "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: query.id}}, + "_ts_ns": {Kind: &schema_pb.Value_Int64Value{Int64Value: query.timestamp}}, + "id": {Kind: &schema_pb.Value_Int64Value{Int64Value: query.id}}, }, } diff --git a/weed/query/engine/where_clause_debug_test.go b/weed/query/engine/where_clause_debug_test.go index 0907524bb..382da4594 100644 --- a/weed/query/engine/where_clause_debug_test.go +++ b/weed/query/engine/where_clause_debug_test.go @@ -203,11 +203,11 @@ func TestWhereClauseEndToEnd(t *testing.T) { // CRITICAL TEST: This should detect the WHERE clause bug if impossibleCount == baselineCount { - t.Errorf("❌ WHERE CLAUSE BUG CONFIRMED:") + t.Errorf("WHERE CLAUSE BUG CONFIRMED:") t.Errorf(" Impossible condition returned same row count as no WHERE clause") t.Errorf(" This proves WHERE filtering is not being applied") } else if impossibleCount == 0 { - t.Logf("✅ Impossible WHERE condition correctly returns 0 rows") + t.Logf("Impossible WHERE condition correctly returns 0 rows") } // Test 3: Specific ID filtering @@ -222,11 +222,11 @@ func TestWhereClauseEndToEnd(t *testing.T) { t.Logf("WHERE id = %s: %d rows", firstId, specificCount) if specificCount == baselineCount { - t.Errorf("❌ WHERE clause bug: Specific ID filter returned all rows") + t.Errorf("WHERE clause bug: Specific ID filter returned all rows") } else if specificCount == 1 { - t.Logf("✅ Specific ID WHERE clause working correctly") + t.Logf("Specific ID WHERE clause working correctly") } else { - t.Logf("❓ Unexpected: Specific ID returned %d rows", specificCount) + t.Logf("Unexpected: Specific ID returned %d rows", specificCount) } } @@ -250,10 +250,10 @@ func TestWhereClauseEndToEnd(t *testing.T) 
{ } if nonMatchingCount > 0 { - t.Errorf("❌ WHERE clause bug: %d rows have id <= 10,000,000 but should be filtered out", nonMatchingCount) + t.Errorf("WHERE clause bug: %d rows have id <= 10,000,000 but should be filtered out", nonMatchingCount) t.Errorf(" Sample IDs that should be filtered: %v", getSampleIds(rangeResult, 3)) } else { - t.Logf("✅ WHERE id > 10000000 correctly filtered results") + t.Logf("WHERE id > 10000000 correctly filtered results") } } @@ -317,14 +317,14 @@ func TestSpecificWhereClauseBug(t *testing.T) { t.Logf("Row %d: id = %d", i+1, idVal) if idVal <= 10000000 { bugDetected = true - t.Errorf("❌ BUG: id %d should be filtered out (≤ 10,000,000)", idVal) + t.Errorf("BUG: id %d should be filtered out (<= 10,000,000)", idVal) } } } if !bugDetected { - t.Log("✅ WHERE clause working correctly - all IDs > 10,000,000") + t.Log("WHERE clause working correctly - all IDs > 10,000,000") } else { - t.Error("❌ WHERE clause bug confirmed: Returned IDs that should be filtered out") + t.Error("WHERE clause bug confirmed: Returned IDs that should be filtered out") } } diff --git a/weed/query/engine/where_validation_test.go b/weed/query/engine/where_validation_test.go index 4c2d8b903..4ba7d1c70 100644 --- a/weed/query/engine/where_validation_test.go +++ b/weed/query/engine/where_validation_test.go @@ -37,9 +37,9 @@ func TestWhereClauseValidation(t *testing.T) { t.Logf("WHERE id = %s: %d rows", firstId, len(specificResult.Rows)) if len(specificResult.Rows) == 1 { - t.Logf("✅ Specific ID filtering works correctly") + t.Logf("Specific ID filtering works correctly") } else { - t.Errorf("❌ Expected 1 row, got %d rows", len(specificResult.Rows)) + t.Errorf("Expected 1 row, got %d rows", len(specificResult.Rows)) } // Test 3: Range filtering (find actual data ranges) @@ -73,16 +73,16 @@ func TestWhereClauseValidation(t *testing.T) { for _, row := range rangeResult.Rows { if idVal, err := strconv.ParseInt(row[0].ToString(), 10, 64); err == nil { if idVal <= threshold { - t.Errorf("❌ Found ID %d which should be filtered out (≤ %d)", idVal, threshold) + t.Errorf("Found ID %d which should be filtered out (<= %d)", idVal, threshold) allCorrect = false } } } if allCorrect && len(rangeResult.Rows) > 0 { - t.Logf("✅ Range filtering works correctly - all returned IDs > %d", threshold) + t.Logf("Range filtering works correctly - all returned IDs > %d", threshold) } else if len(rangeResult.Rows) == 0 { - t.Logf("✅ Range filtering works correctly - no IDs > %d in data", threshold) + t.Logf("Range filtering works correctly - no IDs > %d in data", threshold) } // Test 4: String filtering @@ -98,17 +98,17 @@ func TestWhereClauseValidation(t *testing.T) { statusCorrect := true for _, row := range statusResult.Rows { if len(row) > 1 && row[1].ToString() != "active" { - t.Errorf("❌ Found status '%s' which should be filtered out", row[1].ToString()) + t.Errorf("Found status '%s' which should be filtered out", row[1].ToString()) statusCorrect = false } } if statusCorrect { - t.Logf("✅ String filtering works correctly") + t.Logf("String filtering works correctly") } // Test 5: Comparison with actual real-world case - t.Log("\n🎯 TESTING REAL-WORLD CASE:") + t.Log("\nTESTING REAL-WORLD CASE:") realWorldResult, err := engine.ExecuteSQL(context.Background(), "SELECT id FROM user_events WHERE id > 10000000 LIMIT 10 OFFSET 5") if err != nil { @@ -128,9 +128,9 @@ func TestWhereClauseValidation(t *testing.T) { } if violationCount == 0 { - t.Logf("✅ Real-world case FIXED: No violations found") + t.Logf("Real-world case 
FIXED: No violations found") } else { - t.Errorf("❌ Real-world case FAILED: %d violations found", violationCount) + t.Errorf("Real-world case FAILED: %d violations found", violationCount) } } @@ -168,7 +168,7 @@ func TestWhereClauseComparisonOperators(t *testing.T) { result, err := engine.ExecuteSQL(context.Background(), sql) if err != nil { - t.Errorf("❌ Operator %s failed: %v", op.op, err) + t.Errorf("Operator %s failed: %v", op.op, err) continue } @@ -176,7 +176,7 @@ func TestWhereClauseComparisonOperators(t *testing.T) { // Basic validation - should not return more rows than baseline if len(result.Rows) > len(baselineResult.Rows) { - t.Errorf("❌ Operator %s returned more rows than baseline", op.op) + t.Errorf("Operator %s returned more rows than baseline", op.op) } } } |
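The Fix1 and Fix7 subtests above assert that `valuesEqual`, `valueLessThan`, and `valueGreaterThan` keep nanosecond timestamps exact. The failure mode they guard against is routing int64 comparisons through float64, whose 53-bit mantissa rounds values like 1756947416566456262 so that ts and ts+1 become indistinguishable. The sketch below shows the kind of int64-domain comparison those tests expect; the helper name `int64Compare` is illustrative (not the engine's actual implementation), and the `schema_pb` import path is assumed to match the repo layout under weed/pb/schema_pb.

```go
package engine

import "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"

// int64Compare is an illustrative helper: it compares a schema value against a
// literal entirely in the int64 domain. Converting either side to float64
// would round large _ts_ns values, which is exactly what Fix1 tests for.
func int64Compare(v *schema_pb.Value, target int64) (cmp int, ok bool) {
	if v == nil {
		return 0, false
	}
	iv, isInt := v.Kind.(*schema_pb.Value_Int64Value)
	if !isInt {
		return 0, false // other value kinds are handled by the engine's generic path
	}
	switch {
	case iv.Int64Value < target:
		return -1, true
	case iv.Int64Value > target:
		return 1, true
	default:
		return 0, true
	}
}
```

With a helper of this shape, equality is `cmp == 0` and the ordered operators follow from the sign of `cmp`, so all ten operator cases in the Fix6 table stay exact, including the max-int64 and zero edge cases from Fix7.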
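Fix2 and Fix3 encode a scan-window rule rather than a predicate rule: an equality filter on `_ts_ns` must not set `stopTimeNs`, and a range whose two bounds are equal must still let the scan reach that exact nanosecond, otherwise the scan stops before the matching record is read. Below is a minimal sketch of that rule under assumed names (`timeBounds`, `extractTimeBound`); the engine's real extraction code is not shown in this diff, so treat the details as illustrative.

```go
package engine

// timeBounds is a hypothetical scan window; 0 means "unbounded" here.
type timeBounds struct {
	startTimeNs int64
	stopTimeNs  int64
}

// extractTimeBound turns one comparison on _ts_ns into a scan window.
// Bounds are deliberately conservative: strictness (> vs >=) is enforced by
// the per-record predicate, not by the window itself.
func extractTimeBound(op string, ts int64) timeBounds {
	switch op {
	case "=":
		// Equality may narrow where the scan starts, but it never sets a stop
		// time; the exact match is confirmed per record, so the scan cannot
		// terminate before reaching the target timestamp (Fix2).
		return timeBounds{startTimeNs: ts}
	case ">", ">=":
		return timeBounds{startTimeNs: ts}
	case "<", "<=":
		// An upper bound equal to the lower bound must remain reachable,
		// i.e. the stop check has to be inclusive at ts (Fix3).
		return timeBounds{stopTimeNs: ts}
	default:
		return timeBounds{}
	}
}
```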
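The where_clause_debug and where_validation tests share one detection trick: run a query with no WHERE clause as a baseline, then run one whose predicate cannot match any row; if both return the same row count, filtering was never applied. A condensed, hypothetical form of that guard is sketched below; `NewTestSQLEngine` and the `user_events` table stand in for the real test harness, while `ExecuteSQL` and `result.Rows` are used as they appear in the diff.

```go
package engine

import (
	"context"
	"testing"
)

// TestWhereFilteringIsApplied is a condensed, hypothetical form of the guard
// used in where_clause_debug_test.go: a contradictory predicate must not
// return as many rows as the unfiltered baseline.
func TestWhereFilteringIsApplied(t *testing.T) {
	engine := NewTestSQLEngine() // assumed test-harness constructor
	ctx := context.Background()

	baseline, err := engine.ExecuteSQL(ctx, "SELECT id FROM user_events LIMIT 100")
	if err != nil {
		t.Fatalf("baseline query failed: %v", err)
	}

	// id > 1 AND id < 1 can never be true for any row.
	impossible, err := engine.ExecuteSQL(ctx, "SELECT id FROM user_events WHERE id > 1 AND id < 1 LIMIT 100")
	if err != nil {
		t.Fatalf("impossible-predicate query failed: %v", err)
	}

	if len(baseline.Rows) > 0 && len(impossible.Rows) == len(baseline.Rows) {
		t.Errorf("WHERE filtering not applied: impossible predicate returned %d rows", len(impossible.Rows))
	}
}
```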
