Diffstat (limited to 'weed/query/engine/hybrid_message_scanner.go')
-rw-r--r--    weed/query/engine/hybrid_message_scanner.go    339
1 file changed, 263 insertions(+), 76 deletions(-)
diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go
index eee57bc23..c09ce2f54 100644
--- a/weed/query/engine/hybrid_message_scanner.go
+++ b/weed/query/engine/hybrid_message_scanner.go
@@ -15,6 +15,7 @@ import (
"github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/mq"
"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
@@ -41,6 +42,7 @@ type HybridMessageScanner struct {
brokerClient BrokerClientInterface // For querying unflushed data
topic topic.Topic
recordSchema *schema_pb.RecordType
+ schemaFormat string // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schema-less topics
parquetLevels *schema.ParquetLevels
engine *SQLEngine // Reference for system column formatting
}
@@ -59,26 +61,32 @@ func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient Brok
Name: topicName,
}
- // Get topic schema from broker client (works with both real and mock clients)
- recordType, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
+ // Get the topic's record type and schema format from the broker client
+ recordType, _, schemaFormat, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
if err != nil {
- return nil, fmt.Errorf("failed to get topic schema: %v", err)
- }
- if recordType == nil {
- return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
+ return nil, fmt.Errorf("failed to get topic record type: %v", err)
}
- // Create a copy of the recordType to avoid modifying the original
- recordTypeCopy := &schema_pb.RecordType{
- Fields: make([]*schema_pb.Field, len(recordType.Fields)),
- }
- copy(recordTypeCopy.Fields, recordType.Fields)
+ if recordType == nil || len(recordType.Fields) == 0 {
+ // For topics without schema, create a minimal schema with system fields and _value
+ recordType = schema.RecordTypeBegin().
+ WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
+ WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ WithField(SW_COLUMN_NAME_VALUE, schema.TypeBytes). // Raw message value
+ RecordTypeEnd()
+ } else {
+ // Create a copy of the recordType to avoid modifying the original
+ recordTypeCopy := &schema_pb.RecordType{
+ Fields: make([]*schema_pb.Field, len(recordType.Fields)),
+ }
+ copy(recordTypeCopy.Fields, recordType.Fields)
- // Add system columns that MQ adds to all records
- recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
- WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
- WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
- RecordTypeEnd()
+ // Add system columns that MQ adds to all records
+ recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
+ WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
+ WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ RecordTypeEnd()
+ }
// Convert to Parquet levels for efficient reading
parquetLevels, err := schema.ToParquetLevels(recordType)
@@ -91,6 +99,7 @@ func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient Brok
brokerClient: brokerClient,
topic: t,
recordSchema: recordType,
+ schemaFormat: schemaFormat,
parquetLevels: parquetLevels,
engine: engine,
}, nil
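
A note on the `else` branch above: `recordType.Fields` is copied before the builder appends the system columns. A minimal sketch of the reasoning, assuming `NewRecordTypeBuilder` appends to the slice it is handed (the copy is shallow, so only the slice header is protected, which is all the append path needs):

```go
// Sketch: why the defensive copy of Fields matters. Appending through a
// builder that shares backing storage with the broker's recordType could
// otherwise grow into the caller's slice.
orig := &schema_pb.RecordType{Fields: []*schema_pb.Field{{Name: "id"}}}

cp := &schema_pb.RecordType{Fields: make([]*schema_pb.Field, len(orig.Fields))}
copy(cp.Fields, orig.Fields)

// System columns land on the copy only; the broker's view is unchanged.
cp.Fields = append(cp.Fields, &schema_pb.Field{Name: SW_COLUMN_NAME_TIMESTAMP})

fmt.Println(len(orig.Fields), len(cp.Fields)) // 1 2
```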
@@ -335,9 +344,6 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
if err != nil {
// Log error but don't fail the query - continue with disk data only
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err)
- }
// Reset queried flag on error
stats.BrokerBufferQueried = false
return results, stats, nil
@@ -346,18 +352,19 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
// Capture stats for EXPLAIN
stats.BrokerBufferMessages = len(unflushedEntries)
- // Debug logging for EXPLAIN mode
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries))
- if len(unflushedEntries) > 0 {
- fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n")
- }
- }
-
// Step 2: Process unflushed entries (already deduplicated by broker)
for _, logEntry := range unflushedEntries {
+ // Pre-decode DataMessage for reuse in both control check and conversion
+ var dataMessage *mq_pb.DataMessage
+ if len(logEntry.Data) > 0 {
+ dataMessage = &mq_pb.DataMessage{}
+ if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
+ dataMessage = nil // Failed to decode, treat as raw data
+ }
+ }
+
// Skip control entries without actual data
- if hms.isControlEntry(logEntry) {
+ if hms.isControlEntryWithDecoded(logEntry, dataMessage) {
continue // Skip this entry
}
@@ -370,11 +377,8 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
}
// Convert LogEntry to RecordValue format (same as disk data)
- recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry)
+ recordValue, _, err := hms.convertLogEntryToRecordValueWithDecoded(logEntry, dataMessage)
if err != nil {
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err)
- }
continue // Skip malformed messages
}
@@ -429,10 +433,6 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
}
}
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results))
- }
-
return results, stats, nil
}
@@ -543,12 +543,8 @@ func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Contex
if err != nil {
// Don't fail the query if broker scanning fails, but provide clear warning to user
// This ensures users are aware that results may not include the most recent data
- if isDebugMode(ctx) {
- fmt.Printf("Debug: Failed to scan unflushed data from broker: %v\n", err)
- } else {
- fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
- fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
- }
+ fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
+ fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
} else if unflushedStats != nil {
stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried
stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages
@@ -652,35 +648,114 @@ func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (i
// Based on MQ system analysis, control entries are:
// 1. DataMessages with populated Ctrl field (publisher close signals)
// 2. Entries with empty keys (as filtered by subscriber)
-// 3. Entries with no data
+// NOTE: Messages with empty data but valid keys (like NOOP messages) are NOT control entries
func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool {
- // Skip entries with no data
- if len(logEntry.Data) == 0 {
- return true
+ // Pre-decode DataMessage if needed
+ var dataMessage *mq_pb.DataMessage
+ if len(logEntry.Data) > 0 {
+ dataMessage = &mq_pb.DataMessage{}
+ if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
+ dataMessage = nil // Failed to decode, treat as raw data
+ }
}
+ return hms.isControlEntryWithDecoded(logEntry, dataMessage)
+}
+// isControlEntryWithDecoded checks if a log entry is a control entry using pre-decoded DataMessage
+// This avoids duplicate protobuf unmarshaling when the DataMessage is already decoded
+func (hms *HybridMessageScanner) isControlEntryWithDecoded(logEntry *filer_pb.LogEntry, dataMessage *mq_pb.DataMessage) bool {
// Skip entries with empty keys (same logic as subscriber)
if len(logEntry.Key) == 0 {
return true
}
// Check if this is a DataMessage with control field populated
- dataMessage := &mq_pb.DataMessage{}
- if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil {
- // If it has a control field, it's a control message
- if dataMessage.Ctrl != nil {
- return true
- }
+ if dataMessage != nil && dataMessage.Ctrl != nil {
+ return true
}
+ // Messages with valid keys (even if data is empty) are legitimate messages
+ // Examples: NOOP messages from Schema Registry
return false
}
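
The comment block above enumerates three cases; a hypothetical table of inputs pins them down (assuming `Ctrl` is a `*mq_pb.ControlMessage`; the method reads only its arguments, so a zero-value scanner is enough):

```go
hms := &HybridMessageScanner{}

// 1. Empty key: control entry (same rule the subscriber applies).
empty := &filer_pb.LogEntry{Data: []byte("x")}
fmt.Println(hms.isControlEntryWithDecoded(empty, nil)) // true

// 2. Populated Ctrl field: control entry (e.g. publisher close signal).
closed := &filer_pb.LogEntry{Key: []byte("k")}
ctrl := &mq_pb.DataMessage{Ctrl: &mq_pb.ControlMessage{}} // assumed type
fmt.Println(hms.isControlEntryWithDecoded(closed, ctrl)) // true

// 3. Valid key, empty data (a Schema Registry NOOP): a real message.
noop := &filer_pb.LogEntry{Key: []byte("NOOP")}
fmt.Println(hms.isControlEntryWithDecoded(noop, nil)) // false
```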
+// isNullOrEmpty checks if a schema_pb.Value is null or empty
+func isNullOrEmpty(value *schema_pb.Value) bool {
+ if value == nil {
+ return true
+ }
+
+ switch v := value.Kind.(type) {
+ case *schema_pb.Value_StringValue:
+ return v.StringValue == ""
+ case *schema_pb.Value_BytesValue:
+ return len(v.BytesValue) == 0
+ case *schema_pb.Value_ListValue:
+ return v.ListValue == nil || len(v.ListValue.Values) == 0
+ case nil:
+ return true // No kind set means null
+ default:
+ return false
+ }
+}
+
+// isSchemaless checks if the scanner is configured for a schema-less topic
+// Schema-less topics only have system fields: _ts_ns, _key, and _value
+func (hms *HybridMessageScanner) isSchemaless() bool {
+ // System topics like _schemas are NOT schema-less: they have structured
+ // data whose fields are simply mapped during reads.
+
+ if hms.recordSchema == nil {
+ return false
+ }
+
+ // Count only non-system data fields (exclude _ts_ns and _key which are always present)
+ // Schema-less topics should only have _value as the data field
+ hasValue := false
+ dataFieldCount := 0
+
+ for _, field := range hms.recordSchema.Fields {
+ switch field.Name {
+ case SW_COLUMN_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY:
+ // System fields - ignore
+ continue
+ case SW_COLUMN_NAME_VALUE:
+ hasValue = true
+ dataFieldCount++
+ default:
+ // Any other field means it's not schema-less
+ dataFieldCount++
+ }
+ }
+
+ // Schema-less = only has _value field as the data field (plus system fields)
+ return hasValue && dataFieldCount == 1
+}
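
Concretely, using the same builder the constructor uses (type constants as shown above):

```go
// Sketch: classification under isSchemaless.
schemaless := schema.RecordTypeBegin().
	WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
	WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
	WithField(SW_COLUMN_NAME_VALUE, schema.TypeBytes).
	RecordTypeEnd()

structured := schema.RecordTypeBegin().
	WithField("schema_id", schema.TypeInt64). // any user field disqualifies
	WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
	WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
	RecordTypeEnd()

fmt.Println((&HybridMessageScanner{recordSchema: schemaless}).isSchemaless()) // true
fmt.Println((&HybridMessageScanner{recordSchema: structured}).isSchemaless()) // false
```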
+
// convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue
-// This handles both:
+// This handles:
// 1. Live log entries (raw message format)
// 2. Parquet entries (already in schema_pb.RecordValue format)
+// 3. Schema-less topics (raw bytes in _value field)
func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
+ // For schema-less topics, put raw data directly into _value field
+ if hms.isSchemaless() {
+ recordValue := &schema_pb.RecordValue{
+ Fields: make(map[string]*schema_pb.Value),
+ }
+ recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
+ }
+ return recordValue, "live_log", nil
+ }
+
// Try to unmarshal as RecordValue first (Parquet format)
recordValue := &schema_pb.RecordValue{}
if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
@@ -705,6 +780,14 @@ func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb
return hms.parseRawMessageWithSchema(logEntry)
}
+// min returns the minimum of two integers
+func min(a, b int) int {
+ if a < b {
+ return a
+ }
+ return b
+}
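
Worth noting: Go 1.21 made `min` a predeclared builtin, so this package-level helper is presumably here to keep the file compiling on older toolchains; within the package, the local definition simply takes precedence.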
+
// parseRawMessageWithSchema parses raw live message data using the topic's schema
// This provides proper type conversion and field mapping instead of treating everything as strings
func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
@@ -722,51 +805,136 @@ func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.Lo
// Parse message data based on schema
if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 {
- // Fallback: No schema available, treat as single "data" field
- recordValue.Fields["data"] = &schema_pb.Value{
- Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
+ // Fallback: No schema available, use "_value" for schema-less topics only
+ if hms.isSchemaless() {
+ recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
+ }
}
return recordValue, "live_log", nil
}
- // Attempt schema-aware parsing
- // Strategy 1: Try JSON parsing first (most common for live messages)
- if parsedRecord, err := hms.parseJSONMessage(logEntry.Data); err == nil {
- // Successfully parsed as JSON, merge with system columns
- for fieldName, fieldValue := range parsedRecord.Fields {
- recordValue.Fields[fieldName] = fieldValue
+ // Use schema format to directly choose the right decoder
+ // This avoids trying multiple decoders and improves performance
+ var parsedRecord *schema_pb.RecordValue
+ var err error
+
+ switch hms.schemaFormat {
+ case "AVRO":
+ // AVRO format - use Avro decoder
+ // Note: proper Avro decoding requires schema registry integration.
+ // For now, fall back to JSON, since messages produced with Avro's JSON
+ // encoding parse as plain JSON.
+ parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
+ case "PROTOBUF":
+ // PROTOBUF format - use protobuf decoder
+ parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
+ case "JSON_SCHEMA", "":
+ // JSON_SCHEMA format or empty (default to JSON)
+ // JSON is the most common format for schema registry
+ parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
+ if err != nil {
+ // Try protobuf as fallback
+ parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
+ }
+ default:
+ // Unknown format - try JSON first, then protobuf as fallback
+ parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
+ if err != nil {
+ parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
}
- return recordValue, "live_log", nil
}
- // Strategy 2: Try protobuf parsing (binary messages)
- if parsedRecord, err := hms.parseProtobufMessage(logEntry.Data); err == nil {
- // Successfully parsed as protobuf, merge with system columns
+ if err == nil && parsedRecord != nil {
+ // Successfully parsed, merge with system columns
for fieldName, fieldValue := range parsedRecord.Fields {
recordValue.Fields[fieldName] = fieldValue
}
return recordValue, "live_log", nil
}
- // Strategy 3: Fallback to single field with raw data
- // If schema has a single field, map the raw data to it with type conversion
+ // Fallback: If schema has a single field, map the raw data to it with type conversion
if len(hms.recordSchema.Fields) == 1 {
field := hms.recordSchema.Fields[0]
- convertedValue, err := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
- if err == nil {
+ convertedValue, convErr := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
+ if convErr == nil {
recordValue.Fields[field.Name] = convertedValue
return recordValue, "live_log", nil
}
}
- // Final fallback: treat as string data field
- recordValue.Fields["data"] = &schema_pb.Value{
- Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
+ // Final fallback: treat as bytes field for schema-less topics only
+ if hms.isSchemaless() {
+ recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
+ }
}
return recordValue, "live_log", nil
}
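
End to end, the dispatch above can be exercised with a hypothetical JSON_SCHEMA topic (field names invented; this assumes `schema.TypeString` exists alongside the type constants used earlier, and that `parseJSONMessage` maps matching JSON keys onto schema fields):

```go
// Sketch: decoding one live JSON message through parseRawMessageWithSchema.
hms := &HybridMessageScanner{
	schemaFormat: "JSON_SCHEMA",
	recordSchema: schema.RecordTypeBegin().
		WithField("id", schema.TypeInt64).
		WithField("name", schema.TypeString). // assumed type constant
		RecordTypeEnd(),
}

entry := &filer_pb.LogEntry{
	TsNs: 1700000000000000000,
	Key:  []byte("user-1"),
	Data: []byte(`{"id": 42, "name": "alice"}`),
}

if record, source, err := hms.parseRawMessageWithSchema(entry); err == nil {
	// record carries the parsed fields plus the _ts_ns/_key system columns
	fmt.Println(source, record.Fields["name"].GetStringValue()) // live_log alice
}
```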
+// convertLogEntryToRecordValueWithDecoded converts a filer_pb.LogEntry to schema_pb.RecordValue
+// using a pre-decoded DataMessage to avoid duplicate protobuf unmarshaling
+func (hms *HybridMessageScanner) convertLogEntryToRecordValueWithDecoded(logEntry *filer_pb.LogEntry, dataMessage *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
+ // IMPORTANT: Check for schema-less topics FIRST
+ // Schema-less topics (like _schemas) should store raw data directly in _value field
+ if hms.isSchemaless() {
+ recordValue := &schema_pb.RecordValue{
+ Fields: make(map[string]*schema_pb.Value),
+ }
+ recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
+ }
+ return recordValue, "live_log", nil
+ }
+
+ // The broker normally stores DataMessage.Value directly in LogEntry.Data,
+ // so LogEntry.Data itself usually unmarshals as a RecordValue; prefer the
+ // pre-decoded DataMessage.Value when it is present.
+ var recordValueBytes []byte
+
+ if dataMessage != nil && len(dataMessage.Value) > 0 {
+ // DataMessage has a Value field - use it
+ recordValueBytes = dataMessage.Value
+ } else {
+ // DataMessage doesn't have Value, use LogEntry.Data directly
+ // This is the normal case when broker stores messages
+ recordValueBytes = logEntry.Data
+ }
+
+ // Try to unmarshal as RecordValue
+ if len(recordValueBytes) > 0 {
+ recordValue := &schema_pb.RecordValue{}
+ if err := proto.Unmarshal(recordValueBytes, recordValue); err == nil {
+ // Successfully unmarshaled as RecordValue
+
+ // Ensure Fields map exists
+ if recordValue.Fields == nil {
+ recordValue.Fields = make(map[string]*schema_pb.Value)
+ }
+
+ // Add system columns from LogEntry
+ recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+ }
+
+ return recordValue, "live_log", nil
+ }
+ // If unmarshaling as RecordValue fails, fall back to schema-aware parsing
+ }
+
+ // For cases where protobuf unmarshaling failed or data is empty,
+ // attempt schema-aware parsing to try JSON, protobuf, and other formats
+ return hms.parseRawMessageWithSchema(logEntry)
+}
+
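
For orientation, the two byte layouts this function accepts, built by hand (a hypothetical construction; in both cases the system columns are re-stamped from the LogEntry afterwards, so `_ts_ns` and `_key` stay authoritative):

```go
rv := &schema_pb.RecordValue{Fields: map[string]*schema_pb.Value{
	"id": {Kind: &schema_pb.Value_Int64Value{Int64Value: 7}},
}}
valueBytes, _ := proto.Marshal(rv)

// (a) RecordValue wrapped inside DataMessage.Value.
wrapped, _ := proto.Marshal(&mq_pb.DataMessage{Value: valueBytes})
entryA := &filer_pb.LogEntry{TsNs: 1, Key: []byte("k"), Data: wrapped}

// (b) The marshaled RecordValue stored directly - the broker's normal layout.
entryB := &filer_pb.LogEntry{TsNs: 2, Key: []byte("k"), Data: valueBytes}

_, _ = entryA, entryB // both decode to the same logical record
```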
// parseJSONMessage attempts to parse raw data as JSON and map to schema fields
func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) {
// Try to parse as JSON
@@ -950,6 +1118,11 @@ func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult,
for columnName := range columnSet {
columns = append(columns, columnName)
}
+
+ // If no data columns were found, include system columns so we have something to display
+ if len(columns) == 0 {
+ columns = []string{SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY}
+ }
}
// Convert to SQL rows
@@ -1037,6 +1210,11 @@ func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []Hy
columns = append(columns, col)
}
+ // If no data columns were found and no explicit columns specified, include system columns
+ if len(columns) == 0 {
+ columns = []string{SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY}
+ }
+
// Convert to SQL rows
rows := make([][]sqltypes.Value, len(results))
for i, result := range results {
@@ -1123,10 +1301,10 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo
}
// Populate optional min/max from filer extended attributes (writer stores ns timestamps)
if entry != nil && entry.Extended != nil {
- if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 {
+ if minBytes, ok := entry.Extended[mq.ExtendedAttrTimestampMin]; ok && len(minBytes) == 8 {
fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes))
}
- if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 {
+ if maxBytes, ok := entry.Extended[mq.ExtendedAttrTimestampMax]; ok && len(maxBytes) == 8 {
fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes))
}
}
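
The guard on `len(...) == 8` works because the writer stores these values as fixed-width big-endian nanosecond timestamps; a small round-trip sketch (using `encoding/binary` and `time` from the standard library):

```go
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(time.Now().UnixNano()))

ns := int64(binary.BigEndian.Uint64(buf)) // mirrors extractParquetFileStats
fmt.Println(time.Unix(0, ns).UTC())       // recovers the original instant
```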
@@ -1538,13 +1716,22 @@ func (s *StreamingFlushedDataSource) startStreaming() {
// Message processing function
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
+ // Pre-decode DataMessage for reuse in both control check and conversion
+ var dataMessage *mq_pb.DataMessage
+ if len(logEntry.Data) > 0 {
+ dataMessage = &mq_pb.DataMessage{}
+ if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
+ dataMessage = nil // Failed to decode, treat as raw data
+ }
+ }
+
// Skip control entries without actual data
- if s.hms.isControlEntry(logEntry) {
+ if s.hms.isControlEntryWithDecoded(logEntry, dataMessage) {
return false, nil // Skip this entry
}
// Convert log entry to schema_pb.RecordValue for consistent processing
- recordValue, source, convertErr := s.hms.convertLogEntryToRecordValue(logEntry)
+ recordValue, source, convertErr := s.hms.convertLogEntryToRecordValueWithDecoded(logEntry, dataMessage)
if convertErr != nil {
return false, fmt.Errorf("failed to convert log entry: %v", convertErr)
}