Diffstat (limited to 'weed/query/engine/parquet_scanner.go')
-rw-r--r--	weed/query/engine/parquet_scanner.go	438
1 file changed, 438 insertions(+), 0 deletions(-)
diff --git a/weed/query/engine/parquet_scanner.go b/weed/query/engine/parquet_scanner.go
new file mode 100644
index 000000000..113cd814a
--- /dev/null
+++ b/weed/query/engine/parquet_scanner.go
@@ -0,0 +1,438 @@
+package engine
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "math/big"
+ "sort"
+ "time"
+
+ "github.com/parquet-go/parquet-go"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
+ "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
+)
+
+// ParquetScanner scans the Parquet files of an MQ topic for SELECT queries
+// Assumptions:
+// 1. All MQ messages are stored in Parquet format in topic partitions
+// 2. Each partition directory contains dated Parquet files
+// 3. System columns (SW_COLUMN_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY) are appended to the user schema
+// 4. Time-range filtering is applied while reading; WHERE predicates and column projections are applied per record after decoding
+type ParquetScanner struct {
+ filerClient filer_pb.FilerClient
+ chunkCache chunk_cache.ChunkCache
+ topic topic.Topic
+ recordSchema *schema_pb.RecordType
+ parquetLevels *schema.ParquetLevels
+}
+
+// NewParquetScanner creates a scanner for a specific MQ topic
+// Assumption: Topic exists and has Parquet files in partition directories
+func NewParquetScanner(filerClient filer_pb.FilerClient, namespace, topicName string) (*ParquetScanner, error) {
+ // Check if filerClient is available
+ if filerClient == nil {
+ return nil, fmt.Errorf("filerClient is required but not available")
+ }
+
+ // Create topic reference
+ t := topic.Topic{
+ Namespace: namespace,
+ Name: topicName,
+ }
+
+ // Read topic configuration to get schema
+ var topicConf *mq_pb.ConfigureTopicResponse
+ if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ var readErr error
+ topicConf, readErr = t.ReadConfFile(client)
+ return readErr
+ }); err != nil {
+ return nil, fmt.Errorf("failed to read topic config: %v", err)
+ }
+
+ // Build complete schema with system columns
+ recordType := topicConf.GetRecordType()
+ if recordType == nil {
+ return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
+ }
+
+ // Add system columns that MQ adds to all records
+ recordType = schema.NewRecordTypeBuilder(recordType).
+ WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
+ WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ RecordTypeEnd()
+
+ // Convert to Parquet levels for efficient reading
+ parquetLevels, err := schema.ToParquetLevels(recordType)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create Parquet levels: %v", err)
+ }
+
+ return &ParquetScanner{
+ filerClient: filerClient,
+ chunkCache: chunk_cache.NewChunkCacheInMemory(256), // Same as MQ logstore
+ topic: t,
+ recordSchema: recordType,
+ parquetLevels: parquetLevels,
+ }, nil
+}
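+
+// exampleNewScanner is an illustrative sketch, not part of the original
+// change: it constructs a scanner for a hypothetical topic. The namespace
+// and topic names are assumptions for demonstration.
+func exampleNewScanner(filerClient filer_pb.FilerClient) (*ParquetScanner, error) {
+ // The returned scanner carries the user schema plus the system columns
+ return NewParquetScanner(filerClient, "analytics", "user_events")
+}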
+
+// ScanOptions configures how the scanner reads data
+type ScanOptions struct {
+ // Time range filtering (Unix nanoseconds)
+ StartTimeNs int64
+ StopTimeNs int64
+
+ // Column projection - if empty, select all columns
+ Columns []string
+
+ // Row limit - 0 means no limit
+ Limit int
+
+ // Predicate for WHERE clause filtering
+ Predicate func(*schema_pb.RecordValue) bool
+}
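+
+// examplePredicate is an illustrative WHERE-style predicate, an assumption
+// for demonstration rather than part of the original change: it keeps only
+// records whose "event_type" field equals "login".
+func examplePredicate(record *schema_pb.RecordValue) bool {
+ field, ok := record.Fields["event_type"]
+ if !ok {
+ return false
+ }
+ return field.GetStringValue() == "login"
+}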
+
+// ScanResult represents a single scanned record
+type ScanResult struct {
+ Values map[string]*schema_pb.Value // Column name -> value
+ Timestamp int64 // Message timestamp (system timestamp column)
+ Key []byte // Message key (_key)
+}
+
+// Scan reads records from the topic's Parquet files
+// Assumptions:
+// 1. Scans all partitions of the topic
+// 2. Applies time filtering at Parquet level for efficiency
+// 3. Applies predicates and projections after reading
+func (ps *ParquetScanner) Scan(ctx context.Context, options ScanOptions) ([]ScanResult, error) {
+ var results []ScanResult
+
+ // Get all partitions for this topic
+ // TODO: Implement proper partition discovery
+ // For now, assume partition 0 exists
+ partitions := []topic.Partition{{RangeStart: 0, RangeStop: 1000}}
+
+ for _, partition := range partitions {
+ partitionResults, err := ps.scanPartition(ctx, partition, options)
+ if err != nil {
+ return nil, fmt.Errorf("failed to scan partition %v: %v", partition, err)
+ }
+
+ results = append(results, partitionResults...)
+
+ // Apply global limit across all partitions
+ if options.Limit > 0 && len(results) >= options.Limit {
+ results = results[:options.Limit]
+ break
+ }
+ }
+
+ return results, nil
+}
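+
+// exampleScan is an illustrative sketch, not part of the original change:
+// it runs a time-bounded, projected, filtered scan and converts the results
+// for the SQL layer. The column names reuse the sample schema below, and
+// examplePredicate is the sketch defined above.
+func exampleScan(ctx context.Context, ps *ParquetScanner) (*QueryResult, error) {
+ results, err := ps.Scan(ctx, ScanOptions{
+ StartTimeNs: time.Now().Add(-time.Hour).UnixNano(), // last hour only
+ Columns: []string{"user_id", "event_type"}, // projection
+ Predicate: examplePredicate, // WHERE event_type = 'login'
+ Limit: 100, // cap rows across partitions
+ })
+ if err != nil {
+ return nil, err
+ }
+ return ps.ConvertToSQLResult(results, []string{"user_id", "event_type"}), nil
+}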
+
+// scanPartition scans a specific topic partition
+func (ps *ParquetScanner) scanPartition(ctx context.Context, partition topic.Partition, options ScanOptions) ([]ScanResult, error) {
+ // partitionDir := topic.PartitionDir(ps.topic, partition) // TODO: Use for actual file listing
+
+ var results []ScanResult
+
+ // List Parquet files in partition directory
+ // TODO: Implement proper file listing with date range filtering
+ // For now, this is a placeholder that would list actual Parquet files
+
+ // In a real implementation, this would:
+ // 1. List files in partitionDir via filerClient
+ // 2. Filter files by date range when time filtering is enabled
+ // 3. Process each Parquet file in chronological order
+ // (see the listing sketch after this function)
+
+ // Placeholder: until file listing is implemented, return generated
+ // sample data so the query path can be exercised end to end.
+ results = append(results, ps.generateSampleData(options)...)
+
+ return results, nil
+}
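+
+// exampleListPartitionFiles is a hedged sketch of the listing step that
+// scanPartition leaves as a TODO; it is not part of the original change. It
+// assumes the filer_pb.ReadDirAllEntries helper keeps this signature in the
+// targeted SeaweedFS version, and it additionally needs the "strings" and
+// weed/util packages imported.
+func (ps *ParquetScanner) exampleListPartitionFiles(ctx context.Context, partition topic.Partition) ([]*filer_pb.Entry, error) {
+ partitionDir := topic.PartitionDir(ps.topic, partition)
+ var parquetFiles []*filer_pb.Entry
+ err := filer_pb.ReadDirAllEntries(ctx, ps.filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
+ // Keep regular files with a .parquet extension
+ if !entry.IsDirectory && strings.HasSuffix(entry.Name, ".parquet") {
+ parquetFiles = append(parquetFiles, entry)
+ }
+ return nil
+ })
+ return parquetFiles, err
+}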
+
+// scanParquetFile scans a single Parquet file (real implementation)
+func (ps *ParquetScanner) scanParquetFile(ctx context.Context, entry *filer_pb.Entry, options ScanOptions) ([]ScanResult, error) {
+ var results []ScanResult
+
+ // Create reader for the Parquet file (same pattern as logstore)
+ lookupFileIdFn := filer.LookupFn(ps.filerClient)
+ fileSize := filer.FileSize(entry)
+ visibleIntervals, err := filer.NonOverlappingVisibleIntervals(ctx, lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
+ if err != nil {
+ return nil, fmt.Errorf("failed to resolve visible chunk intervals: %v", err)
+ }
+ chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
+ readerCache := filer.NewReaderCache(32, ps.chunkCache, lookupFileIdFn)
+ readerAt := filer.NewChunkReaderAtFromClient(ctx, readerCache, chunkViews, int64(fileSize))
+
+ // Create Parquet reader
+ parquetReader := parquet.NewReader(readerAt)
+ defer parquetReader.Close()
+
+ rows := make([]parquet.Row, 128) // Read in batches like logstore
+
+ for {
+ rowCount, readErr := parquetReader.ReadRows(rows)
+
+ // Process rows even if EOF
+ for i := 0; i < rowCount; i++ {
+ // Convert Parquet row to schema value
+ recordValue, err := schema.ToRecordValue(ps.recordSchema, ps.parquetLevels, rows[i])
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert row: %v", err)
+ }
+
+ // Extract system columns
+ timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
+ key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
+
+ // Apply time filtering
+ if options.StartTimeNs > 0 && timestamp < options.StartTimeNs {
+ continue
+ }
+ if options.StopTimeNs > 0 && timestamp >= options.StopTimeNs {
+ // Data is time-ordered, so no later row can match; stop scanning this file
+ return results, nil
+ }
+
+ // Apply predicate filtering (WHERE clause)
+ if options.Predicate != nil && !options.Predicate(recordValue) {
+ continue
+ }
+
+ // Apply column projection
+ values := make(map[string]*schema_pb.Value)
+ if len(options.Columns) == 0 {
+ // Select all columns (excluding system columns from user view)
+ for name, value := range recordValue.Fields {
+ if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
+ values[name] = value
+ }
+ }
+ } else {
+ // Select specified columns only
+ for _, columnName := range options.Columns {
+ if value, exists := recordValue.Fields[columnName]; exists {
+ values[columnName] = value
+ }
+ }
+ }
+
+ results = append(results, ScanResult{
+ Values: values,
+ Timestamp: timestamp,
+ Key: key,
+ })
+
+ // Apply row limit
+ if options.Limit > 0 && len(results) >= options.Limit {
+ return results, nil
+ }
+ }
+
+ if readErr != nil {
+ if readErr == io.EOF {
+ break // end of file
+ }
+ return nil, fmt.Errorf("failed to read parquet rows: %v", readErr)
+ }
+ }
+
+ return results, nil
+}
+
+// generateSampleData creates sample data for testing when no real Parquet files exist
+func (ps *ParquetScanner) generateSampleData(options ScanOptions) []ScanResult {
+ now := time.Now().UnixNano()
+
+ sampleData := []ScanResult{
+ {
+ Values: map[string]*schema_pb.Value{
+ "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
+ "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "login"}},
+ "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"ip": "192.168.1.1"}`}},
+ },
+ Timestamp: now - 3600000000000, // 1 hour ago
+ Key: []byte("user-1001"),
+ },
+ {
+ Values: map[string]*schema_pb.Value{
+ "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1002}},
+ "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "page_view"}},
+ "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"page": "/dashboard"}`}},
+ },
+ Timestamp: now - 1800000000000, // 30 minutes ago
+ Key: []byte("user-1002"),
+ },
+ {
+ Values: map[string]*schema_pb.Value{
+ "user_id": {Kind: &schema_pb.Value_Int32Value{Int32Value: 1001}},
+ "event_type": {Kind: &schema_pb.Value_StringValue{StringValue: "logout"}},
+ "data": {Kind: &schema_pb.Value_StringValue{StringValue: `{"session_duration": 3600}`}},
+ },
+ Timestamp: now - 900000000000, // 15 minutes ago
+ Key: []byte("user-1001"),
+ },
+ }
+
+ // Apply predicate filtering if specified
+ if options.Predicate != nil {
+ var filtered []ScanResult
+ for _, result := range sampleData {
+ // Convert to RecordValue for predicate testing
+ recordValue := &schema_pb.RecordValue{Fields: make(map[string]*schema_pb.Value)}
+ for k, v := range result.Values {
+ recordValue.Fields[k] = v
+ }
+ recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: result.Timestamp}}
+ recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: result.Key}}
+
+ if options.Predicate(recordValue) {
+ filtered = append(filtered, result)
+ }
+ }
+ sampleData = filtered
+ }
+
+ // Apply limit
+ if options.Limit > 0 && len(sampleData) > options.Limit {
+ sampleData = sampleData[:options.Limit]
+ }
+
+ return sampleData
+}
+
+// ConvertToSQLResult converts ScanResults to SQL query results
+func (ps *ParquetScanner) ConvertToSQLResult(results []ScanResult, columns []string) *QueryResult {
+ if len(results) == 0 {
+ return &QueryResult{
+ Columns: columns,
+ Rows: [][]sqltypes.Value{},
+ }
+ }
+
+ // Determine columns if not specified
+ if len(columns) == 0 {
+ columnSet := make(map[string]bool)
+ for _, result := range results {
+ for columnName := range result.Values {
+ columnSet[columnName] = true
+ }
+ }
+
+ columns = make([]string, 0, len(columnSet))
+ for columnName := range columnSet {
+ columns = append(columns, columnName)
+ }
+ sort.Strings(columns) // map iteration order is random; keep column order deterministic
+ }
+
+ // Convert to SQL rows
+ rows := make([][]sqltypes.Value, len(results))
+ for i, result := range results {
+ row := make([]sqltypes.Value, len(columns))
+ for j, columnName := range columns {
+ if value, exists := result.Values[columnName]; exists {
+ row[j] = convertSchemaValueToSQL(value)
+ } else {
+ row[j] = sqltypes.NULL
+ }
+ }
+ rows[i] = row
+ }
+
+ return &QueryResult{
+ Columns: columns,
+ Rows: rows,
+ }
+}
+
+// convertSchemaValueToSQL converts schema_pb.Value to sqltypes.Value
+func convertSchemaValueToSQL(value *schema_pb.Value) sqltypes.Value {
+ if value == nil {
+ return sqltypes.NULL
+ }
+
+ switch v := value.Kind.(type) {
+ case *schema_pb.Value_BoolValue:
+ if v.BoolValue {
+ return sqltypes.NewInt32(1)
+ }
+ return sqltypes.NewInt32(0)
+ case *schema_pb.Value_Int32Value:
+ return sqltypes.NewInt32(v.Int32Value)
+ case *schema_pb.Value_Int64Value:
+ return sqltypes.NewInt64(v.Int64Value)
+ case *schema_pb.Value_FloatValue:
+ return sqltypes.NewFloat32(v.FloatValue)
+ case *schema_pb.Value_DoubleValue:
+ return sqltypes.NewFloat64(v.DoubleValue)
+ case *schema_pb.Value_BytesValue:
+ return sqltypes.NewVarBinary(string(v.BytesValue))
+ case *schema_pb.Value_StringValue:
+ return sqltypes.NewVarChar(v.StringValue)
+ // Parquet logical types
+ case *schema_pb.Value_TimestampValue:
+ timestampValue := value.GetTimestampValue()
+ if timestampValue == nil {
+ return sqltypes.NULL
+ }
+ // Convert microseconds to time.Time (UTC, matching the date handling below) and format as a datetime string
+ timestamp := time.UnixMicro(timestampValue.TimestampMicros).UTC()
+ return sqltypes.MakeTrusted(sqltypes.Datetime, []byte(timestamp.Format("2006-01-02 15:04:05")))
+ case *schema_pb.Value_DateValue:
+ dateValue := value.GetDateValue()
+ if dateValue == nil {
+ return sqltypes.NULL
+ }
+ // Convert days since epoch to date string
+ date := time.Unix(int64(dateValue.DaysSinceEpoch)*86400, 0).UTC()
+ return sqltypes.MakeTrusted(sqltypes.Date, []byte(date.Format("2006-01-02")))
+ case *schema_pb.Value_DecimalValue:
+ decimalValue := value.GetDecimalValue()
+ if decimalValue == nil {
+ return sqltypes.NULL
+ }
+ // Convert decimal bytes to string representation
+ decimalStr := decimalToStringHelper(decimalValue)
+ return sqltypes.MakeTrusted(sqltypes.Decimal, []byte(decimalStr))
+ case *schema_pb.Value_TimeValue:
+ timeValue := value.GetTimeValue()
+ if timeValue == nil {
+ return sqltypes.NULL
+ }
+ // Convert microseconds since midnight to time string
+ duration := time.Duration(timeValue.TimeMicros) * time.Microsecond
+ timeOfDay := time.Date(0, 1, 1, 0, 0, 0, 0, time.UTC).Add(duration)
+ return sqltypes.MakeTrusted(sqltypes.Time, []byte(timeOfDay.Format("15:04:05")))
+ default:
+ return sqltypes.NewVarChar(fmt.Sprintf("%v", value))
+ }
+}
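+
+// exampleTimestampConversion is an illustrative check, not part of the
+// original change: 1609459200000000 microseconds since the epoch is
+// 2021-01-01 00:00:00 UTC, so the conversion yields that datetime string.
+func exampleTimestampConversion() sqltypes.Value {
+ v := &schema_pb.Value{Kind: &schema_pb.Value_TimestampValue{
+ TimestampValue: &schema_pb.TimestampValue{TimestampMicros: 1609459200000000},
+ }}
+ return convertSchemaValueToSQL(v) // Datetime "2021-01-01 00:00:00"
+}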
+
+// decimalToStringHelper converts a DecimalValue to string representation
+// This is a standalone version of the engine's decimalToString method
+func decimalToStringHelper(decimalValue *schema_pb.DecimalValue) string {
+ if decimalValue == nil || len(decimalValue.Value) == 0 {
+ return "0"
+ }
+
+ // Parquet stores the unscaled value as two's-complement big-endian bytes,
+ // so interpret the sign bit instead of treating the bytes as unsigned
+ intValue := new(big.Int).SetBytes(decimalValue.Value)
+ if decimalValue.Value[0]&0x80 != 0 {
+ intValue.Sub(intValue, new(big.Int).Lsh(big.NewInt(1), uint(len(decimalValue.Value)*8)))
+ }
+
+ negative := intValue.Sign() < 0
+ str := new(big.Int).Abs(intValue).String()
+
+ // Insert the decimal point, zero-padding when the digit count does not
+ // exceed the scale (e.g. unscaled 5 with scale 3 becomes "0.005")
+ scale := int(decimalValue.Scale)
+ if scale > 0 {
+ for len(str) <= scale {
+ str = "0" + str
+ }
+ decimalPos := len(str) - scale
+ str = str[:decimalPos] + "." + str[decimalPos:]
+ }
+
+ if negative {
+ str = "-" + str
+ }
+ return str
+}
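+
+// exampleDecimalConversion is an illustrative check, not part of the
+// original change: the unscaled value 12345 (bytes 0x30 0x39) with scale 2
+// renders as "123.45".
+func exampleDecimalConversion() string {
+ d := &schema_pb.DecimalValue{Value: []byte{0x30, 0x39}, Scale: 2}
+ return decimalToStringHelper(d) // "123.45"
+}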