aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/offset/migration.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/offset/migration.go')
-rw-r--r--weed/mq/offset/migration.go302
1 files changed, 302 insertions, 0 deletions
diff --git a/weed/mq/offset/migration.go b/weed/mq/offset/migration.go
new file mode 100644
index 000000000..106129206
--- /dev/null
+++ b/weed/mq/offset/migration.go
@@ -0,0 +1,302 @@
+package offset
+
+import (
+ "database/sql"
+ "fmt"
+ "time"
+)
+
+// MigrationVersion represents a database migration version
+type MigrationVersion struct {
+ Version int
+ Description string
+ SQL string
+}
+
+// GetMigrations returns all available migrations for offset storage
+func GetMigrations() []MigrationVersion {
+ return []MigrationVersion{
+ {
+ Version: 1,
+ Description: "Create initial offset storage tables",
+ SQL: `
+ -- Partition offset checkpoints table
+ -- TODO: Add _index as computed column when supported by database
+ CREATE TABLE IF NOT EXISTS partition_offset_checkpoints (
+ partition_key TEXT PRIMARY KEY,
+ ring_size INTEGER NOT NULL,
+ range_start INTEGER NOT NULL,
+ range_stop INTEGER NOT NULL,
+ unix_time_ns INTEGER NOT NULL,
+ checkpoint_offset INTEGER NOT NULL,
+ updated_at INTEGER NOT NULL
+ );
+
+ -- Offset mappings table for detailed tracking
+ -- TODO: Add _index as computed column when supported by database
+ CREATE TABLE IF NOT EXISTS offset_mappings (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ partition_key TEXT NOT NULL,
+ kafka_offset INTEGER NOT NULL,
+ smq_timestamp INTEGER NOT NULL,
+ message_size INTEGER NOT NULL,
+ created_at INTEGER NOT NULL,
+ UNIQUE(partition_key, kafka_offset)
+ );
+
+ -- Schema migrations tracking table
+ CREATE TABLE IF NOT EXISTS schema_migrations (
+ version INTEGER PRIMARY KEY,
+ description TEXT NOT NULL,
+ applied_at INTEGER NOT NULL
+ );
+ `,
+ },
+ {
+ Version: 2,
+ Description: "Add indexes for performance optimization",
+ SQL: `
+ -- Indexes for performance
+ CREATE INDEX IF NOT EXISTS idx_partition_offset_checkpoints_partition
+ ON partition_offset_checkpoints(partition_key);
+
+ CREATE INDEX IF NOT EXISTS idx_offset_mappings_partition_offset
+ ON offset_mappings(partition_key, kafka_offset);
+
+ CREATE INDEX IF NOT EXISTS idx_offset_mappings_timestamp
+ ON offset_mappings(partition_key, smq_timestamp);
+
+ CREATE INDEX IF NOT EXISTS idx_offset_mappings_created_at
+ ON offset_mappings(created_at);
+ `,
+ },
+ {
+ Version: 3,
+ Description: "Add partition metadata table for enhanced tracking",
+ SQL: `
+ -- Partition metadata table
+ CREATE TABLE IF NOT EXISTS partition_metadata (
+ partition_key TEXT PRIMARY KEY,
+ ring_size INTEGER NOT NULL,
+ range_start INTEGER NOT NULL,
+ range_stop INTEGER NOT NULL,
+ unix_time_ns INTEGER NOT NULL,
+ created_at INTEGER NOT NULL,
+ last_activity_at INTEGER NOT NULL,
+ record_count INTEGER DEFAULT 0,
+ total_size INTEGER DEFAULT 0
+ );
+
+ -- Index for partition metadata
+ CREATE INDEX IF NOT EXISTS idx_partition_metadata_activity
+ ON partition_metadata(last_activity_at);
+ `,
+ },
+ }
+}
+
+// MigrationManager handles database schema migrations
+type MigrationManager struct {
+ db *sql.DB
+}
+
+// NewMigrationManager creates a new migration manager
+func NewMigrationManager(db *sql.DB) *MigrationManager {
+ return &MigrationManager{db: db}
+}
+
+// GetCurrentVersion returns the current schema version
+func (m *MigrationManager) GetCurrentVersion() (int, error) {
+ // First, ensure the migrations table exists
+ _, err := m.db.Exec(`
+ CREATE TABLE IF NOT EXISTS schema_migrations (
+ version INTEGER PRIMARY KEY,
+ description TEXT NOT NULL,
+ applied_at INTEGER NOT NULL
+ )
+ `)
+ if err != nil {
+ return 0, fmt.Errorf("failed to create migrations table: %w", err)
+ }
+
+ var version sql.NullInt64
+ err = m.db.QueryRow("SELECT MAX(version) FROM schema_migrations").Scan(&version)
+ if err != nil {
+ return 0, fmt.Errorf("failed to get current version: %w", err)
+ }
+
+ if !version.Valid {
+ return 0, nil // No migrations applied yet
+ }
+
+ return int(version.Int64), nil
+}
+
+// ApplyMigrations applies all pending migrations
+func (m *MigrationManager) ApplyMigrations() error {
+ currentVersion, err := m.GetCurrentVersion()
+ if err != nil {
+ return fmt.Errorf("failed to get current version: %w", err)
+ }
+
+ migrations := GetMigrations()
+
+ for _, migration := range migrations {
+ if migration.Version <= currentVersion {
+ continue // Already applied
+ }
+
+ fmt.Printf("Applying migration %d: %s\n", migration.Version, migration.Description)
+
+ // Begin transaction
+ tx, err := m.db.Begin()
+ if err != nil {
+ return fmt.Errorf("failed to begin transaction for migration %d: %w", migration.Version, err)
+ }
+
+ // Execute migration SQL
+ _, err = tx.Exec(migration.SQL)
+ if err != nil {
+ tx.Rollback()
+ return fmt.Errorf("failed to execute migration %d: %w", migration.Version, err)
+ }
+
+ // Record migration as applied
+ _, err = tx.Exec(
+ "INSERT INTO schema_migrations (version, description, applied_at) VALUES (?, ?, ?)",
+ migration.Version,
+ migration.Description,
+ getCurrentTimestamp(),
+ )
+ if err != nil {
+ tx.Rollback()
+ return fmt.Errorf("failed to record migration %d: %w", migration.Version, err)
+ }
+
+ // Commit transaction
+ err = tx.Commit()
+ if err != nil {
+ return fmt.Errorf("failed to commit migration %d: %w", migration.Version, err)
+ }
+
+ fmt.Printf("Successfully applied migration %d\n", migration.Version)
+ }
+
+ return nil
+}
+
+// RollbackMigration rolls back a specific migration (if supported)
+func (m *MigrationManager) RollbackMigration(version int) error {
+ // TODO: Implement rollback functionality
+ // ASSUMPTION: For now, rollbacks are not supported as they require careful planning
+ return fmt.Errorf("migration rollbacks not implemented - manual intervention required")
+}
+
+// GetAppliedMigrations returns a list of all applied migrations
+func (m *MigrationManager) GetAppliedMigrations() ([]AppliedMigration, error) {
+ rows, err := m.db.Query(`
+ SELECT version, description, applied_at
+ FROM schema_migrations
+ ORDER BY version
+ `)
+ if err != nil {
+ return nil, fmt.Errorf("failed to query applied migrations: %w", err)
+ }
+ defer rows.Close()
+
+ var migrations []AppliedMigration
+ for rows.Next() {
+ var migration AppliedMigration
+ err := rows.Scan(&migration.Version, &migration.Description, &migration.AppliedAt)
+ if err != nil {
+ return nil, fmt.Errorf("failed to scan migration: %w", err)
+ }
+ migrations = append(migrations, migration)
+ }
+
+ return migrations, nil
+}
+
+// ValidateSchema validates that the database schema is up to date
+func (m *MigrationManager) ValidateSchema() error {
+ currentVersion, err := m.GetCurrentVersion()
+ if err != nil {
+ return fmt.Errorf("failed to get current version: %w", err)
+ }
+
+ migrations := GetMigrations()
+ if len(migrations) == 0 {
+ return nil
+ }
+
+ latestVersion := migrations[len(migrations)-1].Version
+ if currentVersion < latestVersion {
+ return fmt.Errorf("schema is outdated: current version %d, latest version %d", currentVersion, latestVersion)
+ }
+
+ return nil
+}
+
+// AppliedMigration represents a migration that has been applied
+type AppliedMigration struct {
+ Version int
+ Description string
+ AppliedAt int64
+}
+
+// getCurrentTimestamp returns the current timestamp in nanoseconds
+func getCurrentTimestamp() int64 {
+ return time.Now().UnixNano()
+}
+
+// CreateDatabase creates and initializes a new offset storage database
+func CreateDatabase(dbPath string) (*sql.DB, error) {
+ // TODO: Support different database types (PostgreSQL, MySQL, etc.)
+ // ASSUMPTION: Using SQLite for now, can be extended for other databases
+
+ db, err := sql.Open("sqlite3", dbPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to open database: %w", err)
+ }
+
+ // Configure SQLite for better performance
+ pragmas := []string{
+ "PRAGMA journal_mode=WAL", // Write-Ahead Logging for better concurrency
+ "PRAGMA synchronous=NORMAL", // Balance between safety and performance
+ "PRAGMA cache_size=10000", // Increase cache size
+ "PRAGMA foreign_keys=ON", // Enable foreign key constraints
+ "PRAGMA temp_store=MEMORY", // Store temporary tables in memory
+ }
+
+ for _, pragma := range pragmas {
+ _, err := db.Exec(pragma)
+ if err != nil {
+ db.Close()
+ return nil, fmt.Errorf("failed to set pragma %s: %w", pragma, err)
+ }
+ }
+
+ // Apply migrations
+ migrationManager := NewMigrationManager(db)
+ err = migrationManager.ApplyMigrations()
+ if err != nil {
+ db.Close()
+ return nil, fmt.Errorf("failed to apply migrations: %w", err)
+ }
+
+ return db, nil
+}
+
+// BackupDatabase creates a backup of the offset storage database
+func BackupDatabase(sourceDB *sql.DB, backupPath string) error {
+ // TODO: Implement database backup functionality
+ // ASSUMPTION: This would use database-specific backup mechanisms
+ return fmt.Errorf("database backup not implemented yet")
+}
+
+// RestoreDatabase restores a database from a backup
+func RestoreDatabase(backupPath, targetPath string) error {
+ // TODO: Implement database restore functionality
+ // ASSUMPTION: This would use database-specific restore mechanisms
+ return fmt.Errorf("database restore not implemented yet")
+}