aboutsummaryrefslogtreecommitdiff
path: root/weed/query/engine/hybrid_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/query/engine/hybrid_test.go')
-rw-r--r--weed/query/engine/hybrid_test.go309
1 files changed, 309 insertions, 0 deletions
diff --git a/weed/query/engine/hybrid_test.go b/weed/query/engine/hybrid_test.go
new file mode 100644
index 000000000..74ef256c7
--- /dev/null
+++ b/weed/query/engine/hybrid_test.go
@@ -0,0 +1,309 @@
+package engine
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "testing"
+)
+
+func TestSQLEngine_HybridSelectBasic(t *testing.T) {
+ engine := NewTestSQLEngine()
+
+ // Test SELECT with _source column to show both live and archived data
+ result, err := engine.ExecuteSQL(context.Background(), "SELECT *, _source FROM user_events")
+ if err != nil {
+ t.Fatalf("Expected no error, got %v", err)
+ }
+
+ if result.Error != nil {
+ t.Fatalf("Expected no query error, got %v", result.Error)
+ }
+
+ if len(result.Columns) == 0 {
+ t.Error("Expected columns in result")
+ }
+
+ // In mock environment, we only get live_log data from unflushed messages
+ // parquet_archive data would come from parquet files in a real system
+ if len(result.Rows) == 0 {
+ t.Error("Expected rows in result")
+ }
+
+ // Check that we have the _source column showing data source
+ hasSourceColumn := false
+ sourceColumnIndex := -1
+ for i, column := range result.Columns {
+ if column == SW_COLUMN_NAME_SOURCE {
+ hasSourceColumn = true
+ sourceColumnIndex = i
+ break
+ }
+ }
+
+ if !hasSourceColumn {
+ t.Skip("_source column not available in fallback mode - test requires real SeaweedFS cluster")
+ }
+
+ // Verify we have the expected data sources (in mock environment, only live_log)
+ if hasSourceColumn && sourceColumnIndex >= 0 {
+ foundLiveLog := false
+
+ for _, row := range result.Rows {
+ if sourceColumnIndex < len(row) {
+ source := row[sourceColumnIndex].ToString()
+ if source == "live_log" {
+ foundLiveLog = true
+ }
+ // In mock environment, all data comes from unflushed messages (live_log)
+ // In a real system, we would also see parquet_archive from parquet files
+ }
+ }
+
+ if !foundLiveLog {
+ t.Error("Expected to find live_log data source in results")
+ }
+
+ t.Logf("Found live_log data source from unflushed messages")
+ }
+}
+
+func TestSQLEngine_HybridSelectWithLimit(t *testing.T) {
+ engine := NewTestSQLEngine()
+
+ // Test SELECT with LIMIT on hybrid data
+ result, err := engine.ExecuteSQL(context.Background(), "SELECT * FROM user_events LIMIT 2")
+ if err != nil {
+ t.Fatalf("Expected no error, got %v", err)
+ }
+
+ if result.Error != nil {
+ t.Fatalf("Expected no query error, got %v", result.Error)
+ }
+
+ // Should have exactly 2 rows due to LIMIT
+ if len(result.Rows) != 2 {
+ t.Errorf("Expected 2 rows with LIMIT 2, got %d", len(result.Rows))
+ }
+}
+
+func TestSQLEngine_HybridSelectDifferentTables(t *testing.T) {
+ engine := NewTestSQLEngine()
+
+ // Test both user_events and system_logs tables
+ tables := []string{"user_events", "system_logs"}
+
+ for _, tableName := range tables {
+ result, err := engine.ExecuteSQL(context.Background(), fmt.Sprintf("SELECT *, _source FROM %s", tableName))
+ if err != nil {
+ t.Errorf("Error querying hybrid table %s: %v", tableName, err)
+ continue
+ }
+
+ if result.Error != nil {
+ t.Errorf("Query error for hybrid table %s: %v", tableName, result.Error)
+ continue
+ }
+
+ if len(result.Columns) == 0 {
+ t.Errorf("No columns returned for hybrid table %s", tableName)
+ }
+
+ if len(result.Rows) == 0 {
+ t.Errorf("No rows returned for hybrid table %s", tableName)
+ }
+
+ // Check for _source column
+ hasSourceColumn := false
+ for _, column := range result.Columns {
+ if column == "_source" {
+ hasSourceColumn = true
+ break
+ }
+ }
+
+ if !hasSourceColumn {
+ t.Logf("Table %s missing _source column - running in fallback mode", tableName)
+ }
+
+ t.Logf("Table %s: %d columns, %d rows with hybrid data sources", tableName, len(result.Columns), len(result.Rows))
+ }
+}
+
+func TestSQLEngine_HybridDataSource(t *testing.T) {
+ engine := NewTestSQLEngine()
+
+ // Test that we can distinguish between live and archived data
+ result, err := engine.ExecuteSQL(context.Background(), "SELECT user_id, event_type, _source FROM user_events")
+ if err != nil {
+ t.Fatalf("Expected no error, got %v", err)
+ }
+
+ if result.Error != nil {
+ t.Fatalf("Expected no query error, got %v", result.Error)
+ }
+
+ // Find the _source column
+ sourceColumnIndex := -1
+ eventTypeColumnIndex := -1
+
+ for i, column := range result.Columns {
+ switch column {
+ case "_source":
+ sourceColumnIndex = i
+ case "event_type":
+ eventTypeColumnIndex = i
+ }
+ }
+
+ if sourceColumnIndex == -1 {
+ t.Skip("Could not find _source column - test requires real SeaweedFS cluster")
+ }
+
+ if eventTypeColumnIndex == -1 {
+ t.Fatal("Could not find event_type column")
+ }
+
+ // Check the data characteristics
+ liveEventFound := false
+ archivedEventFound := false
+
+ for _, row := range result.Rows {
+ if sourceColumnIndex < len(row) && eventTypeColumnIndex < len(row) {
+ source := row[sourceColumnIndex].ToString()
+ eventType := row[eventTypeColumnIndex].ToString()
+
+ if source == "live_log" && strings.Contains(eventType, "live_") {
+ liveEventFound = true
+ t.Logf("Found live event: %s from %s", eventType, source)
+ }
+
+ if source == "parquet_archive" && strings.Contains(eventType, "archived_") {
+ archivedEventFound = true
+ t.Logf("Found archived event: %s from %s", eventType, source)
+ }
+ }
+ }
+
+ if !liveEventFound {
+ t.Error("Expected to find live events with live_ prefix")
+ }
+
+ if !archivedEventFound {
+ t.Error("Expected to find archived events with archived_ prefix")
+ }
+}
+
+func TestSQLEngine_HybridSystemLogs(t *testing.T) {
+ engine := NewTestSQLEngine()
+
+ // Test system_logs with hybrid data
+ result, err := engine.ExecuteSQL(context.Background(), "SELECT level, message, service, _source FROM system_logs")
+ if err != nil {
+ t.Fatalf("Expected no error, got %v", err)
+ }
+
+ if result.Error != nil {
+ t.Fatalf("Expected no query error, got %v", result.Error)
+ }
+
+ // Should have both live and archived system logs
+ if len(result.Rows) < 2 {
+ t.Errorf("Expected at least 2 system log entries, got %d", len(result.Rows))
+ }
+
+ // Find column indices
+ levelIndex := -1
+ sourceIndex := -1
+
+ for i, column := range result.Columns {
+ switch column {
+ case "level":
+ levelIndex = i
+ case "_source":
+ sourceIndex = i
+ }
+ }
+
+ // Verify we have both live and archived system logs
+ foundLive := false
+ foundArchived := false
+
+ for _, row := range result.Rows {
+ if sourceIndex >= 0 && sourceIndex < len(row) {
+ source := row[sourceIndex].ToString()
+
+ if source == "live_log" {
+ foundLive = true
+ if levelIndex >= 0 && levelIndex < len(row) {
+ level := row[levelIndex].ToString()
+ t.Logf("Live system log: level=%s", level)
+ }
+ }
+
+ if source == "parquet_archive" {
+ foundArchived = true
+ if levelIndex >= 0 && levelIndex < len(row) {
+ level := row[levelIndex].ToString()
+ t.Logf("Archived system log: level=%s", level)
+ }
+ }
+ }
+ }
+
+ if !foundLive {
+ t.Log("No live system logs found - running in fallback mode")
+ }
+
+ if !foundArchived {
+ t.Log("No archived system logs found - running in fallback mode")
+ }
+}
+
+func TestSQLEngine_HybridSelectWithTimeImplications(t *testing.T) {
+ engine := NewTestSQLEngine()
+
+ // Test that demonstrates the time-based nature of hybrid data
+ // Live data should be more recent than archived data
+ result, err := engine.ExecuteSQL(context.Background(), "SELECT event_type, _source FROM user_events")
+ if err != nil {
+ t.Fatalf("Expected no error, got %v", err)
+ }
+
+ if result.Error != nil {
+ t.Fatalf("Expected no query error, got %v", result.Error)
+ }
+
+ // This test documents that hybrid scanning provides a complete view
+ // of both recent (live) and historical (archived) data in a single query
+ liveCount := 0
+ archivedCount := 0
+
+ sourceIndex := -1
+ for i, column := range result.Columns {
+ if column == "_source" {
+ sourceIndex = i
+ break
+ }
+ }
+
+ if sourceIndex >= 0 {
+ for _, row := range result.Rows {
+ if sourceIndex < len(row) {
+ source := row[sourceIndex].ToString()
+ switch source {
+ case "live_log":
+ liveCount++
+ case "parquet_archive":
+ archivedCount++
+ }
+ }
+ }
+ }
+
+ t.Logf("Hybrid query results: %d live messages, %d archived messages", liveCount, archivedCount)
+
+ if liveCount == 0 && archivedCount == 0 {
+ t.Log("No live or archived messages found - running in fallback mode")
+ }
+}