aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/offset/migration.go
blob: 4e0a6ab12c69781bc6fb9e77caab41d9be429c2e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
package offset

import (
	"database/sql"
	"fmt"
	"time"
)

// MigrationVersion represents a database migration version: one numbered,
// forward-only schema change for offset storage (RollbackMigration is not
// implemented).
//
// Version orders the migration and is the primary key under which
// ApplyMigrations records it in schema_migrations; a migration is treated as
// pending when its Version is greater than the highest recorded one.
// Description is printed while the migration runs and stored alongside it.
// SQL is passed to a single tx.Exec call inside the migration's transaction
// and may contain several ';'-separated statements, as the entries returned
// by GetMigrations do.
type MigrationVersion struct {
	Version     int
	Description string
	SQL         string
}

// GetMigrations returns all available migrations for offset storage.
//
// A fresh slice is built on every call, listing migrations in ascending
// Version order (1, 2, 3). ApplyMigrations runs, in slice order, every entry
// whose Version is above the highest version recorded in schema_migrations,
// and ValidateSchema compares against the newest declared version. New
// migrations must therefore be appended with a higher Version. Entries that
// have already shipped should not be edited: databases that recorded them
// skip them and will never re-run the changed SQL.
//
// The SQL is written for SQLite (e.g. AUTOINCREMENT), matching the "sqlite3"
// driver opened by CreateDatabase.
//
// NOTE(review): in migration 2, idx_partition_offset_checkpoints_partition
// indexes partition_key, which is already the PRIMARY KEY of
// partition_offset_checkpoints. idx_offset_mappings_partition_offset covers
// the same columns as UNIQUE(partition_key, kafka_offset) from migration 1.
// SQLite normally backs both constraints with an automatic index, so these
// two indexes look redundant. Confirm this, and drop them only through a new
// migration, never by editing migration 2.
func GetMigrations() []MigrationVersion {
	return []MigrationVersion{
		// Version 1: base tables. The schema_migrations statement is a no-op
		// when run through ApplyMigrations, because GetCurrentVersion has
		// already created that table with the same definition.
		{
			Version:     1,
			Description: "Create initial offset storage tables",
			SQL: `
				-- Partition offset checkpoints table
				-- TODO: Add _index as computed column when supported by database
				CREATE TABLE IF NOT EXISTS partition_offset_checkpoints (
					partition_key TEXT PRIMARY KEY,
					ring_size INTEGER NOT NULL,
					range_start INTEGER NOT NULL,
					range_stop INTEGER NOT NULL,
					unix_time_ns INTEGER NOT NULL,
					checkpoint_offset INTEGER NOT NULL,
					updated_at INTEGER NOT NULL
				);
				
				-- Offset mappings table for detailed tracking
				-- TODO: Add _index as computed column when supported by database
				CREATE TABLE IF NOT EXISTS offset_mappings (
					id INTEGER PRIMARY KEY AUTOINCREMENT,
					partition_key TEXT NOT NULL,
					kafka_offset INTEGER NOT NULL,
					smq_timestamp INTEGER NOT NULL,
					message_size INTEGER NOT NULL,
					created_at INTEGER NOT NULL,
					UNIQUE(partition_key, kafka_offset)
				);
				
				-- Schema migrations tracking table
				CREATE TABLE IF NOT EXISTS schema_migrations (
					version INTEGER PRIMARY KEY,
					description TEXT NOT NULL,
					applied_at INTEGER NOT NULL
				);
			`,
		},
		// Version 2: secondary indexes for lookups by partition/offset,
		// partition/timestamp and created_at (see the NOTE(review) in the
		// function comment about the first two).
		{
			Version:     2,
			Description: "Add indexes for performance optimization",
			SQL: `
				-- Indexes for performance
				CREATE INDEX IF NOT EXISTS idx_partition_offset_checkpoints_partition 
				ON partition_offset_checkpoints(partition_key);
				
				CREATE INDEX IF NOT EXISTS idx_offset_mappings_partition_offset 
				ON offset_mappings(partition_key, kafka_offset);
				
				CREATE INDEX IF NOT EXISTS idx_offset_mappings_timestamp 
				ON offset_mappings(partition_key, smq_timestamp);
				
				CREATE INDEX IF NOT EXISTS idx_offset_mappings_created_at 
				ON offset_mappings(created_at);
			`,
		},
		// Version 3: per-partition metadata (record_count and total_size
		// default to 0) plus an index on last_activity_at.
		{
			Version:     3,
			Description: "Add partition metadata table for enhanced tracking",
			SQL: `
				-- Partition metadata table
				CREATE TABLE IF NOT EXISTS partition_metadata (
					partition_key TEXT PRIMARY KEY,
					ring_size INTEGER NOT NULL,
					range_start INTEGER NOT NULL,
					range_stop INTEGER NOT NULL,
					unix_time_ns INTEGER NOT NULL,
					created_at INTEGER NOT NULL,
					last_activity_at INTEGER NOT NULL,
					record_count INTEGER DEFAULT 0,
					total_size INTEGER DEFAULT 0
				);
				
				-- Index for partition metadata
				CREATE INDEX IF NOT EXISTS idx_partition_metadata_activity 
				ON partition_metadata(last_activity_at);
			`,
		},
	}
}

// MigrationManager handles database schema migrations for offset storage,
// issuing every statement through the *sql.DB it was built with. None of its
// methods in this file close that handle; the caller keeps ownership of it.
type MigrationManager struct {
	db *sql.DB
}

// NewMigrationManager returns a MigrationManager bound to db. The handle is
// stored unchanged: no connection is opened and no query is run here.
func NewMigrationManager(db *sql.DB) *MigrationManager {
	manager := new(MigrationManager)
	manager.db = db
	return manager
}

// GetCurrentVersion returns the highest version recorded in
// schema_migrations, or 0 when nothing has been recorded yet.
//
// The tracking table is created first if it is missing, so this is safe to
// call on a brand-new database. That DDL is the only write it performs.
func (m *MigrationManager) GetCurrentVersion() (int, error) {
	// Bootstrap the tracking table so the MAX() query below has something to read.
	if _, err := m.db.Exec(`
		CREATE TABLE IF NOT EXISTS schema_migrations (
			version INTEGER PRIMARY KEY,
			description TEXT NOT NULL,
			applied_at INTEGER NOT NULL
		)
	`); err != nil {
		return 0, fmt.Errorf("failed to create migrations table: %w", err)
	}

	// MAX over an empty table yields NULL, hence the nullable scan target.
	var highest sql.NullInt64
	row := m.db.QueryRow("SELECT MAX(version) FROM schema_migrations")
	if err := row.Scan(&highest); err != nil {
		return 0, fmt.Errorf("failed to get current version: %w", err)
	}

	if highest.Valid {
		return int(highest.Int64), nil
	}
	return 0, nil // No migrations applied yet
}

// ApplyMigrations applies all pending migrations: every migration from
// GetMigrations whose Version is greater than the highest version recorded
// in schema_migrations, in the order GetMigrations lists them.
//
// Each migration and the schema_migrations row that records it are committed
// together in one transaction (see applyMigrationTx), so a failed migration
// leaves no partial bookkeeping behind. Migrations committed before a failure
// stay applied, and the returned error names the migration that failed.
// Progress messages are printed to stdout with fmt.Printf.
//
// NOTE(review): pending migrations run in slice order, so this relies on
// GetMigrations listing versions in ascending order.
func (m *MigrationManager) ApplyMigrations() error {
	currentVersion, err := m.GetCurrentVersion()
	if err != nil {
		return fmt.Errorf("failed to get current version: %w", err)
	}

	for _, migration := range GetMigrations() {
		if migration.Version <= currentVersion {
			continue // Already applied
		}

		fmt.Printf("Applying migration %d: %s\n", migration.Version, migration.Description)

		if err := m.applyMigrationTx(migration); err != nil {
			return err
		}

		fmt.Printf("Successfully applied migration %d\n", migration.Version)
	}

	return nil
}

// applyMigrationTx executes migration.SQL and inserts its schema_migrations
// row inside a single transaction, committing only when both succeed.
func (m *MigrationManager) applyMigrationTx(migration MigrationVersion) error {
	tx, err := m.db.Begin()
	if err != nil {
		return fmt.Errorf("failed to begin transaction for migration %d: %w", migration.Version, err)
	}
	// The deferred rollback releases the transaction on every early return and
	// on a panic. After a successful Commit, Rollback only reports
	// sql.ErrTxDone, which is deliberately ignored.
	defer func() { _ = tx.Rollback() }()

	if _, err := tx.Exec(migration.SQL); err != nil {
		return fmt.Errorf("failed to execute migration %d: %w", migration.Version, err)
	}

	// Record the migration in the same transaction, so the schema change and
	// its bookkeeping row commit or roll back together.
	if _, err := tx.Exec(
		"INSERT INTO schema_migrations (version, description, applied_at) VALUES (?, ?, ?)",
		migration.Version,
		migration.Description,
		getCurrentTimestamp(),
	); err != nil {
		return fmt.Errorf("failed to record migration %d: %w", migration.Version, err)
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("failed to commit migration %d: %w", migration.Version, err)
	}
	return nil
}

// RollbackMigration is meant to undo the migration identified by version.
// Rollbacks are not supported yet: the call always fails, never reads
// version, and leaves the schema untouched.
func (m *MigrationManager) RollbackMigration(version int) error {
	// TODO: Implement rollback functionality.
	// ASSUMPTION: rollbacks need careful per-migration planning, so until
	// down-migrations exist an operator has to intervene manually.
	const reason = "migration rollbacks not implemented - manual intervention required"
	return fmt.Errorf(reason)
}

// GetAppliedMigrations returns a list of all applied migrations: every row of
// schema_migrations, ordered by ascending version. A database with no
// recorded migrations yields a nil slice and a nil error.
//
// The tracking table is not created here. If neither GetCurrentVersion nor
// ApplyMigrations has run against this database, the query fails and that
// error is returned wrapped.
func (m *MigrationManager) GetAppliedMigrations() ([]AppliedMigration, error) {
	rows, err := m.db.Query(`
		SELECT version, description, applied_at 
		FROM schema_migrations 
		ORDER BY version
	`)
	if err != nil {
		return nil, fmt.Errorf("failed to query applied migrations: %w", err)
	}
	defer rows.Close()

	var migrations []AppliedMigration
	for rows.Next() {
		var migration AppliedMigration
		if err := rows.Scan(&migration.Version, &migration.Description, &migration.AppliedAt); err != nil {
			return nil, fmt.Errorf("failed to scan migration: %w", err)
		}
		migrations = append(migrations, migration)
	}
	// rows.Next returns false both at the end of the result set and on an
	// iteration error. Only rows.Err tells them apart, so without this check
	// a truncated result would be reported as success.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate applied migrations: %w", err)
	}

	return migrations, nil
}

// ValidateSchema validates that the database schema is up to date.
//
// It returns an error when the highest applied migration version is lower
// than the newest version declared by GetMigrations. It returns nil when the
// schema is current, when the database is ahead of the declared migrations,
// or when no migrations are declared. Via GetCurrentVersion it creates
// schema_migrations if that table does not exist yet.
func (m *MigrationManager) ValidateSchema() error {
	currentVersion, err := m.GetCurrentVersion()
	if err != nil {
		return fmt.Errorf("failed to get current version: %w", err)
	}

	migrations := GetMigrations()
	if len(migrations) == 0 {
		return nil
	}

	// Use the highest declared version rather than the last element, so the
	// check does not silently depend on GetMigrations listing versions in
	// ascending order.
	latestVersion := migrations[0].Version
	for _, migration := range migrations[1:] {
		if migration.Version > latestVersion {
			latestVersion = migration.Version
		}
	}

	if currentVersion < latestVersion {
		return fmt.Errorf("schema is outdated: current version %d, latest version %d", currentVersion, latestVersion)
	}

	return nil
}

// AppliedMigration represents a migration that has been applied: one row of
// the schema_migrations table, as returned by GetAppliedMigrations.
//
// Version and Description mirror the MigrationVersion that was applied.
// AppliedAt holds the applied_at column, which ApplyMigrations fills from
// getCurrentTimestamp (Unix time in nanoseconds).
type AppliedMigration struct {
	Version     int
	Description string
	AppliedAt   int64
}

// getCurrentTimestamp reports the current wall-clock time as nanoseconds
// since the Unix epoch. It is the value stored in schema_migrations.applied_at.
func getCurrentTimestamp() int64 {
	now := time.Now()
	return now.UnixNano()
}

// CreateDatabase creates and initializes a new offset storage database.
//
// It opens dbPath with the "sqlite3" driver, runs the PRAGMAs below, and then
// applies all pending migrations. The driver must be registered elsewhere in
// the program, because this file does not import it. On any failure the
// handle is closed and a wrapped error is returned. On success the caller
// owns the returned *sql.DB and is responsible for closing it.
//
// NOTE(review): apart from journal_mode (which SQLite persists in the
// database file once set to WAL), PRAGMAs such as synchronous, cache_size,
// foreign_keys and temp_store are per-connection in SQLite. *sql.DB is a
// connection pool, so these Exec calls configure only the pooled connection
// that happens to run them. Consider driver DSN options or a per-connection
// hook, and confirm against the driver in use. The migrations in this file
// also declare no FOREIGN KEY constraints.
func CreateDatabase(dbPath string) (*sql.DB, error) {
	// TODO: Support different database types (PostgreSQL, MySQL, etc.)
	// ASSUMPTION: Using SQLite for now, can be extended for other databases
	db, openErr := sql.Open("sqlite3", dbPath)
	if openErr != nil {
		return nil, fmt.Errorf("failed to open database: %w", openErr)
	}

	// SQLite tuning, executed in this order.
	settings := [...]string{
		"PRAGMA journal_mode=WAL",   // write-ahead log so readers don't block the writer
		"PRAGMA synchronous=NORMAL", // trade a little durability for write speed
		"PRAGMA cache_size=10000",   // larger page cache
		"PRAGMA foreign_keys=ON",    // enforce foreign key constraints
		"PRAGMA temp_store=MEMORY",  // keep temporary tables in memory
	}
	for i := range settings {
		if _, execErr := db.Exec(settings[i]); execErr != nil {
			db.Close()
			return nil, fmt.Errorf("failed to set pragma %s: %w", settings[i], execErr)
		}
	}

	if migrateErr := NewMigrationManager(db).ApplyMigrations(); migrateErr != nil {
		db.Close()
		return nil, fmt.Errorf("failed to apply migrations: %w", migrateErr)
	}

	return db, nil
}

// BackupDatabase is intended to copy the offset storage database behind
// sourceDB to backupPath. It is not implemented yet: it always returns an
// error and uses neither argument.
func BackupDatabase(sourceDB *sql.DB, backupPath string) error {
	// TODO: Implement database backup functionality.
	// ASSUMPTION: a real implementation would rely on a database-specific
	// backup mechanism rather than generic SQL.
	err := fmt.Errorf("database backup not implemented yet")
	return err
}

// RestoreDatabase is intended to rebuild the database at targetPath from the
// backup stored at backupPath. It is a placeholder: it always returns an
// error and reads neither path.
func RestoreDatabase(backupPath, targetPath string) error {
	// TODO: Implement database restore functionality.
	// ASSUMPTION: a real implementation would use a database-specific restore
	// mechanism.
	const notImplemented = "database restore not implemented yet"
	return fmt.Errorf(notImplemented)
}