aboutsummaryrefslogtreecommitdiff
path: root/weed/query/engine/parquet_scanner.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/query/engine/parquet_scanner.go')
-rw-r--r-- weed/query/engine/parquet_scanner.go 31
1 files changed, 21 insertions, 10 deletions
diff --git a/weed/query/engine/parquet_scanner.go b/weed/query/engine/parquet_scanner.go
index 113cd814a..e4b5252c7 100644
--- a/weed/query/engine/parquet_scanner.go
+++ b/weed/query/engine/parquet_scanner.go
@@ -21,7 +21,7 @@ import (
// Assumptions:
// 1. All MQ messages are stored in Parquet format in topic partitions
// 2. Each partition directory contains dated Parquet files
-// 3. System columns (_timestamp_ns, _key) are added to user schema
+// 3. System columns (_ts_ns, _key) are added to user schema
// 4. Predicate pushdown is used for efficient scanning
type ParquetScanner struct {
filerClient filer_pb.FilerClient
@@ -55,17 +55,28 @@ func NewParquetScanner(filerClient filer_pb.FilerClient, namespace, topicName st
return nil, fmt.Errorf("failed to read topic config: %v", err)
}
- // Build complete schema with system columns
- recordType := topicConf.GetRecordType()
- if recordType == nil {
- return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
+ // Build complete schema with system columns - prefer flat schema if available
+ var recordType *schema_pb.RecordType
+
+ if topicConf.GetMessageRecordType() != nil {
+ // New flat schema format - use directly
+ recordType = topicConf.GetMessageRecordType()
}
- // Add system columns that MQ adds to all records
- recordType = schema.NewRecordTypeBuilder(recordType).
- WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
- WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
- RecordTypeEnd()
+ if recordType == nil || len(recordType.Fields) == 0 {
+ // For topics without schema, create a minimal schema with system fields and _value
+ recordType = schema.RecordTypeBegin().
+ WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
+ WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ WithField(SW_COLUMN_NAME_VALUE, schema.TypeBytes). // Raw message value
+ RecordTypeEnd()
+ } else {
+ // Add system columns that MQ adds to all records
+ recordType = schema.NewRecordTypeBuilder(recordType).
+ WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
+ WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ RecordTypeEnd()
+ }
// Convert to Parquet levels for efficient reading
parquetLevels, err := schema.ToParquetLevels(recordType)