diff options
Diffstat (limited to 'weed/mq/offset/migration.go')
| -rw-r--r-- | weed/mq/offset/migration.go | 302 |
1 files changed, 302 insertions, 0 deletions
diff --git a/weed/mq/offset/migration.go b/weed/mq/offset/migration.go new file mode 100644 index 000000000..106129206 --- /dev/null +++ b/weed/mq/offset/migration.go @@ -0,0 +1,302 @@ +package offset + +import ( + "database/sql" + "fmt" + "time" +) + +// MigrationVersion represents a database migration version +type MigrationVersion struct { + Version int + Description string + SQL string +} + +// GetMigrations returns all available migrations for offset storage +func GetMigrations() []MigrationVersion { + return []MigrationVersion{ + { + Version: 1, + Description: "Create initial offset storage tables", + SQL: ` + -- Partition offset checkpoints table + -- TODO: Add _index as computed column when supported by database + CREATE TABLE IF NOT EXISTS partition_offset_checkpoints ( + partition_key TEXT PRIMARY KEY, + ring_size INTEGER NOT NULL, + range_start INTEGER NOT NULL, + range_stop INTEGER NOT NULL, + unix_time_ns INTEGER NOT NULL, + checkpoint_offset INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + + -- Offset mappings table for detailed tracking + -- TODO: Add _index as computed column when supported by database + CREATE TABLE IF NOT EXISTS offset_mappings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + partition_key TEXT NOT NULL, + kafka_offset INTEGER NOT NULL, + smq_timestamp INTEGER NOT NULL, + message_size INTEGER NOT NULL, + created_at INTEGER NOT NULL, + UNIQUE(partition_key, kafka_offset) + ); + + -- Schema migrations tracking table + CREATE TABLE IF NOT EXISTS schema_migrations ( + version INTEGER PRIMARY KEY, + description TEXT NOT NULL, + applied_at INTEGER NOT NULL + ); + `, + }, + { + Version: 2, + Description: "Add indexes for performance optimization", + SQL: ` + -- Indexes for performance + CREATE INDEX IF NOT EXISTS idx_partition_offset_checkpoints_partition + ON partition_offset_checkpoints(partition_key); + + CREATE INDEX IF NOT EXISTS idx_offset_mappings_partition_offset + ON offset_mappings(partition_key, kafka_offset); + + CREATE INDEX IF NOT EXISTS idx_offset_mappings_timestamp + ON offset_mappings(partition_key, smq_timestamp); + + CREATE INDEX IF NOT EXISTS idx_offset_mappings_created_at + ON offset_mappings(created_at); + `, + }, + { + Version: 3, + Description: "Add partition metadata table for enhanced tracking", + SQL: ` + -- Partition metadata table + CREATE TABLE IF NOT EXISTS partition_metadata ( + partition_key TEXT PRIMARY KEY, + ring_size INTEGER NOT NULL, + range_start INTEGER NOT NULL, + range_stop INTEGER NOT NULL, + unix_time_ns INTEGER NOT NULL, + created_at INTEGER NOT NULL, + last_activity_at INTEGER NOT NULL, + record_count INTEGER DEFAULT 0, + total_size INTEGER DEFAULT 0 + ); + + -- Index for partition metadata + CREATE INDEX IF NOT EXISTS idx_partition_metadata_activity + ON partition_metadata(last_activity_at); + `, + }, + } +} + +// MigrationManager handles database schema migrations +type MigrationManager struct { + db *sql.DB +} + +// NewMigrationManager creates a new migration manager +func NewMigrationManager(db *sql.DB) *MigrationManager { + return &MigrationManager{db: db} +} + +// GetCurrentVersion returns the current schema version +func (m *MigrationManager) GetCurrentVersion() (int, error) { + // First, ensure the migrations table exists + _, err := m.db.Exec(` + CREATE TABLE IF NOT EXISTS schema_migrations ( + version INTEGER PRIMARY KEY, + description TEXT NOT NULL, + applied_at INTEGER NOT NULL + ) + `) + if err != nil { + return 0, fmt.Errorf("failed to create migrations table: %w", err) + } + + var version sql.NullInt64 + err = m.db.QueryRow("SELECT MAX(version) FROM schema_migrations").Scan(&version) + if err != nil { + return 0, fmt.Errorf("failed to get current version: %w", err) + } + + if !version.Valid { + return 0, nil // No migrations applied yet + } + + return int(version.Int64), nil +} + +// ApplyMigrations applies all pending migrations +func (m *MigrationManager) ApplyMigrations() error { + currentVersion, err := m.GetCurrentVersion() + if err != nil { + return fmt.Errorf("failed to get current version: %w", err) + } + + migrations := GetMigrations() + + for _, migration := range migrations { + if migration.Version <= currentVersion { + continue // Already applied + } + + fmt.Printf("Applying migration %d: %s\n", migration.Version, migration.Description) + + // Begin transaction + tx, err := m.db.Begin() + if err != nil { + return fmt.Errorf("failed to begin transaction for migration %d: %w", migration.Version, err) + } + + // Execute migration SQL + _, err = tx.Exec(migration.SQL) + if err != nil { + tx.Rollback() + return fmt.Errorf("failed to execute migration %d: %w", migration.Version, err) + } + + // Record migration as applied + _, err = tx.Exec( + "INSERT INTO schema_migrations (version, description, applied_at) VALUES (?, ?, ?)", + migration.Version, + migration.Description, + getCurrentTimestamp(), + ) + if err != nil { + tx.Rollback() + return fmt.Errorf("failed to record migration %d: %w", migration.Version, err) + } + + // Commit transaction + err = tx.Commit() + if err != nil { + return fmt.Errorf("failed to commit migration %d: %w", migration.Version, err) + } + + fmt.Printf("Successfully applied migration %d\n", migration.Version) + } + + return nil +} + +// RollbackMigration rolls back a specific migration (if supported) +func (m *MigrationManager) RollbackMigration(version int) error { + // TODO: Implement rollback functionality + // ASSUMPTION: For now, rollbacks are not supported as they require careful planning + return fmt.Errorf("migration rollbacks not implemented - manual intervention required") +} + +// GetAppliedMigrations returns a list of all applied migrations +func (m *MigrationManager) GetAppliedMigrations() ([]AppliedMigration, error) { + rows, err := m.db.Query(` + SELECT version, description, applied_at + FROM schema_migrations + ORDER BY version + `) + if err != nil { + return nil, fmt.Errorf("failed to query applied migrations: %w", err) + } + defer rows.Close() + + var migrations []AppliedMigration + for rows.Next() { + var migration AppliedMigration + err := rows.Scan(&migration.Version, &migration.Description, &migration.AppliedAt) + if err != nil { + return nil, fmt.Errorf("failed to scan migration: %w", err) + } + migrations = append(migrations, migration) + } + + return migrations, nil +} + +// ValidateSchema validates that the database schema is up to date +func (m *MigrationManager) ValidateSchema() error { + currentVersion, err := m.GetCurrentVersion() + if err != nil { + return fmt.Errorf("failed to get current version: %w", err) + } + + migrations := GetMigrations() + if len(migrations) == 0 { + return nil + } + + latestVersion := migrations[len(migrations)-1].Version + if currentVersion < latestVersion { + return fmt.Errorf("schema is outdated: current version %d, latest version %d", currentVersion, latestVersion) + } + + return nil +} + +// AppliedMigration represents a migration that has been applied +type AppliedMigration struct { + Version int + Description string + AppliedAt int64 +} + +// getCurrentTimestamp returns the current timestamp in nanoseconds +func getCurrentTimestamp() int64 { + return time.Now().UnixNano() +} + +// CreateDatabase creates and initializes a new offset storage database +func CreateDatabase(dbPath string) (*sql.DB, error) { + // TODO: Support different database types (PostgreSQL, MySQL, etc.) + // ASSUMPTION: Using SQLite for now, can be extended for other databases + + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return nil, fmt.Errorf("failed to open database: %w", err) + } + + // Configure SQLite for better performance + pragmas := []string{ + "PRAGMA journal_mode=WAL", // Write-Ahead Logging for better concurrency + "PRAGMA synchronous=NORMAL", // Balance between safety and performance + "PRAGMA cache_size=10000", // Increase cache size + "PRAGMA foreign_keys=ON", // Enable foreign key constraints + "PRAGMA temp_store=MEMORY", // Store temporary tables in memory + } + + for _, pragma := range pragmas { + _, err := db.Exec(pragma) + if err != nil { + db.Close() + return nil, fmt.Errorf("failed to set pragma %s: %w", pragma, err) + } + } + + // Apply migrations + migrationManager := NewMigrationManager(db) + err = migrationManager.ApplyMigrations() + if err != nil { + db.Close() + return nil, fmt.Errorf("failed to apply migrations: %w", err) + } + + return db, nil +} + +// BackupDatabase creates a backup of the offset storage database +func BackupDatabase(sourceDB *sql.DB, backupPath string) error { + // TODO: Implement database backup functionality + // ASSUMPTION: This would use database-specific backup mechanisms + return fmt.Errorf("database backup not implemented yet") +} + +// RestoreDatabase restores a database from a backup +func RestoreDatabase(backupPath, targetPath string) error { + // TODO: Implement database restore functionality + // ASSUMPTION: This would use database-specific restore mechanisms + return fmt.Errorf("database restore not implemented yet") +} |
