diff options
Diffstat (limited to 'weed/query/engine/catalog.go')
| -rw-r--r-- | weed/query/engine/catalog.go | 419 |
1 files changed, 419 insertions, 0 deletions
diff --git a/weed/query/engine/catalog.go b/weed/query/engine/catalog.go new file mode 100644 index 000000000..4cd39f3f0 --- /dev/null +++ b/weed/query/engine/catalog.go @@ -0,0 +1,419 @@ +package engine + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/mq/schema" + "github.com/seaweedfs/seaweedfs/weed/mq/topic" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" +) + +// BrokerClientInterface defines the interface for broker client operations +// Both real BrokerClient and MockBrokerClient implement this interface +type BrokerClientInterface interface { + ListNamespaces(ctx context.Context) ([]string, error) + ListTopics(ctx context.Context, namespace string) ([]string, error) + GetTopicSchema(ctx context.Context, namespace, topic string) (*schema_pb.RecordType, error) + GetFilerClient() (filer_pb.FilerClient, error) + ConfigureTopic(ctx context.Context, namespace, topicName string, partitionCount int32, recordType *schema_pb.RecordType) error + DeleteTopic(ctx context.Context, namespace, topicName string) error + // GetUnflushedMessages returns only messages that haven't been flushed to disk yet + // This prevents double-counting when combining with disk-based data + GetUnflushedMessages(ctx context.Context, namespace, topicName string, partition topic.Partition, startTimeNs int64) ([]*filer_pb.LogEntry, error) +} + +// SchemaCatalog manages the mapping between MQ topics and SQL tables +// Assumptions: +// 1. Each MQ namespace corresponds to a SQL database +// 2. Each MQ topic corresponds to a SQL table +// 3. Topic schemas are cached for performance +// 4. Schema evolution is tracked via RevisionId +type SchemaCatalog struct { + mu sync.RWMutex + + // databases maps namespace names to database metadata + // Assumption: Namespace names are valid SQL database identifiers + databases map[string]*DatabaseInfo + + // currentDatabase tracks the active database context (for USE database) + // Assumption: Single-threaded usage per SQL session + currentDatabase string + + // brokerClient handles communication with MQ broker + brokerClient BrokerClientInterface // Use interface for dependency injection + + // defaultPartitionCount is the default number of partitions for new topics + // Can be overridden in CREATE TABLE statements with PARTITION COUNT option + defaultPartitionCount int32 + + // cacheTTL is the time-to-live for cached database and table information + // After this duration, cached data is considered stale and will be refreshed + cacheTTL time.Duration +} + +// DatabaseInfo represents a SQL database (MQ namespace) +type DatabaseInfo struct { + Name string + Tables map[string]*TableInfo + CachedAt time.Time // Timestamp when this database info was cached +} + +// TableInfo represents a SQL table (MQ topic) with schema information +// Assumptions: +// 1. All topic messages conform to the same schema within a revision +// 2. Schema evolution maintains backward compatibility +// 3. Primary key is implicitly the message timestamp/offset +type TableInfo struct { + Name string + Namespace string + Schema *schema.Schema + Columns []ColumnInfo + RevisionId uint32 + CachedAt time.Time // Timestamp when this table info was cached +} + +// ColumnInfo represents a SQL column (MQ schema field) +type ColumnInfo struct { + Name string + Type string // SQL type representation + Nullable bool // Assumption: MQ fields are nullable by default +} + +// NewSchemaCatalog creates a new schema catalog +// Uses master address for service discovery of filers and brokers +func NewSchemaCatalog(masterAddress string) *SchemaCatalog { + return &SchemaCatalog{ + databases: make(map[string]*DatabaseInfo), + brokerClient: NewBrokerClient(masterAddress), + defaultPartitionCount: 6, // Default partition count, can be made configurable via environment variable + cacheTTL: 5 * time.Minute, // Default cache TTL of 5 minutes, can be made configurable + } +} + +// ListDatabases returns all available databases (MQ namespaces) +// Assumption: This would be populated from MQ broker metadata +func (c *SchemaCatalog) ListDatabases() []string { + // Clean up expired cache entries first + c.mu.Lock() + c.cleanExpiredDatabases() + c.mu.Unlock() + + c.mu.RLock() + defer c.mu.RUnlock() + + // Try to get real namespaces from broker first + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + namespaces, err := c.brokerClient.ListNamespaces(ctx) + if err != nil { + // Silently handle broker connection errors + + // Fallback to cached databases if broker unavailable + databases := make([]string, 0, len(c.databases)) + for name := range c.databases { + databases = append(databases, name) + } + + // Return empty list if no cached data (no more sample data) + return databases + } + + return namespaces +} + +// ListTables returns all tables in a database (MQ topics in namespace) +func (c *SchemaCatalog) ListTables(database string) ([]string, error) { + // Clean up expired cache entries first + c.mu.Lock() + c.cleanExpiredDatabases() + c.mu.Unlock() + + c.mu.RLock() + defer c.mu.RUnlock() + + // Try to get real topics from broker first + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + topics, err := c.brokerClient.ListTopics(ctx, database) + if err != nil { + // Fallback to cached data if broker unavailable + db, exists := c.databases[database] + if !exists { + // Return empty list if database not found (no more sample data) + return []string{}, nil + } + + tables := make([]string, 0, len(db.Tables)) + for name := range db.Tables { + tables = append(tables, name) + } + return tables, nil + } + + return topics, nil +} + +// GetTableInfo returns detailed schema information for a table +// Assumption: Table exists and schema is accessible +func (c *SchemaCatalog) GetTableInfo(database, table string) (*TableInfo, error) { + // Clean up expired cache entries first + c.mu.Lock() + c.cleanExpiredDatabases() + c.mu.Unlock() + + c.mu.RLock() + db, exists := c.databases[database] + if !exists { + c.mu.RUnlock() + return nil, TableNotFoundError{ + Database: database, + Table: "", + } + } + + tableInfo, exists := db.Tables[table] + if !exists || c.isTableCacheExpired(tableInfo) { + c.mu.RUnlock() + + // Try to refresh table info from broker if not found or expired + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + recordType, err := c.brokerClient.GetTopicSchema(ctx, database, table) + if err != nil { + // If broker unavailable and we have expired cached data, return it + if exists { + return tableInfo, nil + } + // Otherwise return not found error + return nil, TableNotFoundError{ + Database: database, + Table: table, + } + } + + // Convert the broker response to schema and register it + mqSchema := &schema.Schema{ + RecordType: recordType, + RevisionId: 1, // Default revision for schema fetched from broker + } + + // Register the refreshed schema + err = c.RegisterTopic(database, table, mqSchema) + if err != nil { + // If registration fails but we have cached data, return it + if exists { + return tableInfo, nil + } + return nil, fmt.Errorf("failed to register topic schema: %v", err) + } + + // Get the newly registered table info + c.mu.RLock() + defer c.mu.RUnlock() + + db, exists := c.databases[database] + if !exists { + return nil, TableNotFoundError{ + Database: database, + Table: table, + } + } + + tableInfo, exists := db.Tables[table] + if !exists { + return nil, TableNotFoundError{ + Database: database, + Table: table, + } + } + + return tableInfo, nil + } + + c.mu.RUnlock() + return tableInfo, nil +} + +// RegisterTopic adds or updates a topic's schema information in the catalog +// Assumption: This is called when topics are created or schemas are modified +func (c *SchemaCatalog) RegisterTopic(namespace, topicName string, mqSchema *schema.Schema) error { + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + + // Ensure database exists + db, exists := c.databases[namespace] + if !exists { + db = &DatabaseInfo{ + Name: namespace, + Tables: make(map[string]*TableInfo), + CachedAt: now, + } + c.databases[namespace] = db + } + + // Convert MQ schema to SQL table info + tableInfo, err := c.convertMQSchemaToTableInfo(namespace, topicName, mqSchema) + if err != nil { + return fmt.Errorf("failed to convert MQ schema: %v", err) + } + + // Set the cached timestamp for the table + tableInfo.CachedAt = now + + db.Tables[topicName] = tableInfo + return nil +} + +// convertMQSchemaToTableInfo converts MQ schema to SQL table information +// Assumptions: +// 1. MQ scalar types map directly to SQL types +// 2. Complex types (arrays, maps) are serialized as JSON strings +// 3. All fields are nullable unless specifically marked otherwise +func (c *SchemaCatalog) convertMQSchemaToTableInfo(namespace, topicName string, mqSchema *schema.Schema) (*TableInfo, error) { + columns := make([]ColumnInfo, len(mqSchema.RecordType.Fields)) + + for i, field := range mqSchema.RecordType.Fields { + sqlType, err := c.convertMQFieldTypeToSQL(field.Type) + if err != nil { + return nil, fmt.Errorf("unsupported field type for '%s': %v", field.Name, err) + } + + columns[i] = ColumnInfo{ + Name: field.Name, + Type: sqlType, + Nullable: true, // Assumption: MQ fields are nullable by default + } + } + + return &TableInfo{ + Name: topicName, + Namespace: namespace, + Schema: mqSchema, + Columns: columns, + RevisionId: mqSchema.RevisionId, + }, nil +} + +// convertMQFieldTypeToSQL maps MQ field types to SQL types +// Uses standard SQL type mappings with PostgreSQL compatibility +func (c *SchemaCatalog) convertMQFieldTypeToSQL(fieldType *schema_pb.Type) (string, error) { + switch t := fieldType.Kind.(type) { + case *schema_pb.Type_ScalarType: + switch t.ScalarType { + case schema_pb.ScalarType_BOOL: + return "BOOLEAN", nil + case schema_pb.ScalarType_INT32: + return "INT", nil + case schema_pb.ScalarType_INT64: + return "BIGINT", nil + case schema_pb.ScalarType_FLOAT: + return "FLOAT", nil + case schema_pb.ScalarType_DOUBLE: + return "DOUBLE", nil + case schema_pb.ScalarType_BYTES: + return "VARBINARY", nil + case schema_pb.ScalarType_STRING: + return "VARCHAR(255)", nil // Assumption: Default string length + default: + return "", fmt.Errorf("unsupported scalar type: %v", t.ScalarType) + } + case *schema_pb.Type_ListType: + // Assumption: Lists are serialized as JSON strings in SQL + return "TEXT", nil + case *schema_pb.Type_RecordType: + // Assumption: Nested records are serialized as JSON strings + return "TEXT", nil + default: + return "", fmt.Errorf("unsupported field type: %T", t) + } +} + +// SetCurrentDatabase sets the active database context +// Assumption: Used for implementing "USE database" functionality +func (c *SchemaCatalog) SetCurrentDatabase(database string) error { + c.mu.Lock() + defer c.mu.Unlock() + + // TODO: Validate database exists in MQ broker + c.currentDatabase = database + return nil +} + +// GetCurrentDatabase returns the currently active database +func (c *SchemaCatalog) GetCurrentDatabase() string { + c.mu.RLock() + defer c.mu.RUnlock() + return c.currentDatabase +} + +// SetDefaultPartitionCount sets the default number of partitions for new topics +func (c *SchemaCatalog) SetDefaultPartitionCount(count int32) { + c.mu.Lock() + defer c.mu.Unlock() + c.defaultPartitionCount = count +} + +// GetDefaultPartitionCount returns the default number of partitions for new topics +func (c *SchemaCatalog) GetDefaultPartitionCount() int32 { + c.mu.RLock() + defer c.mu.RUnlock() + return c.defaultPartitionCount +} + +// SetCacheTTL sets the time-to-live for cached database and table information +func (c *SchemaCatalog) SetCacheTTL(ttl time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.cacheTTL = ttl +} + +// GetCacheTTL returns the current cache TTL setting +func (c *SchemaCatalog) GetCacheTTL() time.Duration { + c.mu.RLock() + defer c.mu.RUnlock() + return c.cacheTTL +} + +// isDatabaseCacheExpired checks if a database's cached information has expired +func (c *SchemaCatalog) isDatabaseCacheExpired(db *DatabaseInfo) bool { + return time.Since(db.CachedAt) > c.cacheTTL +} + +// isTableCacheExpired checks if a table's cached information has expired +func (c *SchemaCatalog) isTableCacheExpired(table *TableInfo) bool { + return time.Since(table.CachedAt) > c.cacheTTL +} + +// cleanExpiredDatabases removes expired database entries from cache +// Note: This method assumes the caller already holds the write lock +func (c *SchemaCatalog) cleanExpiredDatabases() { + for name, db := range c.databases { + if c.isDatabaseCacheExpired(db) { + delete(c.databases, name) + } else { + // Clean expired tables within non-expired databases + for tableName, table := range db.Tables { + if c.isTableCacheExpired(table) { + delete(db.Tables, tableName) + } + } + } + } +} + +// CleanExpiredCache removes all expired entries from the cache +// This method can be called externally to perform periodic cache cleanup +func (c *SchemaCatalog) CleanExpiredCache() { + c.mu.Lock() + defer c.mu.Unlock() + c.cleanExpiredDatabases() +} |
