aboutsummaryrefslogtreecommitdiff
path: root/weed/query/engine/hybrid_message_scanner.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/query/engine/hybrid_message_scanner.go')
-rw-r--r--weed/query/engine/hybrid_message_scanner.go1668
1 files changed, 1668 insertions, 0 deletions
diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go
new file mode 100644
index 000000000..2584b54a6
--- /dev/null
+++ b/weed/query/engine/hybrid_message_scanner.go
@@ -0,0 +1,1668 @@
+package engine
+
+import (
+ "container/heap"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/parquet-go/parquet-go"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/logstore"
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
+ "google.golang.org/protobuf/proto"
+)
+
+// HybridMessageScanner scans from ALL data sources:
+// Architecture:
+// 1. Unflushed in-memory data from brokers (mq_pb.DataMessage format) - REAL-TIME
+// 2. Recent/live messages in log files (filer_pb.LogEntry format) - FLUSHED
+// 3. Older messages in Parquet files (schema_pb.RecordValue format) - ARCHIVED
+// 4. Seamlessly merges data from all sources chronologically
+// 5. Provides complete real-time view of all messages in a topic
+type HybridMessageScanner struct {
+ filerClient filer_pb.FilerClient
+ brokerClient BrokerClientInterface // For querying unflushed data
+ topic topic.Topic
+ recordSchema *schema_pb.RecordType
+ parquetLevels *schema.ParquetLevels
+ engine *SQLEngine // Reference for system column formatting
+}
+
+// NewHybridMessageScanner creates a scanner that reads from all data sources
+// This provides complete real-time message coverage including unflushed data
+func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient BrokerClientInterface, namespace, topicName string, engine *SQLEngine) (*HybridMessageScanner, error) {
+ // Check if filerClient is available
+ if filerClient == nil {
+ return nil, fmt.Errorf("filerClient is required but not available")
+ }
+
+ // Create topic reference
+ t := topic.Topic{
+ Namespace: namespace,
+ Name: topicName,
+ }
+
+ // Get topic schema from broker client (works with both real and mock clients)
+ recordType, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get topic schema: %v", err)
+ }
+ if recordType == nil {
+ return nil, NoSchemaError{Namespace: namespace, Topic: topicName}
+ }
+
+ // Create a copy of the recordType to avoid modifying the original
+ recordTypeCopy := &schema_pb.RecordType{
+ Fields: make([]*schema_pb.Field, len(recordType.Fields)),
+ }
+ copy(recordTypeCopy.Fields, recordType.Fields)
+
+ // Add system columns that MQ adds to all records
+ recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
+ WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
+ WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ RecordTypeEnd()
+
+ // Convert to Parquet levels for efficient reading
+ parquetLevels, err := schema.ToParquetLevels(recordType)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create Parquet levels: %v", err)
+ }
+
+ return &HybridMessageScanner{
+ filerClient: filerClient,
+ brokerClient: brokerClient,
+ topic: t,
+ recordSchema: recordType,
+ parquetLevels: parquetLevels,
+ engine: engine,
+ }, nil
+}
+
+// HybridScanOptions configure how the scanner reads from both live and archived data
+type HybridScanOptions struct {
+ // Time range filtering (Unix nanoseconds)
+ StartTimeNs int64
+ StopTimeNs int64
+
+ // Column projection - if empty, select all columns
+ Columns []string
+
+ // Row limit - 0 means no limit
+ Limit int
+
+ // Row offset - 0 means no offset
+ Offset int
+
+ // Predicate for WHERE clause filtering
+ Predicate func(*schema_pb.RecordValue) bool
+}
+
+// HybridScanResult represents a message from either live logs or Parquet files
+type HybridScanResult struct {
+ Values map[string]*schema_pb.Value // Column name -> value
+ Timestamp int64 // Message timestamp (_ts_ns)
+ Key []byte // Message key (_key)
+ Source string // "live_log" or "parquet_archive" or "in_memory_broker"
+}
+
+// HybridScanStats contains statistics about data sources scanned
+type HybridScanStats struct {
+ BrokerBufferQueried bool
+ BrokerBufferMessages int
+ BufferStartIndex int64
+ PartitionsScanned int
+ LiveLogFilesScanned int // Number of live log files processed
+}
+
+// ParquetColumnStats holds statistics for a single column from parquet metadata
+type ParquetColumnStats struct {
+ ColumnName string
+ MinValue *schema_pb.Value
+ MaxValue *schema_pb.Value
+ NullCount int64
+ RowCount int64
+}
+
+// ParquetFileStats holds aggregated statistics for a parquet file
+type ParquetFileStats struct {
+ FileName string
+ RowCount int64
+ ColumnStats map[string]*ParquetColumnStats
+}
+
+// StreamingDataSource provides a streaming interface for reading scan results
+type StreamingDataSource interface {
+ Next() (*HybridScanResult, error) // Returns next result or nil when done
+ HasMore() bool // Returns true if more data available
+ Close() error // Clean up resources
+}
+
+// StreamingMergeItem represents an item in the priority queue for streaming merge
+type StreamingMergeItem struct {
+ Result *HybridScanResult
+ SourceID int
+ DataSource StreamingDataSource
+}
+
+// StreamingMergeHeap implements heap.Interface for merging sorted streams by timestamp
+type StreamingMergeHeap []*StreamingMergeItem
+
+func (h StreamingMergeHeap) Len() int { return len(h) }
+
+func (h StreamingMergeHeap) Less(i, j int) bool {
+ // Sort by timestamp (ascending order)
+ return h[i].Result.Timestamp < h[j].Result.Timestamp
+}
+
+func (h StreamingMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+
+func (h *StreamingMergeHeap) Push(x interface{}) {
+ *h = append(*h, x.(*StreamingMergeItem))
+}
+
+func (h *StreamingMergeHeap) Pop() interface{} {
+ old := *h
+ n := len(old)
+ item := old[n-1]
+ *h = old[0 : n-1]
+ return item
+}
+
+// Scan reads messages from both live logs and archived Parquet files
+// Uses SeaweedFS MQ's GenMergedReadFunc for seamless integration
+// Assumptions:
+// 1. Chronologically merges live and archived data
+// 2. Applies filtering at the lowest level for efficiency
+// 3. Handles schema evolution transparently
+func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, error) {
+ results, _, err := hms.ScanWithStats(ctx, options)
+ return results, err
+}
+
+// ScanWithStats reads messages and returns scan statistics for execution plans
+func (hms *HybridMessageScanner) ScanWithStats(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
+ var results []HybridScanResult
+ stats := &HybridScanStats{}
+
+ // Get all partitions for this topic via MQ broker discovery
+ partitions, err := hms.discoverTopicPartitions(ctx)
+ if err != nil {
+ return nil, stats, fmt.Errorf("failed to discover partitions for topic %s: %v", hms.topic.String(), err)
+ }
+
+ stats.PartitionsScanned = len(partitions)
+
+ for _, partition := range partitions {
+ partitionResults, partitionStats, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
+ if err != nil {
+ return nil, stats, fmt.Errorf("failed to scan partition %v: %v", partition, err)
+ }
+
+ results = append(results, partitionResults...)
+
+ // Aggregate broker buffer stats
+ if partitionStats != nil {
+ if partitionStats.BrokerBufferQueried {
+ stats.BrokerBufferQueried = true
+ }
+ stats.BrokerBufferMessages += partitionStats.BrokerBufferMessages
+ if partitionStats.BufferStartIndex > 0 && (stats.BufferStartIndex == 0 || partitionStats.BufferStartIndex < stats.BufferStartIndex) {
+ stats.BufferStartIndex = partitionStats.BufferStartIndex
+ }
+ }
+
+ // Apply global limit (without offset) across all partitions
+ // When OFFSET is used, collect more data to ensure we have enough after skipping
+ // Note: OFFSET will be applied at the end to avoid double-application
+ if options.Limit > 0 {
+ // Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
+ minRequired := options.Limit + options.Offset
+ // Small buffer only when needed to handle edge cases in distributed scanning
+ if options.Offset > 0 && minRequired < 10 {
+ minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
+ }
+ if len(results) >= minRequired {
+ break
+ }
+ }
+ }
+
+ // Apply final OFFSET and LIMIT processing (done once at the end)
+ // Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows
+ if options.Offset > 0 || options.Limit >= 0 {
+ // Handle LIMIT 0 special case first
+ if options.Limit == 0 {
+ return []HybridScanResult{}, stats, nil
+ }
+
+ // Apply OFFSET first
+ if options.Offset > 0 {
+ if options.Offset >= len(results) {
+ results = []HybridScanResult{}
+ } else {
+ results = results[options.Offset:]
+ }
+ }
+
+ // Apply LIMIT after OFFSET (only if limit > 0)
+ if options.Limit > 0 && len(results) > options.Limit {
+ results = results[:options.Limit]
+ }
+ }
+
+ return results, stats, nil
+}
+
+// scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication
+func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
+ results, _, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
+ return results, err
+}
+
+// scanUnflushedDataWithStats queries brokers for unflushed data and returns statistics
+func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
+ var results []HybridScanResult
+ stats := &HybridScanStats{}
+
+ // Skip if no broker client available
+ if hms.brokerClient == nil {
+ return results, stats, nil
+ }
+
+ // Mark that we attempted to query broker buffer
+ stats.BrokerBufferQueried = true
+
+ // Step 1: Get unflushed data from broker using buffer_start-based method
+ // This method uses buffer_start metadata to avoid double-counting with exact precision
+ unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
+ if err != nil {
+ // Log error but don't fail the query - continue with disk data only
+ if isDebugMode(ctx) {
+ fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err)
+ }
+ // Reset queried flag on error
+ stats.BrokerBufferQueried = false
+ return results, stats, nil
+ }
+
+ // Capture stats for EXPLAIN
+ stats.BrokerBufferMessages = len(unflushedEntries)
+
+ // Debug logging for EXPLAIN mode
+ if isDebugMode(ctx) {
+ fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries))
+ if len(unflushedEntries) > 0 {
+ fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n")
+ }
+ }
+
+ // Step 2: Process unflushed entries (already deduplicated by broker)
+ for _, logEntry := range unflushedEntries {
+ // Skip control entries without actual data
+ if hms.isControlEntry(logEntry) {
+ continue // Skip this entry
+ }
+
+ // Skip messages outside time range
+ if options.StartTimeNs > 0 && logEntry.TsNs < options.StartTimeNs {
+ continue
+ }
+ if options.StopTimeNs > 0 && logEntry.TsNs > options.StopTimeNs {
+ continue
+ }
+
+ // Convert LogEntry to RecordValue format (same as disk data)
+ recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry)
+ if err != nil {
+ if isDebugMode(ctx) {
+ fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err)
+ }
+ continue // Skip malformed messages
+ }
+
+ // Apply predicate filter if provided
+ if options.Predicate != nil && !options.Predicate(recordValue) {
+ continue
+ }
+
+ // Extract system columns for result
+ timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
+ key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
+
+ // Apply column projection
+ values := make(map[string]*schema_pb.Value)
+ if len(options.Columns) == 0 {
+ // Select all columns (excluding system columns from user view)
+ for name, value := range recordValue.Fields {
+ if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
+ values[name] = value
+ }
+ }
+ } else {
+ // Select specified columns only
+ for _, columnName := range options.Columns {
+ if value, exists := recordValue.Fields[columnName]; exists {
+ values[columnName] = value
+ }
+ }
+ }
+
+ // Create result with proper source tagging
+ result := HybridScanResult{
+ Values: values,
+ Timestamp: timestamp,
+ Key: key,
+ Source: "live_log", // Data from broker's unflushed messages
+ }
+
+ results = append(results, result)
+
+ // Apply limit (accounting for offset) - collect exact amount needed
+ if options.Limit > 0 {
+ // Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
+ minRequired := options.Limit + options.Offset
+ // Small buffer only when needed to handle edge cases in message streaming
+ if options.Offset > 0 && minRequired < 10 {
+ minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
+ }
+ if len(results) >= minRequired {
+ break
+ }
+ }
+ }
+
+ if isDebugMode(ctx) {
+ fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results))
+ }
+
+ return results, stats, nil
+}
+
+// convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue
+func (hms *HybridMessageScanner) convertDataMessageToRecord(msg *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
+ // Parse the message data as RecordValue
+ recordValue := &schema_pb.RecordValue{}
+ if err := proto.Unmarshal(msg.Value, recordValue); err != nil {
+ return nil, "", fmt.Errorf("failed to unmarshal message data: %v", err)
+ }
+
+ // Add system columns
+ if recordValue.Fields == nil {
+ recordValue.Fields = make(map[string]*schema_pb.Value)
+ }
+
+ // Add timestamp
+ recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: msg.TsNs},
+ }
+
+ return recordValue, string(msg.Key), nil
+}
+
+// discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem
+// This finds real partition directories like v2025-09-01-07-16-34/0000-0630/
+func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) {
+ if hms.filerClient == nil {
+ return nil, fmt.Errorf("filerClient not available for partition discovery")
+ }
+
+ var allPartitions []topic.Partition
+ var err error
+
+ // Scan the topic directory for actual partition versions (timestamped directories)
+ // List all version directories in the topic directory
+ err = filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(hms.topic.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
+ if !versionEntry.IsDirectory {
+ return nil // Skip non-directories
+ }
+
+ // Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34")
+ versionTime, parseErr := topic.ParseTopicVersion(versionEntry.Name)
+ if parseErr != nil {
+ // Skip directories that don't match the version format
+ return nil
+ }
+
+ // Scan partition directories within this version
+ versionDir := fmt.Sprintf("%s/%s", hms.topic.Dir(), versionEntry.Name)
+ return filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
+ if !partitionEntry.IsDirectory {
+ return nil // Skip non-directories
+ }
+
+ // Parse partition boundary from directory name (e.g., "0000-0630")
+ rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name)
+ if rangeStart == rangeStop {
+ return nil // Skip invalid partition names
+ }
+
+ // Create partition object
+ partition := topic.Partition{
+ RangeStart: rangeStart,
+ RangeStop: rangeStop,
+ RingSize: topic.PartitionCount,
+ UnixTimeNs: versionTime.UnixNano(),
+ }
+
+ allPartitions = append(allPartitions, partition)
+ return nil
+ })
+ })
+
+ if err != nil {
+ return nil, fmt.Errorf("failed to scan topic directory for partitions: %v", err)
+ }
+
+ // If no partitions found, return empty slice (valid for newly created or empty topics)
+ if len(allPartitions) == 0 {
+ fmt.Printf("No partitions found for topic %s - returning empty result set\n", hms.topic.String())
+ return []topic.Partition{}, nil
+ }
+
+ fmt.Printf("Discovered %d partitions for topic %s\n", len(allPartitions), hms.topic.String())
+ return allPartitions, nil
+}
+
+// scanPartitionHybrid scans a specific partition using the hybrid approach
+// This is where the magic happens - seamlessly reading ALL data sources:
+// 1. Unflushed in-memory data from brokers (REAL-TIME)
+// 2. Live logs + Parquet files from disk (FLUSHED/ARCHIVED)
+func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
+ results, _, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
+ return results, err
+}
+
+// scanPartitionHybridWithStats scans a specific partition using streaming merge for memory efficiency
+// PERFORMANCE IMPROVEMENT: Uses heap-based streaming merge instead of collecting all data and sorting
+// - Memory usage: O(k) where k = number of data sources, instead of O(n) where n = total records
+// - Scalable: Can handle large topics without LIMIT clauses efficiently
+// - Streaming: Processes data as it arrives rather than buffering everything
+func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
+ stats := &HybridScanStats{}
+
+ // STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME)
+ unflushedResults, unflushedStats, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
+ if err != nil {
+ // Don't fail the query if broker scanning fails, but provide clear warning to user
+ // This ensures users are aware that results may not include the most recent data
+ if isDebugMode(ctx) {
+ fmt.Printf("Debug: Failed to scan unflushed data from broker: %v\n", err)
+ } else {
+ fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
+ fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
+ }
+ } else if unflushedStats != nil {
+ stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried
+ stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages
+ stats.BufferStartIndex = unflushedStats.BufferStartIndex
+ }
+
+ // Count live log files for statistics
+ liveLogCount, err := hms.countLiveLogFiles(partition)
+ if err != nil {
+ // Don't fail the query, just log warning
+ fmt.Printf("Warning: Failed to count live log files: %v\n", err)
+ liveLogCount = 0
+ }
+ stats.LiveLogFilesScanned = liveLogCount
+
+ // STEP 2: Create streaming data sources for memory-efficient merge
+ var dataSources []StreamingDataSource
+
+ // Add unflushed data source (if we have unflushed results)
+ if len(unflushedResults) > 0 {
+ // Sort unflushed results by timestamp before creating stream
+ if len(unflushedResults) > 1 {
+ hms.mergeSort(unflushedResults, 0, len(unflushedResults)-1)
+ }
+ dataSources = append(dataSources, NewSliceDataSource(unflushedResults))
+ }
+
+ // Add streaming flushed data source (live logs + Parquet files)
+ flushedDataSource := NewStreamingFlushedDataSource(hms, partition, options)
+ dataSources = append(dataSources, flushedDataSource)
+
+ // STEP 3: Use streaming merge for memory-efficient chronological ordering
+ var results []HybridScanResult
+ if len(dataSources) > 0 {
+ // Calculate how many rows we need to collect during scanning (before OFFSET/LIMIT)
+ // For LIMIT N OFFSET M, we need to collect at least N+M rows
+ scanLimit := options.Limit
+ if options.Limit > 0 && options.Offset > 0 {
+ scanLimit = options.Limit + options.Offset
+ }
+
+ mergedResults, err := hms.streamingMerge(dataSources, scanLimit)
+ if err != nil {
+ return nil, stats, fmt.Errorf("streaming merge failed: %v", err)
+ }
+ results = mergedResults
+ }
+
+ return results, stats, nil
+}
+
+// countLiveLogFiles counts the number of live log files in a partition for statistics
+func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (int, error) {
+ partitionDir := topic.PartitionDir(hms.topic, partition)
+
+ var fileCount int
+ err := hms.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ // List all files in partition directory
+ request := &filer_pb.ListEntriesRequest{
+ Directory: partitionDir,
+ Prefix: "",
+ StartFromFileName: "",
+ InclusiveStartFrom: true,
+ Limit: 10000, // reasonable limit for counting
+ }
+
+ stream, err := client.ListEntries(context.Background(), request)
+ if err != nil {
+ return err
+ }
+
+ for {
+ resp, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return err
+ }
+
+ // Count files that are not .parquet files (live log files)
+ // Live log files typically have timestamps or are named like log files
+ fileName := resp.Entry.Name
+ if !strings.HasSuffix(fileName, ".parquet") &&
+ !strings.HasSuffix(fileName, ".offset") &&
+ len(resp.Entry.Chunks) > 0 { // Has actual content
+ fileCount++
+ }
+ }
+
+ return nil
+ })
+
+ if err != nil {
+ return 0, err
+ }
+ return fileCount, nil
+}
+
+// isControlEntry checks if a log entry is a control entry without actual data
+// Based on MQ system analysis, control entries are:
+// 1. DataMessages with populated Ctrl field (publisher close signals)
+// 2. Entries with empty keys (as filtered by subscriber)
+// 3. Entries with no data
+func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool {
+ // Skip entries with no data
+ if len(logEntry.Data) == 0 {
+ return true
+ }
+
+ // Skip entries with empty keys (same logic as subscriber)
+ if len(logEntry.Key) == 0 {
+ return true
+ }
+
+ // Check if this is a DataMessage with control field populated
+ dataMessage := &mq_pb.DataMessage{}
+ if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil {
+ // If it has a control field, it's a control message
+ if dataMessage.Ctrl != nil {
+ return true
+ }
+ }
+
+ return false
+}
+
+// convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue
+// This handles both:
+// 1. Live log entries (raw message format)
+// 2. Parquet entries (already in schema_pb.RecordValue format)
+func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
+ // Try to unmarshal as RecordValue first (Parquet format)
+ recordValue := &schema_pb.RecordValue{}
+ if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
+ // This is an archived message from Parquet files
+ // FIX: Add system columns from LogEntry to RecordValue
+ if recordValue.Fields == nil {
+ recordValue.Fields = make(map[string]*schema_pb.Value)
+ }
+
+ // Add system columns from LogEntry
+ recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+ }
+
+ return recordValue, "parquet_archive", nil
+ }
+
+ // If not a RecordValue, this is raw live message data - parse with schema
+ return hms.parseRawMessageWithSchema(logEntry)
+}
+
+// parseRawMessageWithSchema parses raw live message data using the topic's schema
+// This provides proper type conversion and field mapping instead of treating everything as strings
+func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
+ recordValue := &schema_pb.RecordValue{
+ Fields: make(map[string]*schema_pb.Value),
+ }
+
+ // Add system columns (always present)
+ recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
+ }
+ recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
+ }
+
+ // Parse message data based on schema
+ if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 {
+ // Fallback: No schema available, treat as single "data" field
+ recordValue.Fields["data"] = &schema_pb.Value{
+ Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
+ }
+ return recordValue, "live_log", nil
+ }
+
+ // Attempt schema-aware parsing
+ // Strategy 1: Try JSON parsing first (most common for live messages)
+ if parsedRecord, err := hms.parseJSONMessage(logEntry.Data); err == nil {
+ // Successfully parsed as JSON, merge with system columns
+ for fieldName, fieldValue := range parsedRecord.Fields {
+ recordValue.Fields[fieldName] = fieldValue
+ }
+ return recordValue, "live_log", nil
+ }
+
+ // Strategy 2: Try protobuf parsing (binary messages)
+ if parsedRecord, err := hms.parseProtobufMessage(logEntry.Data); err == nil {
+ // Successfully parsed as protobuf, merge with system columns
+ for fieldName, fieldValue := range parsedRecord.Fields {
+ recordValue.Fields[fieldName] = fieldValue
+ }
+ return recordValue, "live_log", nil
+ }
+
+ // Strategy 3: Fallback to single field with raw data
+ // If schema has a single field, map the raw data to it with type conversion
+ if len(hms.recordSchema.Fields) == 1 {
+ field := hms.recordSchema.Fields[0]
+ convertedValue, err := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
+ if err == nil {
+ recordValue.Fields[field.Name] = convertedValue
+ return recordValue, "live_log", nil
+ }
+ }
+
+ // Final fallback: treat as string data field
+ recordValue.Fields["data"] = &schema_pb.Value{
+ Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)},
+ }
+
+ return recordValue, "live_log", nil
+}
+
+// parseJSONMessage attempts to parse raw data as JSON and map to schema fields
+func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) {
+ // Try to parse as JSON
+ var jsonData map[string]interface{}
+ if err := json.Unmarshal(data, &jsonData); err != nil {
+ return nil, fmt.Errorf("not valid JSON: %v", err)
+ }
+
+ recordValue := &schema_pb.RecordValue{
+ Fields: make(map[string]*schema_pb.Value),
+ }
+
+ // Map JSON fields to schema fields
+ for _, schemaField := range hms.recordSchema.Fields {
+ fieldName := schemaField.Name
+ if jsonValue, exists := jsonData[fieldName]; exists {
+ schemaValue, err := hms.convertJSONValueToSchemaValue(jsonValue, schemaField.Type)
+ if err != nil {
+ // Log conversion error but continue with other fields
+ continue
+ }
+ recordValue.Fields[fieldName] = schemaValue
+ }
+ }
+
+ return recordValue, nil
+}
+
+// parseProtobufMessage attempts to parse raw data as protobuf RecordValue
+func (hms *HybridMessageScanner) parseProtobufMessage(data []byte) (*schema_pb.RecordValue, error) {
+ // This might be a raw protobuf message that didn't parse correctly the first time
+ // Try alternative protobuf unmarshaling approaches
+ recordValue := &schema_pb.RecordValue{}
+
+ // Strategy 1: Direct unmarshaling (might work if it's actually a RecordValue)
+ if err := proto.Unmarshal(data, recordValue); err == nil {
+ return recordValue, nil
+ }
+
+ // Strategy 2: Check if it's a different protobuf message type
+ // For now, return error as we need more specific knowledge of MQ message formats
+ return nil, fmt.Errorf("could not parse as protobuf RecordValue")
+}
+
+// convertRawDataToSchemaValue converts raw bytes to a specific schema type
+func (hms *HybridMessageScanner) convertRawDataToSchemaValue(data []byte, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
+ dataStr := string(data)
+
+ switch fieldType.Kind.(type) {
+ case *schema_pb.Type_ScalarType:
+ scalarType := fieldType.GetScalarType()
+ switch scalarType {
+ case schema_pb.ScalarType_STRING:
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_StringValue{StringValue: dataStr},
+ }, nil
+ case schema_pb.ScalarType_INT32:
+ if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 32); err == nil {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_Int32Value{Int32Value: int32(val)},
+ }, nil
+ }
+ case schema_pb.ScalarType_INT64:
+ if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 64); err == nil {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: val},
+ }, nil
+ }
+ case schema_pb.ScalarType_FLOAT:
+ if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 32); err == nil {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_FloatValue{FloatValue: float32(val)},
+ }, nil
+ }
+ case schema_pb.ScalarType_DOUBLE:
+ if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 64); err == nil {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_DoubleValue{DoubleValue: val},
+ }, nil
+ }
+ case schema_pb.ScalarType_BOOL:
+ lowerStr := strings.ToLower(strings.TrimSpace(dataStr))
+ if lowerStr == "true" || lowerStr == "1" || lowerStr == "yes" {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_BoolValue{BoolValue: true},
+ }, nil
+ } else if lowerStr == "false" || lowerStr == "0" || lowerStr == "no" {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_BoolValue{BoolValue: false},
+ }, nil
+ }
+ case schema_pb.ScalarType_BYTES:
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: data},
+ }, nil
+ }
+ }
+
+ return nil, fmt.Errorf("unsupported type conversion for %v", fieldType)
+}
+
+// convertJSONValueToSchemaValue converts a JSON value to schema_pb.Value based on schema type
+func (hms *HybridMessageScanner) convertJSONValueToSchemaValue(jsonValue interface{}, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
+ switch fieldType.Kind.(type) {
+ case *schema_pb.Type_ScalarType:
+ scalarType := fieldType.GetScalarType()
+ switch scalarType {
+ case schema_pb.ScalarType_STRING:
+ if str, ok := jsonValue.(string); ok {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_StringValue{StringValue: str},
+ }, nil
+ }
+ // Convert other types to string
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", jsonValue)},
+ }, nil
+ case schema_pb.ScalarType_INT32:
+ if num, ok := jsonValue.(float64); ok { // JSON numbers are float64
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_Int32Value{Int32Value: int32(num)},
+ }, nil
+ }
+ case schema_pb.ScalarType_INT64:
+ if num, ok := jsonValue.(float64); ok {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{Int64Value: int64(num)},
+ }, nil
+ }
+ case schema_pb.ScalarType_FLOAT:
+ if num, ok := jsonValue.(float64); ok {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_FloatValue{FloatValue: float32(num)},
+ }, nil
+ }
+ case schema_pb.ScalarType_DOUBLE:
+ if num, ok := jsonValue.(float64); ok {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_DoubleValue{DoubleValue: num},
+ }, nil
+ }
+ case schema_pb.ScalarType_BOOL:
+ if boolVal, ok := jsonValue.(bool); ok {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal},
+ }, nil
+ }
+ case schema_pb.ScalarType_BYTES:
+ if str, ok := jsonValue.(string); ok {
+ return &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{BytesValue: []byte(str)},
+ }, nil
+ }
+ }
+ }
+
+ return nil, fmt.Errorf("incompatible JSON value type %T for schema type %v", jsonValue, fieldType)
+}
+
+// ConvertToSQLResult converts HybridScanResults to SQL query results
+func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, columns []string) *QueryResult {
+ if len(results) == 0 {
+ return &QueryResult{
+ Columns: columns,
+ Rows: [][]sqltypes.Value{},
+ Database: hms.topic.Namespace,
+ Table: hms.topic.Name,
+ }
+ }
+
+ // Determine columns if not specified
+ if len(columns) == 0 {
+ columnSet := make(map[string]bool)
+ for _, result := range results {
+ for columnName := range result.Values {
+ columnSet[columnName] = true
+ }
+ }
+
+ columns = make([]string, 0, len(columnSet))
+ for columnName := range columnSet {
+ columns = append(columns, columnName)
+ }
+ }
+
+ // Convert to SQL rows
+ rows := make([][]sqltypes.Value, len(results))
+ for i, result := range results {
+ row := make([]sqltypes.Value, len(columns))
+ for j, columnName := range columns {
+ switch columnName {
+ case SW_COLUMN_NAME_SOURCE:
+ row[j] = sqltypes.NewVarChar(result.Source)
+ case SW_COLUMN_NAME_TIMESTAMP, SW_DISPLAY_NAME_TIMESTAMP:
+ // Format timestamp as proper timestamp type instead of raw nanoseconds
+ row[j] = hms.engine.formatTimestampColumn(result.Timestamp)
+ case SW_COLUMN_NAME_KEY:
+ row[j] = sqltypes.NewVarBinary(string(result.Key))
+ default:
+ if value, exists := result.Values[columnName]; exists {
+ row[j] = convertSchemaValueToSQL(value)
+ } else {
+ row[j] = sqltypes.NULL
+ }
+ }
+ }
+ rows[i] = row
+ }
+
+ return &QueryResult{
+ Columns: columns,
+ Rows: rows,
+ Database: hms.topic.Namespace,
+ Table: hms.topic.Name,
+ }
+}
+
+// ConvertToSQLResultWithMixedColumns handles SELECT *, specific_columns queries
+// Combines auto-discovered columns (from *) with explicitly requested columns
+func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []HybridScanResult, explicitColumns []string) *QueryResult {
+ if len(results) == 0 {
+ // For empty results, combine auto-discovered columns with explicit ones
+ columnSet := make(map[string]bool)
+
+ // Add explicit columns first
+ for _, col := range explicitColumns {
+ columnSet[col] = true
+ }
+
+ // Build final column list
+ columns := make([]string, 0, len(columnSet))
+ for col := range columnSet {
+ columns = append(columns, col)
+ }
+
+ return &QueryResult{
+ Columns: columns,
+ Rows: [][]sqltypes.Value{},
+ Database: hms.topic.Namespace,
+ Table: hms.topic.Name,
+ }
+ }
+
+ // Auto-discover columns from data (like SELECT *)
+ autoColumns := make(map[string]bool)
+ for _, result := range results {
+ for columnName := range result.Values {
+ autoColumns[columnName] = true
+ }
+ }
+
+ // Combine auto-discovered and explicit columns
+ columnSet := make(map[string]bool)
+
+ // Add auto-discovered columns first (regular data columns)
+ for col := range autoColumns {
+ columnSet[col] = true
+ }
+
+ // Add explicit columns (may include system columns like _source)
+ for _, col := range explicitColumns {
+ columnSet[col] = true
+ }
+
+ // Build final column list
+ columns := make([]string, 0, len(columnSet))
+ for col := range columnSet {
+ columns = append(columns, col)
+ }
+
+ // Convert to SQL rows
+ rows := make([][]sqltypes.Value, len(results))
+ for i, result := range results {
+ row := make([]sqltypes.Value, len(columns))
+ for j, columnName := range columns {
+ switch columnName {
+ case SW_COLUMN_NAME_TIMESTAMP:
+ row[j] = sqltypes.NewInt64(result.Timestamp)
+ case SW_COLUMN_NAME_KEY:
+ row[j] = sqltypes.NewVarBinary(string(result.Key))
+ case SW_COLUMN_NAME_SOURCE:
+ row[j] = sqltypes.NewVarChar(result.Source)
+ default:
+ // Regular data column
+ if value, exists := result.Values[columnName]; exists {
+ row[j] = convertSchemaValueToSQL(value)
+ } else {
+ row[j] = sqltypes.NULL
+ }
+ }
+ }
+ rows[i] = row
+ }
+
+ return &QueryResult{
+ Columns: columns,
+ Rows: rows,
+ Database: hms.topic.Namespace,
+ Table: hms.topic.Name,
+ }
+}
+
+// ReadParquetStatistics efficiently reads column statistics from parquet files
+// without scanning the full file content - uses parquet's built-in metadata
+func (h *HybridMessageScanner) ReadParquetStatistics(partitionPath string) ([]*ParquetFileStats, error) {
+ var fileStats []*ParquetFileStats
+
+ // Use the same chunk cache as the logstore package
+ chunkCache := chunk_cache.NewChunkCacheInMemory(256)
+ lookupFileIdFn := filer.LookupFn(h.filerClient)
+
+ err := filer_pb.ReadDirAllEntries(context.Background(), h.filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
+ // Only process parquet files
+ if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") {
+ return nil
+ }
+
+ // Extract statistics from this parquet file
+ stats, err := h.extractParquetFileStats(entry, lookupFileIdFn, chunkCache)
+ if err != nil {
+ // Log error but continue processing other files
+ fmt.Printf("Warning: failed to extract stats from %s: %v\n", entry.Name, err)
+ return nil
+ }
+
+ if stats != nil {
+ fileStats = append(fileStats, stats)
+ }
+ return nil
+ })
+
+ return fileStats, err
+}
+
+// extractParquetFileStats extracts column statistics from a single parquet file
+func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunkCache *chunk_cache.ChunkCacheInMemory) (*ParquetFileStats, error) {
+ // Create reader for the parquet file
+ fileSize := filer.FileSize(entry)
+ visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
+ chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
+ readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
+ readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize))
+
+ // Create parquet reader - this only reads metadata, not data
+ parquetReader := parquet.NewReader(readerAt)
+ defer parquetReader.Close()
+
+ fileView := parquetReader.File()
+
+ fileStats := &ParquetFileStats{
+ FileName: entry.Name,
+ RowCount: fileView.NumRows(),
+ ColumnStats: make(map[string]*ParquetColumnStats),
+ }
+
+ // Get schema information
+ schema := fileView.Schema()
+
+ // Process each row group
+ rowGroups := fileView.RowGroups()
+ for _, rowGroup := range rowGroups {
+ columnChunks := rowGroup.ColumnChunks()
+
+ // Process each column chunk
+ for i, chunk := range columnChunks {
+ // Get column name from schema
+ columnName := h.getColumnNameFromSchema(schema, i)
+ if columnName == "" {
+ continue
+ }
+
+ // Try to get column statistics
+ columnIndex, err := chunk.ColumnIndex()
+ if err != nil {
+ // No column index available - skip this column
+ continue
+ }
+
+ // Extract min/max values from the first page (for simplicity)
+ // In a more sophisticated implementation, we could aggregate across all pages
+ numPages := columnIndex.NumPages()
+ if numPages == 0 {
+ continue
+ }
+
+ minParquetValue := columnIndex.MinValue(0)
+ maxParquetValue := columnIndex.MaxValue(numPages - 1)
+ nullCount := int64(0)
+
+ // Aggregate null counts across all pages
+ for pageIdx := 0; pageIdx < numPages; pageIdx++ {
+ nullCount += columnIndex.NullCount(pageIdx)
+ }
+
+ // Convert parquet values to schema_pb.Value
+ minValue, err := h.convertParquetValueToSchemaValue(minParquetValue)
+ if err != nil {
+ continue
+ }
+
+ maxValue, err := h.convertParquetValueToSchemaValue(maxParquetValue)
+ if err != nil {
+ continue
+ }
+
+ // Store column statistics (aggregate across row groups if column already exists)
+ if existingStats, exists := fileStats.ColumnStats[columnName]; exists {
+ // Update existing statistics
+ if h.compareSchemaValues(minValue, existingStats.MinValue) < 0 {
+ existingStats.MinValue = minValue
+ }
+ if h.compareSchemaValues(maxValue, existingStats.MaxValue) > 0 {
+ existingStats.MaxValue = maxValue
+ }
+ existingStats.NullCount += nullCount
+ } else {
+ // Create new column statistics
+ fileStats.ColumnStats[columnName] = &ParquetColumnStats{
+ ColumnName: columnName,
+ MinValue: minValue,
+ MaxValue: maxValue,
+ NullCount: nullCount,
+ RowCount: rowGroup.NumRows(),
+ }
+ }
+ }
+ }
+
+ return fileStats, nil
+}
+
+// getColumnNameFromSchema extracts column name from parquet schema by index
+func (h *HybridMessageScanner) getColumnNameFromSchema(schema *parquet.Schema, columnIndex int) string {
+ // Get the leaf columns in order
+ var columnNames []string
+ h.collectColumnNames(schema.Fields(), &columnNames)
+
+ if columnIndex >= 0 && columnIndex < len(columnNames) {
+ return columnNames[columnIndex]
+ }
+ return ""
+}
+
+// collectColumnNames recursively collects leaf column names from schema
+func (h *HybridMessageScanner) collectColumnNames(fields []parquet.Field, names *[]string) {
+ for _, field := range fields {
+ if len(field.Fields()) == 0 {
+ // This is a leaf field (no sub-fields)
+ *names = append(*names, field.Name())
+ } else {
+ // This is a group - recurse
+ h.collectColumnNames(field.Fields(), names)
+ }
+ }
+}
+
+// convertParquetValueToSchemaValue converts parquet.Value to schema_pb.Value
+func (h *HybridMessageScanner) convertParquetValueToSchemaValue(pv parquet.Value) (*schema_pb.Value, error) {
+ switch pv.Kind() {
+ case parquet.Boolean:
+ return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: pv.Boolean()}}, nil
+ case parquet.Int32:
+ return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: pv.Int32()}}, nil
+ case parquet.Int64:
+ return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: pv.Int64()}}, nil
+ case parquet.Float:
+ return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: pv.Float()}}, nil
+ case parquet.Double:
+ return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: pv.Double()}}, nil
+ case parquet.ByteArray:
+ return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: pv.ByteArray()}}, nil
+ default:
+ return nil, fmt.Errorf("unsupported parquet value kind: %v", pv.Kind())
+ }
+}
+
+// compareSchemaValues compares two schema_pb.Value objects
+func (h *HybridMessageScanner) compareSchemaValues(v1, v2 *schema_pb.Value) int {
+ if v1 == nil && v2 == nil {
+ return 0
+ }
+ if v1 == nil {
+ return -1
+ }
+ if v2 == nil {
+ return 1
+ }
+
+ // Extract raw values and compare
+ raw1 := h.extractRawValueFromSchema(v1)
+ raw2 := h.extractRawValueFromSchema(v2)
+
+ return h.compareRawValues(raw1, raw2)
+}
+
+// extractRawValueFromSchema extracts the raw value from schema_pb.Value
+func (h *HybridMessageScanner) extractRawValueFromSchema(value *schema_pb.Value) interface{} {
+ switch v := value.Kind.(type) {
+ case *schema_pb.Value_BoolValue:
+ return v.BoolValue
+ case *schema_pb.Value_Int32Value:
+ return v.Int32Value
+ case *schema_pb.Value_Int64Value:
+ return v.Int64Value
+ case *schema_pb.Value_FloatValue:
+ return v.FloatValue
+ case *schema_pb.Value_DoubleValue:
+ return v.DoubleValue
+ case *schema_pb.Value_BytesValue:
+ return string(v.BytesValue) // Convert to string for comparison
+ case *schema_pb.Value_StringValue:
+ return v.StringValue
+ }
+ return nil
+}
+
+// compareRawValues compares two raw values
+func (h *HybridMessageScanner) compareRawValues(v1, v2 interface{}) int {
+ // Handle nil cases
+ if v1 == nil && v2 == nil {
+ return 0
+ }
+ if v1 == nil {
+ return -1
+ }
+ if v2 == nil {
+ return 1
+ }
+
+ // Compare based on type
+ switch val1 := v1.(type) {
+ case bool:
+ if val2, ok := v2.(bool); ok {
+ if val1 == val2 {
+ return 0
+ }
+ if val1 {
+ return 1
+ }
+ return -1
+ }
+ case int32:
+ if val2, ok := v2.(int32); ok {
+ if val1 < val2 {
+ return -1
+ } else if val1 > val2 {
+ return 1
+ }
+ return 0
+ }
+ case int64:
+ if val2, ok := v2.(int64); ok {
+ if val1 < val2 {
+ return -1
+ } else if val1 > val2 {
+ return 1
+ }
+ return 0
+ }
+ case float32:
+ if val2, ok := v2.(float32); ok {
+ if val1 < val2 {
+ return -1
+ } else if val1 > val2 {
+ return 1
+ }
+ return 0
+ }
+ case float64:
+ if val2, ok := v2.(float64); ok {
+ if val1 < val2 {
+ return -1
+ } else if val1 > val2 {
+ return 1
+ }
+ return 0
+ }
+ case string:
+ if val2, ok := v2.(string); ok {
+ if val1 < val2 {
+ return -1
+ } else if val1 > val2 {
+ return 1
+ }
+ return 0
+ }
+ }
+
+ // Default: try string comparison
+ str1 := fmt.Sprintf("%v", v1)
+ str2 := fmt.Sprintf("%v", v2)
+ if str1 < str2 {
+ return -1
+ } else if str1 > str2 {
+ return 1
+ }
+ return 0
+}
+
+// streamingMerge merges multiple sorted data sources using a heap-based approach
+// This provides memory-efficient merging without loading all data into memory
+func (hms *HybridMessageScanner) streamingMerge(dataSources []StreamingDataSource, limit int) ([]HybridScanResult, error) {
+ if len(dataSources) == 0 {
+ return nil, nil
+ }
+
+ var results []HybridScanResult
+ mergeHeap := &StreamingMergeHeap{}
+ heap.Init(mergeHeap)
+
+ // Initialize heap with first item from each data source
+ for i, source := range dataSources {
+ if source.HasMore() {
+ result, err := source.Next()
+ if err != nil {
+ // Close all sources and return error
+ for _, s := range dataSources {
+ s.Close()
+ }
+ return nil, fmt.Errorf("failed to read from data source %d: %v", i, err)
+ }
+ if result != nil {
+ heap.Push(mergeHeap, &StreamingMergeItem{
+ Result: result,
+ SourceID: i,
+ DataSource: source,
+ })
+ }
+ }
+ }
+
+ // Process results in chronological order
+ for mergeHeap.Len() > 0 {
+ // Get next chronologically ordered result
+ item := heap.Pop(mergeHeap).(*StreamingMergeItem)
+ results = append(results, *item.Result)
+
+ // Check limit
+ if limit > 0 && len(results) >= limit {
+ break
+ }
+
+ // Try to get next item from the same data source
+ if item.DataSource.HasMore() {
+ nextResult, err := item.DataSource.Next()
+ if err != nil {
+ // Log error but continue with other sources
+ fmt.Printf("Warning: Error reading next item from source %d: %v\n", item.SourceID, err)
+ } else if nextResult != nil {
+ heap.Push(mergeHeap, &StreamingMergeItem{
+ Result: nextResult,
+ SourceID: item.SourceID,
+ DataSource: item.DataSource,
+ })
+ }
+ }
+ }
+
+ // Close all data sources
+ for _, source := range dataSources {
+ source.Close()
+ }
+
+ return results, nil
+}
+
+// SliceDataSource wraps a pre-loaded slice of results as a StreamingDataSource
+// This is used for unflushed data that is already loaded into memory
+type SliceDataSource struct {
+ results []HybridScanResult
+ index int
+}
+
+func NewSliceDataSource(results []HybridScanResult) *SliceDataSource {
+ return &SliceDataSource{
+ results: results,
+ index: 0,
+ }
+}
+
+func (s *SliceDataSource) Next() (*HybridScanResult, error) {
+ if s.index >= len(s.results) {
+ return nil, nil
+ }
+ result := &s.results[s.index]
+ s.index++
+ return result, nil
+}
+
+func (s *SliceDataSource) HasMore() bool {
+ return s.index < len(s.results)
+}
+
+func (s *SliceDataSource) Close() error {
+ return nil // Nothing to clean up for slice-based source
+}
+
+// StreamingFlushedDataSource provides streaming access to flushed data
+type StreamingFlushedDataSource struct {
+ hms *HybridMessageScanner
+ partition topic.Partition
+ options HybridScanOptions
+ mergedReadFn func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error)
+ resultChan chan *HybridScanResult
+ errorChan chan error
+ doneChan chan struct{}
+ started bool
+ finished bool
+ closed int32 // atomic flag to prevent double close
+ mu sync.RWMutex
+}
+
+func NewStreamingFlushedDataSource(hms *HybridMessageScanner, partition topic.Partition, options HybridScanOptions) *StreamingFlushedDataSource {
+ mergedReadFn := logstore.GenMergedReadFunc(hms.filerClient, hms.topic, partition)
+
+ return &StreamingFlushedDataSource{
+ hms: hms,
+ partition: partition,
+ options: options,
+ mergedReadFn: mergedReadFn,
+ resultChan: make(chan *HybridScanResult, 100), // Buffer for better performance
+ errorChan: make(chan error, 1),
+ doneChan: make(chan struct{}),
+ started: false,
+ finished: false,
+ }
+}
+
+func (s *StreamingFlushedDataSource) startStreaming() {
+ if s.started {
+ return
+ }
+ s.started = true
+
+ go func() {
+ defer func() {
+ // Use atomic flag to ensure channels are only closed once
+ if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
+ close(s.resultChan)
+ close(s.errorChan)
+ close(s.doneChan)
+ }
+ }()
+
+ // Set up time range for scanning
+ startTime := time.Unix(0, s.options.StartTimeNs)
+ if s.options.StartTimeNs == 0 {
+ startTime = time.Unix(0, 0)
+ }
+
+ stopTsNs := s.options.StopTimeNs
+ // For SQL queries, stopTsNs = 0 means "no stop time restriction"
+ // This is different from message queue consumers which want to stop at "now"
+ // We detect SQL context by checking if we have a predicate function
+ if stopTsNs == 0 && s.options.Predicate == nil {
+ // Only set to current time for non-SQL queries (message queue consumers)
+ stopTsNs = time.Now().UnixNano()
+ }
+ // If stopTsNs is still 0, it means this is a SQL query that wants unrestricted scanning
+
+ // Message processing function
+ eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
+ // Skip control entries without actual data
+ if s.hms.isControlEntry(logEntry) {
+ return false, nil // Skip this entry
+ }
+
+ // Convert log entry to schema_pb.RecordValue for consistent processing
+ recordValue, source, convertErr := s.hms.convertLogEntryToRecordValue(logEntry)
+ if convertErr != nil {
+ return false, fmt.Errorf("failed to convert log entry: %v", convertErr)
+ }
+
+ // Apply predicate filtering (WHERE clause)
+ if s.options.Predicate != nil && !s.options.Predicate(recordValue) {
+ return false, nil // Skip this message
+ }
+
+ // Extract system columns
+ timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
+ key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
+
+ // Apply column projection
+ values := make(map[string]*schema_pb.Value)
+ if len(s.options.Columns) == 0 {
+ // Select all columns (excluding system columns from user view)
+ for name, value := range recordValue.Fields {
+ if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
+ values[name] = value
+ }
+ }
+ } else {
+ // Select specified columns only
+ for _, columnName := range s.options.Columns {
+ if value, exists := recordValue.Fields[columnName]; exists {
+ values[columnName] = value
+ }
+ }
+ }
+
+ result := &HybridScanResult{
+ Values: values,
+ Timestamp: timestamp,
+ Key: key,
+ Source: source,
+ }
+
+ // Check if already closed before trying to send
+ if atomic.LoadInt32(&s.closed) != 0 {
+ return true, nil // Stop processing if closed
+ }
+
+ // Send result to channel with proper handling of closed channels
+ select {
+ case s.resultChan <- result:
+ return false, nil
+ case <-s.doneChan:
+ return true, nil // Stop processing if closed
+ default:
+ // Check again if closed (in case it was closed between the atomic check and select)
+ if atomic.LoadInt32(&s.closed) != 0 {
+ return true, nil
+ }
+ // If not closed, try sending again with blocking select
+ select {
+ case s.resultChan <- result:
+ return false, nil
+ case <-s.doneChan:
+ return true, nil
+ }
+ }
+ }
+
+ // Start scanning from the specified position
+ startPosition := log_buffer.MessagePosition{Time: startTime}
+ _, _, err := s.mergedReadFn(startPosition, stopTsNs, eachLogEntryFn)
+
+ if err != nil {
+ // Only try to send error if not already closed
+ if atomic.LoadInt32(&s.closed) == 0 {
+ select {
+ case s.errorChan <- fmt.Errorf("flushed data scan failed: %v", err):
+ case <-s.doneChan:
+ default:
+ // Channel might be full or closed, ignore
+ }
+ }
+ }
+
+ s.finished = true
+ }()
+}
+
+func (s *StreamingFlushedDataSource) Next() (*HybridScanResult, error) {
+ if !s.started {
+ s.startStreaming()
+ }
+
+ select {
+ case result, ok := <-s.resultChan:
+ if !ok {
+ return nil, nil // No more results
+ }
+ return result, nil
+ case err := <-s.errorChan:
+ return nil, err
+ case <-s.doneChan:
+ return nil, nil
+ }
+}
+
+func (s *StreamingFlushedDataSource) HasMore() bool {
+ if !s.started {
+ return true // Haven't started yet, so potentially has data
+ }
+ return !s.finished || len(s.resultChan) > 0
+}
+
+func (s *StreamingFlushedDataSource) Close() error {
+ // Use atomic flag to ensure channels are only closed once
+ if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
+ close(s.doneChan)
+ close(s.resultChan)
+ close(s.errorChan)
+ }
+ return nil
+}
+
+// mergeSort efficiently sorts HybridScanResult slice by timestamp using merge sort algorithm
+func (hms *HybridMessageScanner) mergeSort(results []HybridScanResult, left, right int) {
+ if left < right {
+ mid := left + (right-left)/2
+
+ // Recursively sort both halves
+ hms.mergeSort(results, left, mid)
+ hms.mergeSort(results, mid+1, right)
+
+ // Merge the sorted halves
+ hms.merge(results, left, mid, right)
+ }
+}
+
+// merge combines two sorted subarrays into a single sorted array
+func (hms *HybridMessageScanner) merge(results []HybridScanResult, left, mid, right int) {
+ // Create temporary arrays for the two subarrays
+ leftArray := make([]HybridScanResult, mid-left+1)
+ rightArray := make([]HybridScanResult, right-mid)
+
+ // Copy data to temporary arrays
+ copy(leftArray, results[left:mid+1])
+ copy(rightArray, results[mid+1:right+1])
+
+ // Merge the temporary arrays back into results[left..right]
+ i, j, k := 0, 0, left
+
+ for i < len(leftArray) && j < len(rightArray) {
+ if leftArray[i].Timestamp <= rightArray[j].Timestamp {
+ results[k] = leftArray[i]
+ i++
+ } else {
+ results[k] = rightArray[j]
+ j++
+ }
+ k++
+ }
+
+ // Copy remaining elements of leftArray, if any
+ for i < len(leftArray) {
+ results[k] = leftArray[i]
+ i++
+ k++
+ }
+
+ // Copy remaining elements of rightArray, if any
+ for j < len(rightArray) {
+ results[k] = rightArray[j]
+ j++
+ k++
+ }
+}