diff options
Diffstat (limited to 'weed/query/engine/parquet_scanner.go')
| -rw-r--r-- | weed/query/engine/parquet_scanner.go | 31 |
1 files changed, 21 insertions, 10 deletions
diff --git a/weed/query/engine/parquet_scanner.go b/weed/query/engine/parquet_scanner.go index 113cd814a..e4b5252c7 100644 --- a/weed/query/engine/parquet_scanner.go +++ b/weed/query/engine/parquet_scanner.go @@ -21,7 +21,7 @@ import ( // Assumptions: // 1. All MQ messages are stored in Parquet format in topic partitions // 2. Each partition directory contains dated Parquet files -// 3. System columns (_timestamp_ns, _key) are added to user schema +// 3. System columns (_ts_ns, _key) are added to user schema // 4. Predicate pushdown is used for efficient scanning type ParquetScanner struct { filerClient filer_pb.FilerClient @@ -55,17 +55,28 @@ func NewParquetScanner(filerClient filer_pb.FilerClient, namespace, topicName st return nil, fmt.Errorf("failed to read topic config: %v", err) } - // Build complete schema with system columns - recordType := topicConf.GetRecordType() - if recordType == nil { - return nil, NoSchemaError{Namespace: namespace, Topic: topicName} + // Build complete schema with system columns - prefer flat schema if available + var recordType *schema_pb.RecordType + + if topicConf.GetMessageRecordType() != nil { + // New flat schema format - use directly + recordType = topicConf.GetMessageRecordType() } - // Add system columns that MQ adds to all records - recordType = schema.NewRecordTypeBuilder(recordType). - WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64). - WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). - RecordTypeEnd() + if recordType == nil || len(recordType.Fields) == 0 { + // For topics without schema, create a minimal schema with system fields and _value + recordType = schema.RecordTypeBegin(). + WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64). + WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). + WithField(SW_COLUMN_NAME_VALUE, schema.TypeBytes). // Raw message value + RecordTypeEnd() + } else { + // Add system columns that MQ adds to all records + recordType = schema.NewRecordTypeBuilder(recordType). + WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64). + WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). + RecordTypeEnd() + } // Convert to Parquet levels for efficient reading parquetLevels, err := schema.ToParquetLevels(recordType) |
