Diffstat (limited to 'weed/query/engine/hybrid_message_scanner.go')
-rw-r--r--  weed/query/engine/hybrid_message_scanner.go | 339
1 file changed, 263 insertions(+), 76 deletions(-)
diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go
index eee57bc23..c09ce2f54 100644
--- a/weed/query/engine/hybrid_message_scanner.go
+++ b/weed/query/engine/hybrid_message_scanner.go
@@ -15,6 +15,7 @@ import (
 	"github.com/parquet-go/parquet-go"
 	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/mq"
 	"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
 	"github.com/seaweedfs/seaweedfs/weed/mq/schema"
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
@@ -41,6 +42,7 @@ type HybridMessageScanner struct {
 	brokerClient  BrokerClientInterface // For querying unflushed data
 	topic         topic.Topic
 	recordSchema  *schema_pb.RecordType
+	schemaFormat  string // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless
 	parquetLevels *schema.ParquetLevels
 	engine        *SQLEngine // Reference for system column formatting
 }
@@ -59,26 +61,32 @@ func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient Brok
 		Name: topicName,
 	}
 
-	// Get topic schema from broker client (works with both real and mock clients)
-	recordType, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
+	// Get flat schema from broker client
+	recordType, _, schemaFormat, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
 	if err != nil {
-		return nil, fmt.Errorf("failed to get topic schema: %v", err)
-	}
-	if recordType == nil {
-		return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
+		return nil, fmt.Errorf("failed to get topic record type: %v", err)
 	}
 
-	// Create a copy of the recordType to avoid modifying the original
-	recordTypeCopy := &schema_pb.RecordType{
-		Fields: make([]*schema_pb.Field, len(recordType.Fields)),
-	}
-	copy(recordTypeCopy.Fields, recordType.Fields)
+	if recordType == nil || len(recordType.Fields) == 0 {
+		// For topics without schema, create a minimal schema with system fields and _value
+		recordType = schema.RecordTypeBegin().
+			WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
+			WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+			WithField(SW_COLUMN_NAME_VALUE, schema.TypeBytes). // Raw message value
+			RecordTypeEnd()
+	} else {
+		// Create a copy of the recordType to avoid modifying the original
+		recordTypeCopy := &schema_pb.RecordType{
+			Fields: make([]*schema_pb.Field, len(recordType.Fields)),
+		}
+		copy(recordTypeCopy.Fields, recordType.Fields)
 
-	// Add system columns that MQ adds to all records
-	recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
-		WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
-		WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
-		RecordTypeEnd()
+		// Add system columns that MQ adds to all records
+		recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
+			WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
+			WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+			RecordTypeEnd()
+	}
 
 	// Convert to Parquet levels for efficient reading
 	parquetLevels, err := schema.ToParquetLevels(recordType)
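The hunk above replaces the old hard failure (NoSchemaError) with a synthesized minimal schema for schemaless topics. A rough sketch of what that fallback amounts to, assuming the SW_COLUMN_NAME_* constants resolve to the _ts_ns, _key, and _value column names used later in this diff:

	package main

	import (
		"fmt"

		"github.com/seaweedfs/seaweedfs/weed/mq/schema"
	)

	func main() {
		// Equivalent of the schemaless fallback: two system columns plus a
		// raw-bytes _value column (column names assumed, see note above).
		rt := schema.RecordTypeBegin().
			WithField("_ts_ns", schema.TypeInt64). // message timestamp (ns)
			WithField("_key", schema.TypeBytes).   // raw message key
			WithField("_value", schema.TypeBytes). // raw, undecoded payload
			RecordTypeEnd()
		fmt.Printf("fallback schema has %d fields\n", len(rt.Fields)) // 3
	}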
@@ -91,6 +99,7 @@ func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient Brok
 		brokerClient:  brokerClient,
 		topic:         t,
 		recordSchema:  recordType,
+		schemaFormat:  schemaFormat,
 		parquetLevels: parquetLevels,
 		engine:        engine,
 	}, nil
@@ -335,9 +344,6 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
 	unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
 	if err != nil {
 		// Log error but don't fail the query - continue with disk data only
-		if isDebugMode(ctx) {
-			fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err)
-		}
 		// Reset queried flag on error
 		stats.BrokerBufferQueried = false
 		return results, stats, nil
@@ -346,18 +352,19 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
 	// Capture stats for EXPLAIN
 	stats.BrokerBufferMessages = len(unflushedEntries)
 
-	// Debug logging for EXPLAIN mode
-	if isDebugMode(ctx) {
-		fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries))
-		if len(unflushedEntries) > 0 {
-			fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n")
-		}
-	}
-
 	// Step 2: Process unflushed entries (already deduplicated by broker)
 	for _, logEntry := range unflushedEntries {
+		// Pre-decode DataMessage for reuse in both control check and conversion
+		var dataMessage *mq_pb.DataMessage
+		if len(logEntry.Data) > 0 {
+			dataMessage = &mq_pb.DataMessage{}
+			if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
+				dataMessage = nil // Failed to decode, treat as raw data
+			}
+		}
+
 		// Skip control entries without actual data
-		if hms.isControlEntry(logEntry) {
+		if hms.isControlEntryWithDecoded(logEntry, dataMessage) {
 			continue // Skip this entry
 		}
@@ -370,11 +377,8 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
 		}
 
 		// Convert LogEntry to RecordValue format (same as disk data)
-		recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry)
+		recordValue, _, err := hms.convertLogEntryToRecordValueWithDecoded(logEntry, dataMessage)
 		if err != nil {
-			if isDebugMode(ctx) {
-				fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err)
-			}
 			continue // Skip malformed messages
 		}
@@ -429,10 +433,6 @@ func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context,
 		}
 	}
 
-	if isDebugMode(ctx) {
-		fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results))
-	}
-
 	return results, stats, nil
 }
@@ -543,12 +543,8 @@ func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Contex
 	if err != nil {
 		// Don't fail the query if broker scanning fails, but provide clear warning to user
 		// This ensures users are aware that results may not include the most recent data
-		if isDebugMode(ctx) {
-			fmt.Printf("Debug: Failed to scan unflushed data from broker: %v\n", err)
-		} else {
-			fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
-			fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
-		}
+		fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
+		fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
 	} else if unflushedStats != nil {
 		stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried
 		stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages
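The scan loop above now unmarshals each entry's DataMessage once and threads the result through both the control-entry check and the record conversion. A minimal standalone sketch of that decode-once shape, with stand-in types in place of mq_pb.DataMessage and proto.Unmarshal:

	package main

	import "fmt"

	type logEntry struct{ Key, Data []byte }
	type dataMessage struct{ Ctrl bool } // stand-in for mq_pb.DataMessage

	// decodeOnce mimics the pre-decode step: nil means "could not decode,
	// treat the payload as raw bytes downstream".
	func decodeOnce(data []byte) *dataMessage {
		if len(data) == 0 {
			return nil
		}
		return &dataMessage{} // real code: proto.Unmarshal into mq_pb.DataMessage
	}

	func isControl(e logEntry, dm *dataMessage) bool {
		if len(e.Key) == 0 {
			return true // empty key: filtered, same as the subscriber path
		}
		return dm != nil && dm.Ctrl // explicit publisher control signal
	}

	func main() {
		e := logEntry{Key: []byte("k"), Data: []byte("payload")}
		dm := decodeOnce(e.Data) // decoded exactly once...
		if !isControl(e, dm) {   // ...reused for the control check...
			fmt.Println("convert entry using the same dm") // ...and conversion
		}
	}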
@@ -652,35 +648,114 @@ func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (i
 // Based on MQ system analysis, control entries are:
 // 1. DataMessages with populated Ctrl field (publisher close signals)
 // 2. Entries with empty keys (as filtered by subscriber)
-// 3. Entries with no data
+// NOTE: Messages with empty data but valid keys (like NOOP messages) are NOT control entries
 func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool {
-	// Skip entries with no data
-	if len(logEntry.Data) == 0 {
-		return true
+	// Pre-decode DataMessage if needed
+	var dataMessage *mq_pb.DataMessage
+	if len(logEntry.Data) > 0 {
+		dataMessage = &mq_pb.DataMessage{}
+		if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
+			dataMessage = nil // Failed to decode, treat as raw data
+		}
 	}
+	return hms.isControlEntryWithDecoded(logEntry, dataMessage)
+}
 
+// isControlEntryWithDecoded checks if a log entry is a control entry using a pre-decoded DataMessage
+// This avoids duplicate protobuf unmarshaling when the DataMessage is already decoded
+func (hms *HybridMessageScanner) isControlEntryWithDecoded(logEntry *filer_pb.LogEntry, dataMessage *mq_pb.DataMessage) bool {
 	// Skip entries with empty keys (same logic as subscriber)
 	if len(logEntry.Key) == 0 {
 		return true
 	}
 
 	// Check if this is a DataMessage with control field populated
-	dataMessage := &mq_pb.DataMessage{}
-	if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil {
-		// If it has a control field, it's a control message
-		if dataMessage.Ctrl != nil {
-			return true
-		}
+	if dataMessage != nil && dataMessage.Ctrl != nil {
+		return true
 	}
 
+	// Messages with valid keys (even if data is empty) are legitimate messages
+	// Examples: NOOP messages from Schema Registry
 	return false
 }
 
+// isNullOrEmpty checks if a schema_pb.Value is null or empty
+func isNullOrEmpty(value *schema_pb.Value) bool {
+	if value == nil {
+		return true
+	}
+
+	switch v := value.Kind.(type) {
+	case *schema_pb.Value_StringValue:
+		return v.StringValue == ""
+	case *schema_pb.Value_BytesValue:
+		return len(v.BytesValue) == 0
+	case *schema_pb.Value_ListValue:
+		return v.ListValue == nil || len(v.ListValue.Values) == 0
+	case nil:
+		return true // No kind set means null
+	default:
+		return false
+	}
+}
+
+// isSchemaless checks if the scanner is configured for a schema-less topic
+// Schema-less topics only have system fields: _ts_ns, _key, and _value
+func (hms *HybridMessageScanner) isSchemaless() bool {
+	// System topics like _schemas are NOT schema-less - they have structured data;
+	// we just need to map their fields during read
+	if hms.recordSchema == nil {
+		return false
+	}
+
+	// Count only non-system data fields (exclude _ts_ns and _key which are always present)
+	// Schema-less topics should only have _value as the data field
+	hasValue := false
+	dataFieldCount := 0
+
+	for _, field := range hms.recordSchema.Fields {
+		switch field.Name {
+		case SW_COLUMN_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY:
+			// System fields - ignore
+			continue
+		case SW_COLUMN_NAME_VALUE:
+			hasValue = true
+			dataFieldCount++
+		default:
+			// Any other field means it's not schema-less
+			dataFieldCount++
+		}
+	}
+
+	// Schema-less = only has _value as the data field (plus system fields)
+	return hasValue && dataFieldCount == 1
+}
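isSchemaless above keys entirely off the field set. A small runnable sketch of the same counting rule, with the _ts_ns/_key/_value names written out (assumed to match the SW_COLUMN_NAME_* constants):

	package main

	import "fmt"

	// isSchemalessFields mirrors the detection logic: schema-less means the
	// only data field besides the _ts_ns/_key system columns is _value.
	func isSchemalessFields(fields []string) bool {
		hasValue := false
		dataFields := 0
		for _, f := range fields {
			switch f {
			case "_ts_ns", "_key":
				continue // system columns, always present
			case "_value":
				hasValue = true
				dataFields++
			default:
				dataFields++ // any real data column disqualifies the topic
			}
		}
		return hasValue && dataFields == 1
	}

	func main() {
		fmt.Println(isSchemalessFields([]string{"_ts_ns", "_key", "_value"}))            // true
		fmt.Println(isSchemalessFields([]string{"_ts_ns", "_key", "_value", "user_id"})) // false
		fmt.Println(isSchemalessFields([]string{"_ts_ns", "_key"}))                      // false
	}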
 // convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue
 // This handles both:
 // 1. Live log entries (raw message format)
 // 2. Parquet entries (already in schema_pb.RecordValue format)
+// 3. Schema-less topics (raw bytes in _value field)
 func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
+	// For schema-less topics, put raw data directly into _value field
+	if hms.isSchemaless() {
+		recordValue := &schema_pb.RecordValue{
+			Fields: make(map[string]*schema_pb.Value),
+		}
+		recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+			Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+		}
+		recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+			Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+		}
+		recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+			Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
+		}
+		return recordValue, "live_log", nil
+	}
+
 	// Try to unmarshal as RecordValue first (Parquet format)
 	recordValue := &schema_pb.RecordValue{}
 	if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
@@ -705,6 +780,14 @@ func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb
 	return hms.parseRawMessageWithSchema(logEntry)
 }
 
+// min returns the minimum of two integers
+func min(a, b int) int {
+	if a < b {
+		return a
+	}
+	return b
+}
+
 // parseRawMessageWithSchema parses raw live message data using the topic's schema
 // This provides proper type conversion and field mapping instead of treating everything as strings
 func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
@@ -722,51 +805,136 @@ func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.Lo
 	// Parse message data based on schema
 	if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 {
-		// Fallback: No schema available, treat as single "data" field
-		recordValue.Fields["data"] = &schema_pb.Value{
-			Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
+		// Fallback: No schema available, use "_value" for schema-less topics only
+		if hms.isSchemaless() {
+			recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+				Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
+			}
 		}
 		return recordValue, "live_log", nil
 	}
 
-	// Attempt schema-aware parsing
-	// Strategy 1: Try JSON parsing first (most common for live messages)
-	if parsedRecord, err := hms.parseJSONMessage(logEntry.Data); err == nil {
-		// Successfully parsed as JSON, merge with system columns
-		for fieldName, fieldValue := range parsedRecord.Fields {
-			recordValue.Fields[fieldName] = fieldValue
-		}
-		return recordValue, "live_log", nil
-	}
+	// Use schema format to directly choose the right decoder
+	// This avoids trying multiple decoders and improves performance
+	var parsedRecord *schema_pb.RecordValue
+	var err error
+
+	switch hms.schemaFormat {
+	case "AVRO":
+		// AVRO format - use Avro decoder
+		// Note: Avro decoding requires schema registry integration
+		// For now, fall through to JSON as many Avro messages are also valid JSON
+		parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
+	case "PROTOBUF":
+		// PROTOBUF format - use protobuf decoder
+		parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
+	case "JSON_SCHEMA", "":
+		// JSON_SCHEMA format or empty (default to JSON)
+		// JSON is the most common format for schema registry
+		parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
+		if err != nil {
+			// Try protobuf as fallback
+			parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
+		}
+	default:
+		// Unknown format - try JSON first, then protobuf as fallback
+		parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
+		if err != nil {
+			parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
+		}
+	}
 
-	// Strategy 2: Try protobuf parsing (binary messages)
-	if parsedRecord, err := hms.parseProtobufMessage(logEntry.Data); err == nil {
-		// Successfully parsed as protobuf, merge with system columns
+	if err == nil && parsedRecord != nil {
+		// Successfully parsed, merge with system columns
 		for fieldName, fieldValue := range parsedRecord.Fields {
 			recordValue.Fields[fieldName] = fieldValue
 		}
 		return recordValue, "live_log", nil
 	}
 
-	// Strategy 3: Fallback to single field with raw data
-	// If schema has a single field, map the raw data to it with type conversion
+	// Fallback: If schema has a single field, map the raw data to it with type conversion
 	if len(hms.recordSchema.Fields) == 1 {
 		field := hms.recordSchema.Fields[0]
-		convertedValue, err := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
-		if err == nil {
+		convertedValue, convErr := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
+		if convErr == nil {
 			recordValue.Fields[field.Name] = convertedValue
 			return recordValue, "live_log", nil
 		}
 	}
 
-	// Final fallback: treat as string data field
-	recordValue.Fields["data"] = &schema_pb.Value{
-		Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
+	// Final fallback: treat as bytes field for schema-less topics only
+	if hms.isSchemaless() {
+		recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+			Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
+		}
 	}
 	return recordValue, "live_log", nil
 }
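parseRawMessageWithSchema now dispatches on the stored schema format rather than probing every decoder per message. A condensed sketch of the dispatch policy, using stand-in decoder functions in place of the scanner's parseJSONMessage/parseProtobufMessage:

	package main

	import (
		"encoding/json"
		"errors"
		"fmt"
	)

	// decodeByFormat mirrors the switch above: one decoder per known format,
	// JSON first with a protobuf fallback when the format is unset or unknown.
	func decodeByFormat(format string, data []byte,
		jsonDec, pbDec func([]byte) (map[string]any, error)) (map[string]any, error) {
		switch format {
		case "PROTOBUF":
			return pbDec(data)
		case "AVRO":
			// Avro decoding pends schema-registry integration; many payloads
			// are JSON-compatible in practice, so JSON is tried for now.
			return jsonDec(data)
		default: // "JSON_SCHEMA", "", or unrecognized
			rec, err := jsonDec(data)
			if err != nil {
				return pbDec(data)
			}
			return rec, nil
		}
	}

	func main() {
		jsonDec := func(b []byte) (map[string]any, error) {
			var m map[string]any
			return m, json.Unmarshal(b, &m)
		}
		pbDec := func(b []byte) (map[string]any, error) { return nil, errors.New("not protobuf") }
		rec, err := decodeByFormat("JSON_SCHEMA", []byte(`{"user_id":42}`), jsonDec, pbDec)
		fmt.Println(rec, err) // map[user_id:42] <nil>
	}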
+// convertLogEntryToRecordValueWithDecoded converts a filer_pb.LogEntry to schema_pb.RecordValue
+// using a pre-decoded DataMessage to avoid duplicate protobuf unmarshaling
+func (hms *HybridMessageScanner) convertLogEntryToRecordValueWithDecoded(logEntry *filer_pb.LogEntry, dataMessage *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
+	// IMPORTANT: Check for schema-less topics FIRST
+	// Schema-less topics (like _schemas) should store raw data directly in _value field
+	if hms.isSchemaless() {
+		recordValue := &schema_pb.RecordValue{
+			Fields: make(map[string]*schema_pb.Value),
+		}
+		recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+			Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+		}
+		recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+			Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+		}
+		recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
+			Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
+		}
+		return recordValue, "live_log", nil
+	}
+
+	// CRITICAL: The broker stores DataMessage.Value directly in LogEntry.Data,
+	// so we need to try unmarshaling LogEntry.Data as RecordValue first
+	var recordValueBytes []byte
+	if dataMessage != nil && len(dataMessage.Value) > 0 {
+		// DataMessage has a Value field - use it
+		recordValueBytes = dataMessage.Value
+	} else {
+		// DataMessage doesn't have Value, use LogEntry.Data directly
+		// This is the normal case when the broker stores messages
+		recordValueBytes = logEntry.Data
+	}
+
+	// Try to unmarshal as RecordValue
+	if len(recordValueBytes) > 0 {
+		recordValue := &schema_pb.RecordValue{}
+		if err := proto.Unmarshal(recordValueBytes, recordValue); err == nil {
+			// Successfully unmarshaled as RecordValue
+
+			// Ensure Fields map exists
+			if recordValue.Fields == nil {
+				recordValue.Fields = make(map[string]*schema_pb.Value)
+			}
+
+			// Add system columns from LogEntry
+			recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+				Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+			}
+			recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+				Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+			}
+
+			return recordValue, "live_log", nil
+		}
+		// If unmarshaling as RecordValue fails, fall back to schema-aware parsing
+	}
+
+	// For cases where protobuf unmarshaling failed or data is empty,
+	// attempt schema-aware parsing to try JSON, protobuf, and other formats
+	return hms.parseRawMessageWithSchema(logEntry)
+}
+
 // parseJSONMessage attempts to parse raw data as JSON and map to schema fields
 func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) {
 	// Try to parse as JSON
@@ -950,6 +1118,11 @@ func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult,
 		for columnName := range columnSet {
 			columns = append(columns, columnName)
 		}
+
+		// If no data columns were found, include system columns so we have something to display
+		if len(columns) == 0 {
+			columns = []string{SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY}
+		}
 	}
 
 	// Convert to SQL rows
@@ -1037,6 +1210,11 @@ func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []Hy
 		columns = append(columns, col)
 	}
 
+	// If no data columns were found and no explicit columns specified, include system columns
+	if len(columns) == 0 {
+		columns = []string{SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY}
+	}
+
 	// Convert to SQL rows
 	rows := make([][]sqltypes.Value, len(results))
 	for i, result := range results {
@@ -1123,10 +1301,10 @@ func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lo
 	}
 	// Populate optional min/max from filer extended attributes (writer stores ns timestamps)
 	if entry != nil && entry.Extended != nil {
-		if minBytes, ok := entry.Extended["min"]; ok && len(minBytes) == 8 {
+		if minBytes, ok := entry.Extended[mq.ExtendedAttrTimestampMin]; ok && len(minBytes) == 8 {
 			fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes))
 		}
-		if maxBytes, ok := entry.Extended["max"]; ok && len(maxBytes) == 8 {
+		if maxBytes, ok := entry.Extended[mq.ExtendedAttrTimestampMax]; ok && len(maxBytes) == 8 {
 			fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes))
 		}
 	}
@@ -1538,13 +1716,22 @@ func (s *StreamingFlushedDataSource) startStreaming() {
 	// Message processing function
 	eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
+		// Pre-decode DataMessage for reuse in both control check and conversion
+		var dataMessage *mq_pb.DataMessage
+		if len(logEntry.Data) > 0 {
+			dataMessage = &mq_pb.DataMessage{}
+			if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
+				dataMessage = nil // Failed to decode, treat as raw data
+			}
+		}
+
 		// Skip control entries without actual data
-		if s.hms.isControlEntry(logEntry) {
+		if s.hms.isControlEntryWithDecoded(logEntry, dataMessage) {
 			return false, nil // Skip this entry
 		}
 
 		// Convert log entry to schema_pb.RecordValue for consistent processing
-		recordValue, source, convertErr := s.hms.convertLogEntryToRecordValue(logEntry)
+		recordValue, source, convertErr := s.hms.convertLogEntryToRecordValueWithDecoded(logEntry, dataMessage)
 		if convertErr != nil {
 			return false, fmt.Errorf("failed to convert log entry: %v", convertErr)
 		}
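The extractParquetFileStats hunk swaps the bare "min"/"max" extended-attribute keys for the shared mq.ExtendedAttrTimestampMin/Max constants; the values remain 8-byte big-endian nanosecond timestamps. A self-contained sketch of that encoding round-trip:

	package main

	import (
		"encoding/binary"
		"fmt"
	)

	// tsFromAttr decodes a per-file min/max timestamp attribute as written by
	// the parquet writer: exactly 8 bytes, big-endian, nanoseconds since epoch.
	func tsFromAttr(b []byte) (int64, bool) {
		if len(b) != 8 {
			return 0, false // malformed or absent attribute
		}
		return int64(binary.BigEndian.Uint64(b)), true
	}

	func main() {
		raw := make([]byte, 8)
		binary.BigEndian.PutUint64(raw, uint64(1700000000000000000))
		if ns, ok := tsFromAttr(raw); ok {
			fmt.Println("min timestamp (ns):", ns)
		}
	}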
