Diffstat (limited to 'weed/query/engine/hybrid_message_scanner.go')
| -rw-r--r-- | weed/query/engine/hybrid_message_scanner.go | 1668 |
1 file changed, 1668 insertions, 0 deletions
diff --git a/weed/query/engine/hybrid_message_scanner.go b/weed/query/engine/hybrid_message_scanner.go new file mode 100644 index 000000000..2584b54a6 --- /dev/null +++ b/weed/query/engine/hybrid_message_scanner.go @@ -0,0 +1,1668 @@ +package engine + +import ( + "container/heap" + "context" + "encoding/json" + "fmt" + "io" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/parquet-go/parquet-go" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/mq/logstore" + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" + "github.com/seaweedfs/seaweedfs/weed/query/sqltypes" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "github.com/seaweedfs/seaweedfs/weed/wdclient" + "google.golang.org/protobuf/proto" +) + +// HybridMessageScanner scans from ALL data sources: +// Architecture: +// 1. Unflushed in-memory data from brokers (mq_pb.DataMessage format) - REAL-TIME +// 2. Recent/live messages in log files (filer_pb.LogEntry format) - FLUSHED +// 3. Older messages in Parquet files (schema_pb.RecordValue format) - ARCHIVED +// 4. Seamlessly merges data from all sources chronologically +// 5. Provides complete real-time view of all messages in a topic +type HybridMessageScanner struct { + filerClient filer_pb.FilerClient + brokerClient BrokerClientInterface // For querying unflushed data + topic topic.Topic + recordSchema *schema_pb.RecordType + parquetLevels *schema.ParquetLevels + engine *SQLEngine // Reference for system column formatting +} + +// NewHybridMessageScanner creates a scanner that reads from all data sources +// This provides complete real-time message coverage including unflushed data +func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient BrokerClientInterface, namespace, topicName string, engine *SQLEngine) (*HybridMessageScanner, error) { + // Check if filerClient is available + if filerClient == nil { + return nil, fmt.Errorf("filerClient is required but not available") + } + + // Create topic reference + t := topic.Topic{ + Namespace: namespace, + Name: topicName, + } + + // Get topic schema from broker client (works with both real and mock clients) + recordType, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName) + if err != nil { + return nil, fmt.Errorf("failed to get topic schema: %v", err) + } + if recordType == nil { + return nil, NoSchemaError{Namespace: namespace, Topic: topicName} + } + + // Create a copy of the recordType to avoid modifying the original + recordTypeCopy := &schema_pb.RecordType{ + Fields: make([]*schema_pb.Field, len(recordType.Fields)), + } + copy(recordTypeCopy.Fields, recordType.Fields) + + // Add system columns that MQ adds to all records + recordType = schema.NewRecordTypeBuilder(recordTypeCopy). + WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64). + WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes). 
+ RecordTypeEnd() + + // Convert to Parquet levels for efficient reading + parquetLevels, err := schema.ToParquetLevels(recordType) + if err != nil { + return nil, fmt.Errorf("failed to create Parquet levels: %v", err) + } + + return &HybridMessageScanner{ + filerClient: filerClient, + brokerClient: brokerClient, + topic: t, + recordSchema: recordType, + parquetLevels: parquetLevels, + engine: engine, + }, nil +} + +// HybridScanOptions configure how the scanner reads from both live and archived data +type HybridScanOptions struct { + // Time range filtering (Unix nanoseconds) + StartTimeNs int64 + StopTimeNs int64 + + // Column projection - if empty, select all columns + Columns []string + + // Row limit - 0 means no limit + Limit int + + // Row offset - 0 means no offset + Offset int + + // Predicate for WHERE clause filtering + Predicate func(*schema_pb.RecordValue) bool +} + +// HybridScanResult represents a message from either live logs or Parquet files +type HybridScanResult struct { + Values map[string]*schema_pb.Value // Column name -> value + Timestamp int64 // Message timestamp (_ts_ns) + Key []byte // Message key (_key) + Source string // "live_log" or "parquet_archive" or "in_memory_broker" +} + +// HybridScanStats contains statistics about data sources scanned +type HybridScanStats struct { + BrokerBufferQueried bool + BrokerBufferMessages int + BufferStartIndex int64 + PartitionsScanned int + LiveLogFilesScanned int // Number of live log files processed +} + +// ParquetColumnStats holds statistics for a single column from parquet metadata +type ParquetColumnStats struct { + ColumnName string + MinValue *schema_pb.Value + MaxValue *schema_pb.Value + NullCount int64 + RowCount int64 +} + +// ParquetFileStats holds aggregated statistics for a parquet file +type ParquetFileStats struct { + FileName string + RowCount int64 + ColumnStats map[string]*ParquetColumnStats +} + +// StreamingDataSource provides a streaming interface for reading scan results +type StreamingDataSource interface { + Next() (*HybridScanResult, error) // Returns next result or nil when done + HasMore() bool // Returns true if more data available + Close() error // Clean up resources +} + +// StreamingMergeItem represents an item in the priority queue for streaming merge +type StreamingMergeItem struct { + Result *HybridScanResult + SourceID int + DataSource StreamingDataSource +} + +// StreamingMergeHeap implements heap.Interface for merging sorted streams by timestamp +type StreamingMergeHeap []*StreamingMergeItem + +func (h StreamingMergeHeap) Len() int { return len(h) } + +func (h StreamingMergeHeap) Less(i, j int) bool { + // Sort by timestamp (ascending order) + return h[i].Result.Timestamp < h[j].Result.Timestamp +} + +func (h StreamingMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *StreamingMergeHeap) Push(x interface{}) { + *h = append(*h, x.(*StreamingMergeItem)) +} + +func (h *StreamingMergeHeap) Pop() interface{} { + old := *h + n := len(old) + item := old[n-1] + *h = old[0 : n-1] + return item +} + +// Scan reads messages from both live logs and archived Parquet files +// Uses SeaweedFS MQ's GenMergedReadFunc for seamless integration +// Assumptions: +// 1. Chronologically merges live and archived data +// 2. Applies filtering at the lowest level for efficiency +// 3. 
Handles schema evolution transparently +func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, error) { + results, _, err := hms.ScanWithStats(ctx, options) + return results, err +} + +// ScanWithStats reads messages and returns scan statistics for execution plans +func (hms *HybridMessageScanner) ScanWithStats(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) { + var results []HybridScanResult + stats := &HybridScanStats{} + + // Get all partitions for this topic via MQ broker discovery + partitions, err := hms.discoverTopicPartitions(ctx) + if err != nil { + return nil, stats, fmt.Errorf("failed to discover partitions for topic %s: %v", hms.topic.String(), err) + } + + stats.PartitionsScanned = len(partitions) + + for _, partition := range partitions { + partitionResults, partitionStats, err := hms.scanPartitionHybridWithStats(ctx, partition, options) + if err != nil { + return nil, stats, fmt.Errorf("failed to scan partition %v: %v", partition, err) + } + + results = append(results, partitionResults...) + + // Aggregate broker buffer stats + if partitionStats != nil { + if partitionStats.BrokerBufferQueried { + stats.BrokerBufferQueried = true + } + stats.BrokerBufferMessages += partitionStats.BrokerBufferMessages + if partitionStats.BufferStartIndex > 0 && (stats.BufferStartIndex == 0 || partitionStats.BufferStartIndex < stats.BufferStartIndex) { + stats.BufferStartIndex = partitionStats.BufferStartIndex + } + } + + // Apply global limit (without offset) across all partitions + // When OFFSET is used, collect more data to ensure we have enough after skipping + // Note: OFFSET will be applied at the end to avoid double-application + if options.Limit > 0 { + // Collect exact amount needed: LIMIT + OFFSET (no excessive doubling) + minRequired := options.Limit + options.Offset + // Small buffer only when needed to handle edge cases in distributed scanning + if options.Offset > 0 && minRequired < 10 { + minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling + } + if len(results) >= minRequired { + break + } + } + } + + // Apply final OFFSET and LIMIT processing (done once at the end) + // Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows + if options.Offset > 0 || options.Limit >= 0 { + // Handle LIMIT 0 special case first + if options.Limit == 0 { + return []HybridScanResult{}, stats, nil + } + + // Apply OFFSET first + if options.Offset > 0 { + if options.Offset >= len(results) { + results = []HybridScanResult{} + } else { + results = results[options.Offset:] + } + } + + // Apply LIMIT after OFFSET (only if limit > 0) + if options.Limit > 0 && len(results) > options.Limit { + results = results[:options.Limit] + } + } + + return results, stats, nil +} + +// scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication +func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) { + results, _, err := hms.scanUnflushedDataWithStats(ctx, partition, options) + return results, err +} + +// scanUnflushedDataWithStats queries brokers for unflushed data and returns statistics +func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) { + var results []HybridScanResult + stats := &HybridScanStats{} + + // Skip if no 
broker client available + if hms.brokerClient == nil { + return results, stats, nil + } + + // Mark that we attempted to query broker buffer + stats.BrokerBufferQueried = true + + // Step 1: Get unflushed data from broker using buffer_start-based method + // This method uses buffer_start metadata to avoid double-counting with exact precision + unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs) + if err != nil { + // Log error but don't fail the query - continue with disk data only + if isDebugMode(ctx) { + fmt.Printf("Debug: Failed to get unflushed messages: %v\n", err) + } + // Reset queried flag on error + stats.BrokerBufferQueried = false + return results, stats, nil + } + + // Capture stats for EXPLAIN + stats.BrokerBufferMessages = len(unflushedEntries) + + // Debug logging for EXPLAIN mode + if isDebugMode(ctx) { + fmt.Printf("Debug: Broker buffer queried - found %d unflushed messages\n", len(unflushedEntries)) + if len(unflushedEntries) > 0 { + fmt.Printf("Debug: Using buffer_start deduplication for precise real-time data\n") + } + } + + // Step 2: Process unflushed entries (already deduplicated by broker) + for _, logEntry := range unflushedEntries { + // Skip control entries without actual data + if hms.isControlEntry(logEntry) { + continue // Skip this entry + } + + // Skip messages outside time range + if options.StartTimeNs > 0 && logEntry.TsNs < options.StartTimeNs { + continue + } + if options.StopTimeNs > 0 && logEntry.TsNs > options.StopTimeNs { + continue + } + + // Convert LogEntry to RecordValue format (same as disk data) + recordValue, _, err := hms.convertLogEntryToRecordValue(logEntry) + if err != nil { + if isDebugMode(ctx) { + fmt.Printf("Debug: Failed to convert unflushed log entry: %v\n", err) + } + continue // Skip malformed messages + } + + // Apply predicate filter if provided + if options.Predicate != nil && !options.Predicate(recordValue) { + continue + } + + // Extract system columns for result + timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value() + key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue() + + // Apply column projection + values := make(map[string]*schema_pb.Value) + if len(options.Columns) == 0 { + // Select all columns (excluding system columns from user view) + for name, value := range recordValue.Fields { + if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY { + values[name] = value + } + } + } else { + // Select specified columns only + for _, columnName := range options.Columns { + if value, exists := recordValue.Fields[columnName]; exists { + values[columnName] = value + } + } + } + + // Create result with proper source tagging + result := HybridScanResult{ + Values: values, + Timestamp: timestamp, + Key: key, + Source: "live_log", // Data from broker's unflushed messages + } + + results = append(results, result) + + // Apply limit (accounting for offset) - collect exact amount needed + if options.Limit > 0 { + // Collect exact amount needed: LIMIT + OFFSET (no excessive doubling) + minRequired := options.Limit + options.Offset + // Small buffer only when needed to handle edge cases in message streaming + if options.Offset > 0 && minRequired < 10 { + minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling + } + if len(results) >= minRequired { + break + } + } + } + + if isDebugMode(ctx) { + fmt.Printf("Debug: Retrieved %d unflushed messages from broker\n", len(results)) + } + + return results, 
stats, nil +} + +// convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue +func (hms *HybridMessageScanner) convertDataMessageToRecord(msg *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) { + // Parse the message data as RecordValue + recordValue := &schema_pb.RecordValue{} + if err := proto.Unmarshal(msg.Value, recordValue); err != nil { + return nil, "", fmt.Errorf("failed to unmarshal message data: %v", err) + } + + // Add system columns + if recordValue.Fields == nil { + recordValue.Fields = make(map[string]*schema_pb.Value) + } + + // Add timestamp + recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: msg.TsNs}, + } + + return recordValue, string(msg.Key), nil +} + +// discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem +// This finds real partition directories like v2025-09-01-07-16-34/0000-0630/ +func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) { + if hms.filerClient == nil { + return nil, fmt.Errorf("filerClient not available for partition discovery") + } + + var allPartitions []topic.Partition + var err error + + // Scan the topic directory for actual partition versions (timestamped directories) + // List all version directories in the topic directory + err = filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(hms.topic.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error { + if !versionEntry.IsDirectory { + return nil // Skip non-directories + } + + // Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34") + versionTime, parseErr := topic.ParseTopicVersion(versionEntry.Name) + if parseErr != nil { + // Skip directories that don't match the version format + return nil + } + + // Scan partition directories within this version + versionDir := fmt.Sprintf("%s/%s", hms.topic.Dir(), versionEntry.Name) + return filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error { + if !partitionEntry.IsDirectory { + return nil // Skip non-directories + } + + // Parse partition boundary from directory name (e.g., "0000-0630") + rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name) + if rangeStart == rangeStop { + return nil // Skip invalid partition names + } + + // Create partition object + partition := topic.Partition{ + RangeStart: rangeStart, + RangeStop: rangeStop, + RingSize: topic.PartitionCount, + UnixTimeNs: versionTime.UnixNano(), + } + + allPartitions = append(allPartitions, partition) + return nil + }) + }) + + if err != nil { + return nil, fmt.Errorf("failed to scan topic directory for partitions: %v", err) + } + + // If no partitions found, return empty slice (valid for newly created or empty topics) + if len(allPartitions) == 0 { + fmt.Printf("No partitions found for topic %s - returning empty result set\n", hms.topic.String()) + return []topic.Partition{}, nil + } + + fmt.Printf("Discovered %d partitions for topic %s\n", len(allPartitions), hms.topic.String()) + return allPartitions, nil +} + +// scanPartitionHybrid scans a specific partition using the hybrid approach +// This is where the magic happens - seamlessly reading ALL data sources: +// 1. Unflushed in-memory data from brokers (REAL-TIME) +// 2. 
Live logs + Parquet files from disk (FLUSHED/ARCHIVED) +func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) { + results, _, err := hms.scanPartitionHybridWithStats(ctx, partition, options) + return results, err +} + +// scanPartitionHybridWithStats scans a specific partition using streaming merge for memory efficiency +// PERFORMANCE IMPROVEMENT: Uses heap-based streaming merge instead of collecting all data and sorting +// - Memory usage: O(k) where k = number of data sources, instead of O(n) where n = total records +// - Scalable: Can handle large topics without LIMIT clauses efficiently +// - Streaming: Processes data as it arrives rather than buffering everything +func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) { + stats := &HybridScanStats{} + + // STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME) + unflushedResults, unflushedStats, err := hms.scanUnflushedDataWithStats(ctx, partition, options) + if err != nil { + // Don't fail the query if broker scanning fails, but provide clear warning to user + // This ensures users are aware that results may not include the most recent data + if isDebugMode(ctx) { + fmt.Printf("Debug: Failed to scan unflushed data from broker: %v\n", err) + } else { + fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err) + fmt.Printf("Note: Query results may not include the most recent unflushed messages\n") + } + } else if unflushedStats != nil { + stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried + stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages + stats.BufferStartIndex = unflushedStats.BufferStartIndex + } + + // Count live log files for statistics + liveLogCount, err := hms.countLiveLogFiles(partition) + if err != nil { + // Don't fail the query, just log warning + fmt.Printf("Warning: Failed to count live log files: %v\n", err) + liveLogCount = 0 + } + stats.LiveLogFilesScanned = liveLogCount + + // STEP 2: Create streaming data sources for memory-efficient merge + var dataSources []StreamingDataSource + + // Add unflushed data source (if we have unflushed results) + if len(unflushedResults) > 0 { + // Sort unflushed results by timestamp before creating stream + if len(unflushedResults) > 1 { + hms.mergeSort(unflushedResults, 0, len(unflushedResults)-1) + } + dataSources = append(dataSources, NewSliceDataSource(unflushedResults)) + } + + // Add streaming flushed data source (live logs + Parquet files) + flushedDataSource := NewStreamingFlushedDataSource(hms, partition, options) + dataSources = append(dataSources, flushedDataSource) + + // STEP 3: Use streaming merge for memory-efficient chronological ordering + var results []HybridScanResult + if len(dataSources) > 0 { + // Calculate how many rows we need to collect during scanning (before OFFSET/LIMIT) + // For LIMIT N OFFSET M, we need to collect at least N+M rows + scanLimit := options.Limit + if options.Limit > 0 && options.Offset > 0 { + scanLimit = options.Limit + options.Offset + } + + mergedResults, err := hms.streamingMerge(dataSources, scanLimit) + if err != nil { + return nil, stats, fmt.Errorf("streaming merge failed: %v", err) + } + results = mergedResults + } + + return results, stats, nil +} + +// countLiveLogFiles counts the number of live log files in a partition for statistics +func 
(hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (int, error) { + partitionDir := topic.PartitionDir(hms.topic, partition) + + var fileCount int + err := hms.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + // List all files in partition directory + request := &filer_pb.ListEntriesRequest{ + Directory: partitionDir, + Prefix: "", + StartFromFileName: "", + InclusiveStartFrom: true, + Limit: 10000, // reasonable limit for counting + } + + stream, err := client.ListEntries(context.Background(), request) + if err != nil { + return err + } + + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return err + } + + // Count files that are not .parquet files (live log files) + // Live log files typically have timestamps or are named like log files + fileName := resp.Entry.Name + if !strings.HasSuffix(fileName, ".parquet") && + !strings.HasSuffix(fileName, ".offset") && + len(resp.Entry.Chunks) > 0 { // Has actual content + fileCount++ + } + } + + return nil + }) + + if err != nil { + return 0, err + } + return fileCount, nil +} + +// isControlEntry checks if a log entry is a control entry without actual data +// Based on MQ system analysis, control entries are: +// 1. DataMessages with populated Ctrl field (publisher close signals) +// 2. Entries with empty keys (as filtered by subscriber) +// 3. Entries with no data +func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool { + // Skip entries with no data + if len(logEntry.Data) == 0 { + return true + } + + // Skip entries with empty keys (same logic as subscriber) + if len(logEntry.Key) == 0 { + return true + } + + // Check if this is a DataMessage with control field populated + dataMessage := &mq_pb.DataMessage{} + if err := proto.Unmarshal(logEntry.Data, dataMessage); err == nil { + // If it has a control field, it's a control message + if dataMessage.Ctrl != nil { + return true + } + } + + return false +} + +// convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue +// This handles both: +// 1. Live log entries (raw message format) +// 2. 
Parquet entries (already in schema_pb.RecordValue format) +func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) { + // Try to unmarshal as RecordValue first (Parquet format) + recordValue := &schema_pb.RecordValue{} + if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil { + // This is an archived message from Parquet files + // FIX: Add system columns from LogEntry to RecordValue + if recordValue.Fields == nil { + recordValue.Fields = make(map[string]*schema_pb.Value) + } + + // Add system columns from LogEntry + recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, + } + recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}, + } + + return recordValue, "parquet_archive", nil + } + + // If not a RecordValue, this is raw live message data - parse with schema + return hms.parseRawMessageWithSchema(logEntry) +} + +// parseRawMessageWithSchema parses raw live message data using the topic's schema +// This provides proper type conversion and field mapping instead of treating everything as strings +func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) { + recordValue := &schema_pb.RecordValue{ + Fields: make(map[string]*schema_pb.Value), + } + + // Add system columns (always present) + recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs}, + } + recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key}, + } + + // Parse message data based on schema + if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 { + // Fallback: No schema available, treat as single "data" field + recordValue.Fields["data"] = &schema_pb.Value{ + Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)}, + } + return recordValue, "live_log", nil + } + + // Attempt schema-aware parsing + // Strategy 1: Try JSON parsing first (most common for live messages) + if parsedRecord, err := hms.parseJSONMessage(logEntry.Data); err == nil { + // Successfully parsed as JSON, merge with system columns + for fieldName, fieldValue := range parsedRecord.Fields { + recordValue.Fields[fieldName] = fieldValue + } + return recordValue, "live_log", nil + } + + // Strategy 2: Try protobuf parsing (binary messages) + if parsedRecord, err := hms.parseProtobufMessage(logEntry.Data); err == nil { + // Successfully parsed as protobuf, merge with system columns + for fieldName, fieldValue := range parsedRecord.Fields { + recordValue.Fields[fieldName] = fieldValue + } + return recordValue, "live_log", nil + } + + // Strategy 3: Fallback to single field with raw data + // If schema has a single field, map the raw data to it with type conversion + if len(hms.recordSchema.Fields) == 1 { + field := hms.recordSchema.Fields[0] + convertedValue, err := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type) + if err == nil { + recordValue.Fields[field.Name] = convertedValue + return recordValue, "live_log", nil + } + } + + // Final fallback: treat as string data field + recordValue.Fields["data"] = &schema_pb.Value{ + Kind: &schema_pb.Value_StringValue{StringValue: string(logEntry.Data)}, + } + + return recordValue, "live_log", nil +} + +// parseJSONMessage attempts to parse raw data as JSON and 
map to schema fields +func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) { + // Try to parse as JSON + var jsonData map[string]interface{} + if err := json.Unmarshal(data, &jsonData); err != nil { + return nil, fmt.Errorf("not valid JSON: %v", err) + } + + recordValue := &schema_pb.RecordValue{ + Fields: make(map[string]*schema_pb.Value), + } + + // Map JSON fields to schema fields + for _, schemaField := range hms.recordSchema.Fields { + fieldName := schemaField.Name + if jsonValue, exists := jsonData[fieldName]; exists { + schemaValue, err := hms.convertJSONValueToSchemaValue(jsonValue, schemaField.Type) + if err != nil { + // Log conversion error but continue with other fields + continue + } + recordValue.Fields[fieldName] = schemaValue + } + } + + return recordValue, nil +} + +// parseProtobufMessage attempts to parse raw data as protobuf RecordValue +func (hms *HybridMessageScanner) parseProtobufMessage(data []byte) (*schema_pb.RecordValue, error) { + // This might be a raw protobuf message that didn't parse correctly the first time + // Try alternative protobuf unmarshaling approaches + recordValue := &schema_pb.RecordValue{} + + // Strategy 1: Direct unmarshaling (might work if it's actually a RecordValue) + if err := proto.Unmarshal(data, recordValue); err == nil { + return recordValue, nil + } + + // Strategy 2: Check if it's a different protobuf message type + // For now, return error as we need more specific knowledge of MQ message formats + return nil, fmt.Errorf("could not parse as protobuf RecordValue") +} + +// convertRawDataToSchemaValue converts raw bytes to a specific schema type +func (hms *HybridMessageScanner) convertRawDataToSchemaValue(data []byte, fieldType *schema_pb.Type) (*schema_pb.Value, error) { + dataStr := string(data) + + switch fieldType.Kind.(type) { + case *schema_pb.Type_ScalarType: + scalarType := fieldType.GetScalarType() + switch scalarType { + case schema_pb.ScalarType_STRING: + return &schema_pb.Value{ + Kind: &schema_pb.Value_StringValue{StringValue: dataStr}, + }, nil + case schema_pb.ScalarType_INT32: + if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 32); err == nil { + return &schema_pb.Value{ + Kind: &schema_pb.Value_Int32Value{Int32Value: int32(val)}, + }, nil + } + case schema_pb.ScalarType_INT64: + if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 64); err == nil { + return &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: val}, + }, nil + } + case schema_pb.ScalarType_FLOAT: + if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 32); err == nil { + return &schema_pb.Value{ + Kind: &schema_pb.Value_FloatValue{FloatValue: float32(val)}, + }, nil + } + case schema_pb.ScalarType_DOUBLE: + if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 64); err == nil { + return &schema_pb.Value{ + Kind: &schema_pb.Value_DoubleValue{DoubleValue: val}, + }, nil + } + case schema_pb.ScalarType_BOOL: + lowerStr := strings.ToLower(strings.TrimSpace(dataStr)) + if lowerStr == "true" || lowerStr == "1" || lowerStr == "yes" { + return &schema_pb.Value{ + Kind: &schema_pb.Value_BoolValue{BoolValue: true}, + }, nil + } else if lowerStr == "false" || lowerStr == "0" || lowerStr == "no" { + return &schema_pb.Value{ + Kind: &schema_pb.Value_BoolValue{BoolValue: false}, + }, nil + } + case schema_pb.ScalarType_BYTES: + return &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: data}, + }, nil + } + } + + return nil, fmt.Errorf("unsupported type 
conversion for %v", fieldType) +} + +// convertJSONValueToSchemaValue converts a JSON value to schema_pb.Value based on schema type +func (hms *HybridMessageScanner) convertJSONValueToSchemaValue(jsonValue interface{}, fieldType *schema_pb.Type) (*schema_pb.Value, error) { + switch fieldType.Kind.(type) { + case *schema_pb.Type_ScalarType: + scalarType := fieldType.GetScalarType() + switch scalarType { + case schema_pb.ScalarType_STRING: + if str, ok := jsonValue.(string); ok { + return &schema_pb.Value{ + Kind: &schema_pb.Value_StringValue{StringValue: str}, + }, nil + } + // Convert other types to string + return &schema_pb.Value{ + Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", jsonValue)}, + }, nil + case schema_pb.ScalarType_INT32: + if num, ok := jsonValue.(float64); ok { // JSON numbers are float64 + return &schema_pb.Value{ + Kind: &schema_pb.Value_Int32Value{Int32Value: int32(num)}, + }, nil + } + case schema_pb.ScalarType_INT64: + if num, ok := jsonValue.(float64); ok { + return &schema_pb.Value{ + Kind: &schema_pb.Value_Int64Value{Int64Value: int64(num)}, + }, nil + } + case schema_pb.ScalarType_FLOAT: + if num, ok := jsonValue.(float64); ok { + return &schema_pb.Value{ + Kind: &schema_pb.Value_FloatValue{FloatValue: float32(num)}, + }, nil + } + case schema_pb.ScalarType_DOUBLE: + if num, ok := jsonValue.(float64); ok { + return &schema_pb.Value{ + Kind: &schema_pb.Value_DoubleValue{DoubleValue: num}, + }, nil + } + case schema_pb.ScalarType_BOOL: + if boolVal, ok := jsonValue.(bool); ok { + return &schema_pb.Value{ + Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal}, + }, nil + } + case schema_pb.ScalarType_BYTES: + if str, ok := jsonValue.(string); ok { + return &schema_pb.Value{ + Kind: &schema_pb.Value_BytesValue{BytesValue: []byte(str)}, + }, nil + } + } + } + + return nil, fmt.Errorf("incompatible JSON value type %T for schema type %v", jsonValue, fieldType) +} + +// ConvertToSQLResult converts HybridScanResults to SQL query results +func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, columns []string) *QueryResult { + if len(results) == 0 { + return &QueryResult{ + Columns: columns, + Rows: [][]sqltypes.Value{}, + Database: hms.topic.Namespace, + Table: hms.topic.Name, + } + } + + // Determine columns if not specified + if len(columns) == 0 { + columnSet := make(map[string]bool) + for _, result := range results { + for columnName := range result.Values { + columnSet[columnName] = true + } + } + + columns = make([]string, 0, len(columnSet)) + for columnName := range columnSet { + columns = append(columns, columnName) + } + } + + // Convert to SQL rows + rows := make([][]sqltypes.Value, len(results)) + for i, result := range results { + row := make([]sqltypes.Value, len(columns)) + for j, columnName := range columns { + switch columnName { + case SW_COLUMN_NAME_SOURCE: + row[j] = sqltypes.NewVarChar(result.Source) + case SW_COLUMN_NAME_TIMESTAMP, SW_DISPLAY_NAME_TIMESTAMP: + // Format timestamp as proper timestamp type instead of raw nanoseconds + row[j] = hms.engine.formatTimestampColumn(result.Timestamp) + case SW_COLUMN_NAME_KEY: + row[j] = sqltypes.NewVarBinary(string(result.Key)) + default: + if value, exists := result.Values[columnName]; exists { + row[j] = convertSchemaValueToSQL(value) + } else { + row[j] = sqltypes.NULL + } + } + } + rows[i] = row + } + + return &QueryResult{ + Columns: columns, + Rows: rows, + Database: hms.topic.Namespace, + Table: hms.topic.Name, + } +} + +// ConvertToSQLResultWithMixedColumns 
handles SELECT *, specific_columns queries +// Combines auto-discovered columns (from *) with explicitly requested columns +func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []HybridScanResult, explicitColumns []string) *QueryResult { + if len(results) == 0 { + // For empty results, combine auto-discovered columns with explicit ones + columnSet := make(map[string]bool) + + // Add explicit columns first + for _, col := range explicitColumns { + columnSet[col] = true + } + + // Build final column list + columns := make([]string, 0, len(columnSet)) + for col := range columnSet { + columns = append(columns, col) + } + + return &QueryResult{ + Columns: columns, + Rows: [][]sqltypes.Value{}, + Database: hms.topic.Namespace, + Table: hms.topic.Name, + } + } + + // Auto-discover columns from data (like SELECT *) + autoColumns := make(map[string]bool) + for _, result := range results { + for columnName := range result.Values { + autoColumns[columnName] = true + } + } + + // Combine auto-discovered and explicit columns + columnSet := make(map[string]bool) + + // Add auto-discovered columns first (regular data columns) + for col := range autoColumns { + columnSet[col] = true + } + + // Add explicit columns (may include system columns like _source) + for _, col := range explicitColumns { + columnSet[col] = true + } + + // Build final column list + columns := make([]string, 0, len(columnSet)) + for col := range columnSet { + columns = append(columns, col) + } + + // Convert to SQL rows + rows := make([][]sqltypes.Value, len(results)) + for i, result := range results { + row := make([]sqltypes.Value, len(columns)) + for j, columnName := range columns { + switch columnName { + case SW_COLUMN_NAME_TIMESTAMP: + row[j] = sqltypes.NewInt64(result.Timestamp) + case SW_COLUMN_NAME_KEY: + row[j] = sqltypes.NewVarBinary(string(result.Key)) + case SW_COLUMN_NAME_SOURCE: + row[j] = sqltypes.NewVarChar(result.Source) + default: + // Regular data column + if value, exists := result.Values[columnName]; exists { + row[j] = convertSchemaValueToSQL(value) + } else { + row[j] = sqltypes.NULL + } + } + } + rows[i] = row + } + + return &QueryResult{ + Columns: columns, + Rows: rows, + Database: hms.topic.Namespace, + Table: hms.topic.Name, + } +} + +// ReadParquetStatistics efficiently reads column statistics from parquet files +// without scanning the full file content - uses parquet's built-in metadata +func (h *HybridMessageScanner) ReadParquetStatistics(partitionPath string) ([]*ParquetFileStats, error) { + var fileStats []*ParquetFileStats + + // Use the same chunk cache as the logstore package + chunkCache := chunk_cache.NewChunkCacheInMemory(256) + lookupFileIdFn := filer.LookupFn(h.filerClient) + + err := filer_pb.ReadDirAllEntries(context.Background(), h.filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error { + // Only process parquet files + if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") { + return nil + } + + // Extract statistics from this parquet file + stats, err := h.extractParquetFileStats(entry, lookupFileIdFn, chunkCache) + if err != nil { + // Log error but continue processing other files + fmt.Printf("Warning: failed to extract stats from %s: %v\n", entry.Name, err) + return nil + } + + if stats != nil { + fileStats = append(fileStats, stats) + } + return nil + }) + + return fileStats, err +} + +// extractParquetFileStats extracts column statistics from a single parquet file +func (h *HybridMessageScanner) 
extractParquetFileStats(entry *filer_pb.Entry, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunkCache *chunk_cache.ChunkCacheInMemory) (*ParquetFileStats, error) { + // Create reader for the parquet file + fileSize := filer.FileSize(entry) + visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize)) + chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize)) + readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn) + readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize)) + + // Create parquet reader - this only reads metadata, not data + parquetReader := parquet.NewReader(readerAt) + defer parquetReader.Close() + + fileView := parquetReader.File() + + fileStats := &ParquetFileStats{ + FileName: entry.Name, + RowCount: fileView.NumRows(), + ColumnStats: make(map[string]*ParquetColumnStats), + } + + // Get schema information + schema := fileView.Schema() + + // Process each row group + rowGroups := fileView.RowGroups() + for _, rowGroup := range rowGroups { + columnChunks := rowGroup.ColumnChunks() + + // Process each column chunk + for i, chunk := range columnChunks { + // Get column name from schema + columnName := h.getColumnNameFromSchema(schema, i) + if columnName == "" { + continue + } + + // Try to get column statistics + columnIndex, err := chunk.ColumnIndex() + if err != nil { + // No column index available - skip this column + continue + } + + // Extract min/max values from the first page (for simplicity) + // In a more sophisticated implementation, we could aggregate across all pages + numPages := columnIndex.NumPages() + if numPages == 0 { + continue + } + + minParquetValue := columnIndex.MinValue(0) + maxParquetValue := columnIndex.MaxValue(numPages - 1) + nullCount := int64(0) + + // Aggregate null counts across all pages + for pageIdx := 0; pageIdx < numPages; pageIdx++ { + nullCount += columnIndex.NullCount(pageIdx) + } + + // Convert parquet values to schema_pb.Value + minValue, err := h.convertParquetValueToSchemaValue(minParquetValue) + if err != nil { + continue + } + + maxValue, err := h.convertParquetValueToSchemaValue(maxParquetValue) + if err != nil { + continue + } + + // Store column statistics (aggregate across row groups if column already exists) + if existingStats, exists := fileStats.ColumnStats[columnName]; exists { + // Update existing statistics + if h.compareSchemaValues(minValue, existingStats.MinValue) < 0 { + existingStats.MinValue = minValue + } + if h.compareSchemaValues(maxValue, existingStats.MaxValue) > 0 { + existingStats.MaxValue = maxValue + } + existingStats.NullCount += nullCount + } else { + // Create new column statistics + fileStats.ColumnStats[columnName] = &ParquetColumnStats{ + ColumnName: columnName, + MinValue: minValue, + MaxValue: maxValue, + NullCount: nullCount, + RowCount: rowGroup.NumRows(), + } + } + } + } + + return fileStats, nil +} + +// getColumnNameFromSchema extracts column name from parquet schema by index +func (h *HybridMessageScanner) getColumnNameFromSchema(schema *parquet.Schema, columnIndex int) string { + // Get the leaf columns in order + var columnNames []string + h.collectColumnNames(schema.Fields(), &columnNames) + + if columnIndex >= 0 && columnIndex < len(columnNames) { + return columnNames[columnIndex] + } + return "" +} + +// collectColumnNames recursively collects leaf column names from schema +func (h *HybridMessageScanner) 
collectColumnNames(fields []parquet.Field, names *[]string) { + for _, field := range fields { + if len(field.Fields()) == 0 { + // This is a leaf field (no sub-fields) + *names = append(*names, field.Name()) + } else { + // This is a group - recurse + h.collectColumnNames(field.Fields(), names) + } + } +} + +// convertParquetValueToSchemaValue converts parquet.Value to schema_pb.Value +func (h *HybridMessageScanner) convertParquetValueToSchemaValue(pv parquet.Value) (*schema_pb.Value, error) { + switch pv.Kind() { + case parquet.Boolean: + return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: pv.Boolean()}}, nil + case parquet.Int32: + return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: pv.Int32()}}, nil + case parquet.Int64: + return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: pv.Int64()}}, nil + case parquet.Float: + return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: pv.Float()}}, nil + case parquet.Double: + return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: pv.Double()}}, nil + case parquet.ByteArray: + return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: pv.ByteArray()}}, nil + default: + return nil, fmt.Errorf("unsupported parquet value kind: %v", pv.Kind()) + } +} + +// compareSchemaValues compares two schema_pb.Value objects +func (h *HybridMessageScanner) compareSchemaValues(v1, v2 *schema_pb.Value) int { + if v1 == nil && v2 == nil { + return 0 + } + if v1 == nil { + return -1 + } + if v2 == nil { + return 1 + } + + // Extract raw values and compare + raw1 := h.extractRawValueFromSchema(v1) + raw2 := h.extractRawValueFromSchema(v2) + + return h.compareRawValues(raw1, raw2) +} + +// extractRawValueFromSchema extracts the raw value from schema_pb.Value +func (h *HybridMessageScanner) extractRawValueFromSchema(value *schema_pb.Value) interface{} { + switch v := value.Kind.(type) { + case *schema_pb.Value_BoolValue: + return v.BoolValue + case *schema_pb.Value_Int32Value: + return v.Int32Value + case *schema_pb.Value_Int64Value: + return v.Int64Value + case *schema_pb.Value_FloatValue: + return v.FloatValue + case *schema_pb.Value_DoubleValue: + return v.DoubleValue + case *schema_pb.Value_BytesValue: + return string(v.BytesValue) // Convert to string for comparison + case *schema_pb.Value_StringValue: + return v.StringValue + } + return nil +} + +// compareRawValues compares two raw values +func (h *HybridMessageScanner) compareRawValues(v1, v2 interface{}) int { + // Handle nil cases + if v1 == nil && v2 == nil { + return 0 + } + if v1 == nil { + return -1 + } + if v2 == nil { + return 1 + } + + // Compare based on type + switch val1 := v1.(type) { + case bool: + if val2, ok := v2.(bool); ok { + if val1 == val2 { + return 0 + } + if val1 { + return 1 + } + return -1 + } + case int32: + if val2, ok := v2.(int32); ok { + if val1 < val2 { + return -1 + } else if val1 > val2 { + return 1 + } + return 0 + } + case int64: + if val2, ok := v2.(int64); ok { + if val1 < val2 { + return -1 + } else if val1 > val2 { + return 1 + } + return 0 + } + case float32: + if val2, ok := v2.(float32); ok { + if val1 < val2 { + return -1 + } else if val1 > val2 { + return 1 + } + return 0 + } + case float64: + if val2, ok := v2.(float64); ok { + if val1 < val2 { + return -1 + } else if val1 > val2 { + return 1 + } + return 0 + } + case string: + if val2, ok := v2.(string); ok { + if val1 < val2 { + return -1 + } else if val1 > val2 { + return 1 + } + return 0 + } + } + + // Default: try string 
comparison + str1 := fmt.Sprintf("%v", v1) + str2 := fmt.Sprintf("%v", v2) + if str1 < str2 { + return -1 + } else if str1 > str2 { + return 1 + } + return 0 +} + +// streamingMerge merges multiple sorted data sources using a heap-based approach +// This provides memory-efficient merging without loading all data into memory +func (hms *HybridMessageScanner) streamingMerge(dataSources []StreamingDataSource, limit int) ([]HybridScanResult, error) { + if len(dataSources) == 0 { + return nil, nil + } + + var results []HybridScanResult + mergeHeap := &StreamingMergeHeap{} + heap.Init(mergeHeap) + + // Initialize heap with first item from each data source + for i, source := range dataSources { + if source.HasMore() { + result, err := source.Next() + if err != nil { + // Close all sources and return error + for _, s := range dataSources { + s.Close() + } + return nil, fmt.Errorf("failed to read from data source %d: %v", i, err) + } + if result != nil { + heap.Push(mergeHeap, &StreamingMergeItem{ + Result: result, + SourceID: i, + DataSource: source, + }) + } + } + } + + // Process results in chronological order + for mergeHeap.Len() > 0 { + // Get next chronologically ordered result + item := heap.Pop(mergeHeap).(*StreamingMergeItem) + results = append(results, *item.Result) + + // Check limit + if limit > 0 && len(results) >= limit { + break + } + + // Try to get next item from the same data source + if item.DataSource.HasMore() { + nextResult, err := item.DataSource.Next() + if err != nil { + // Log error but continue with other sources + fmt.Printf("Warning: Error reading next item from source %d: %v\n", item.SourceID, err) + } else if nextResult != nil { + heap.Push(mergeHeap, &StreamingMergeItem{ + Result: nextResult, + SourceID: item.SourceID, + DataSource: item.DataSource, + }) + } + } + } + + // Close all data sources + for _, source := range dataSources { + source.Close() + } + + return results, nil +} + +// SliceDataSource wraps a pre-loaded slice of results as a StreamingDataSource +// This is used for unflushed data that is already loaded into memory +type SliceDataSource struct { + results []HybridScanResult + index int +} + +func NewSliceDataSource(results []HybridScanResult) *SliceDataSource { + return &SliceDataSource{ + results: results, + index: 0, + } +} + +func (s *SliceDataSource) Next() (*HybridScanResult, error) { + if s.index >= len(s.results) { + return nil, nil + } + result := &s.results[s.index] + s.index++ + return result, nil +} + +func (s *SliceDataSource) HasMore() bool { + return s.index < len(s.results) +} + +func (s *SliceDataSource) Close() error { + return nil // Nothing to clean up for slice-based source +} + +// StreamingFlushedDataSource provides streaming access to flushed data +type StreamingFlushedDataSource struct { + hms *HybridMessageScanner + partition topic.Partition + options HybridScanOptions + mergedReadFn func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) + resultChan chan *HybridScanResult + errorChan chan error + doneChan chan struct{} + started bool + finished bool + closed int32 // atomic flag to prevent double close + mu sync.RWMutex +} + +func NewStreamingFlushedDataSource(hms *HybridMessageScanner, partition topic.Partition, options HybridScanOptions) *StreamingFlushedDataSource { + mergedReadFn := logstore.GenMergedReadFunc(hms.filerClient, hms.topic, partition) + + return &StreamingFlushedDataSource{ + hms: 
hms, + partition: partition, + options: options, + mergedReadFn: mergedReadFn, + resultChan: make(chan *HybridScanResult, 100), // Buffer for better performance + errorChan: make(chan error, 1), + doneChan: make(chan struct{}), + started: false, + finished: false, + } +} + +func (s *StreamingFlushedDataSource) startStreaming() { + if s.started { + return + } + s.started = true + + go func() { + defer func() { + // Use atomic flag to ensure channels are only closed once + if atomic.CompareAndSwapInt32(&s.closed, 0, 1) { + close(s.resultChan) + close(s.errorChan) + close(s.doneChan) + } + }() + + // Set up time range for scanning + startTime := time.Unix(0, s.options.StartTimeNs) + if s.options.StartTimeNs == 0 { + startTime = time.Unix(0, 0) + } + + stopTsNs := s.options.StopTimeNs + // For SQL queries, stopTsNs = 0 means "no stop time restriction" + // This is different from message queue consumers which want to stop at "now" + // We detect SQL context by checking if we have a predicate function + if stopTsNs == 0 && s.options.Predicate == nil { + // Only set to current time for non-SQL queries (message queue consumers) + stopTsNs = time.Now().UnixNano() + } + // If stopTsNs is still 0, it means this is a SQL query that wants unrestricted scanning + + // Message processing function + eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) { + // Skip control entries without actual data + if s.hms.isControlEntry(logEntry) { + return false, nil // Skip this entry + } + + // Convert log entry to schema_pb.RecordValue for consistent processing + recordValue, source, convertErr := s.hms.convertLogEntryToRecordValue(logEntry) + if convertErr != nil { + return false, fmt.Errorf("failed to convert log entry: %v", convertErr) + } + + // Apply predicate filtering (WHERE clause) + if s.options.Predicate != nil && !s.options.Predicate(recordValue) { + return false, nil // Skip this message + } + + // Extract system columns + timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value() + key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue() + + // Apply column projection + values := make(map[string]*schema_pb.Value) + if len(s.options.Columns) == 0 { + // Select all columns (excluding system columns from user view) + for name, value := range recordValue.Fields { + if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY { + values[name] = value + } + } + } else { + // Select specified columns only + for _, columnName := range s.options.Columns { + if value, exists := recordValue.Fields[columnName]; exists { + values[columnName] = value + } + } + } + + result := &HybridScanResult{ + Values: values, + Timestamp: timestamp, + Key: key, + Source: source, + } + + // Check if already closed before trying to send + if atomic.LoadInt32(&s.closed) != 0 { + return true, nil // Stop processing if closed + } + + // Send result to channel with proper handling of closed channels + select { + case s.resultChan <- result: + return false, nil + case <-s.doneChan: + return true, nil // Stop processing if closed + default: + // Check again if closed (in case it was closed between the atomic check and select) + if atomic.LoadInt32(&s.closed) != 0 { + return true, nil + } + // If not closed, try sending again with blocking select + select { + case s.resultChan <- result: + return false, nil + case <-s.doneChan: + return true, nil + } + } + } + + // Start scanning from the specified position + startPosition := log_buffer.MessagePosition{Time: startTime} + _, _, err := 
s.mergedReadFn(startPosition, stopTsNs, eachLogEntryFn) + + if err != nil { + // Only try to send error if not already closed + if atomic.LoadInt32(&s.closed) == 0 { + select { + case s.errorChan <- fmt.Errorf("flushed data scan failed: %v", err): + case <-s.doneChan: + default: + // Channel might be full or closed, ignore + } + } + } + + s.finished = true + }() +} + +func (s *StreamingFlushedDataSource) Next() (*HybridScanResult, error) { + if !s.started { + s.startStreaming() + } + + select { + case result, ok := <-s.resultChan: + if !ok { + return nil, nil // No more results + } + return result, nil + case err := <-s.errorChan: + return nil, err + case <-s.doneChan: + return nil, nil + } +} + +func (s *StreamingFlushedDataSource) HasMore() bool { + if !s.started { + return true // Haven't started yet, so potentially has data + } + return !s.finished || len(s.resultChan) > 0 +} + +func (s *StreamingFlushedDataSource) Close() error { + // Use atomic flag to ensure channels are only closed once + if atomic.CompareAndSwapInt32(&s.closed, 0, 1) { + close(s.doneChan) + close(s.resultChan) + close(s.errorChan) + } + return nil +} + +// mergeSort efficiently sorts HybridScanResult slice by timestamp using merge sort algorithm +func (hms *HybridMessageScanner) mergeSort(results []HybridScanResult, left, right int) { + if left < right { + mid := left + (right-left)/2 + + // Recursively sort both halves + hms.mergeSort(results, left, mid) + hms.mergeSort(results, mid+1, right) + + // Merge the sorted halves + hms.merge(results, left, mid, right) + } +} + +// merge combines two sorted subarrays into a single sorted array +func (hms *HybridMessageScanner) merge(results []HybridScanResult, left, mid, right int) { + // Create temporary arrays for the two subarrays + leftArray := make([]HybridScanResult, mid-left+1) + rightArray := make([]HybridScanResult, right-mid) + + // Copy data to temporary arrays + copy(leftArray, results[left:mid+1]) + copy(rightArray, results[mid+1:right+1]) + + // Merge the temporary arrays back into results[left..right] + i, j, k := 0, 0, left + + for i < len(leftArray) && j < len(rightArray) { + if leftArray[i].Timestamp <= rightArray[j].Timestamp { + results[k] = leftArray[i] + i++ + } else { + results[k] = rightArray[j] + j++ + } + k++ + } + + // Copy remaining elements of leftArray, if any + for i < len(leftArray) { + results[k] = leftArray[i] + i++ + k++ + } + + // Copy remaining elements of rightArray, if any + for j < len(rightArray) { + results[k] = rightArray[j] + j++ + k++ + } +} |
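The sketch below is illustrative only: it shows how a caller inside the same engine package might drive the HybridMessageScanner introduced by this diff, using the constructor, HybridScanOptions, Scan, and ConvertToSQLResult as they appear above. The namespace "default", topic "user_events", and columns "user_id"/"event_type" are hypothetical, and the filerClient, brokerClient, and SQLEngine values are assumed to be wired up elsewhere in the query engine.

package engine

import (
	"context"
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

// exampleHybridScan is a hypothetical helper demonstrating the scan flow:
// construct the scanner, scan the last hour with a predicate and projection,
// then convert the merged results into a SQL result set.
func exampleHybridScan(ctx context.Context, filerClient filer_pb.FilerClient, brokerClient BrokerClientInterface, sqlEngine *SQLEngine) error {
	scanner, err := NewHybridMessageScanner(filerClient, brokerClient, "default", "user_events", sqlEngine)
	if err != nil {
		return err
	}

	options := HybridScanOptions{
		StartTimeNs: time.Now().Add(-time.Hour).UnixNano(),
		StopTimeNs:  0,                                 // with a predicate set, 0 means no upper time bound (SQL-style scan)
		Columns:     []string{"user_id", "event_type"}, // projection; an empty slice selects all columns
		Limit:       10,                                // LIMIT 10
		Offset:      0,                                 // no OFFSET
		Predicate: func(record *schema_pb.RecordValue) bool {
			// WHERE event_type = 'click' (hypothetical column)
			if v, ok := record.Fields["event_type"]; ok {
				return v.GetStringValue() == "click"
			}
			return false
		},
	}

	// Scan merges unflushed broker data, live log files, and Parquet archives
	// chronologically before applying LIMIT/OFFSET.
	results, err := scanner.Scan(ctx, options)
	if err != nil {
		return err
	}

	queryResult := scanner.ConvertToSQLResult(results, options.Columns)
	fmt.Printf("scanned %d rows from %s.%s\n", len(queryResult.Rows), queryResult.Database, queryResult.Table)
	return nil
}

Note the LIMIT/OFFSET behavior documented in ScanWithStats: the scanner over-collects roughly Limit+Offset rows while scanning partitions and applies the final OFFSET and LIMIT once at the end, so a single Scan call already returns the post-OFFSET window.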
