Diffstat (limited to 'weed/mq/offset/sql_storage.go')
-rw-r--r--	weed/mq/offset/sql_storage.go	394
1 file changed, 394 insertions, 0 deletions
diff --git a/weed/mq/offset/sql_storage.go b/weed/mq/offset/sql_storage.go
new file mode 100644
index 000000000..c3107e5a4
--- /dev/null
+++ b/weed/mq/offset/sql_storage.go
@@ -0,0 +1,394 @@
+package offset
+
+import (
+	"database/sql"
+	"fmt"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+// OffsetEntry represents a mapping between Kafka offset and SMQ timestamp
+type OffsetEntry struct {
+	KafkaOffset  int64
+	SMQTimestamp int64
+	MessageSize  int32
+}
+
+// SQLOffsetStorage implements OffsetStorage using SQL database with _index column
+type SQLOffsetStorage struct {
+	db *sql.DB
+}
+
+// NewSQLOffsetStorage creates a new SQL-based offset storage
+func NewSQLOffsetStorage(db *sql.DB) (*SQLOffsetStorage, error) {
+	storage := &SQLOffsetStorage{db: db}
+
+	// Initialize database schema
+	if err := storage.initializeSchema(); err != nil {
+		return nil, fmt.Errorf("failed to initialize schema: %w", err)
+	}
+
+	return storage, nil
+}
+
+// initializeSchema creates the necessary tables for offset storage
+func (s *SQLOffsetStorage) initializeSchema() error {
+	// TODO: Create offset storage tables with _index as hidden column
+	// ASSUMPTION: Using SQLite-compatible syntax, may need adaptation for other databases
+
+	queries := []string{
+		// Partition offset checkpoints table
+		// TODO: Add _index as computed column when supported by database
+		// ASSUMPTION: Using regular columns for now, _index concept preserved for future enhancement
+		`CREATE TABLE IF NOT EXISTS partition_offset_checkpoints (
+			partition_key TEXT PRIMARY KEY,
+			ring_size INTEGER NOT NULL,
+			range_start INTEGER NOT NULL,
+			range_stop INTEGER NOT NULL,
+			unix_time_ns INTEGER NOT NULL,
+			checkpoint_offset INTEGER NOT NULL,
+			updated_at INTEGER NOT NULL
+		)`,
+
+		// Offset mappings table for detailed tracking
+		// TODO: Add _index as computed column when supported by database
+		`CREATE TABLE IF NOT EXISTS offset_mappings (
+			id INTEGER PRIMARY KEY AUTOINCREMENT,
+			partition_key TEXT NOT NULL,
+			kafka_offset INTEGER NOT NULL,
+			smq_timestamp INTEGER NOT NULL,
+			message_size INTEGER NOT NULL,
+			created_at INTEGER NOT NULL,
+			UNIQUE(partition_key, kafka_offset)
+		)`,
+
+		// Indexes for performance
+		`CREATE INDEX IF NOT EXISTS idx_partition_offset_checkpoints_partition
+			ON partition_offset_checkpoints(partition_key)`,
+
+		`CREATE INDEX IF NOT EXISTS idx_offset_mappings_partition_offset
+			ON offset_mappings(partition_key, kafka_offset)`,
+
+		`CREATE INDEX IF NOT EXISTS idx_offset_mappings_timestamp
+			ON offset_mappings(partition_key, smq_timestamp)`,
+	}
+
+	for _, query := range queries {
+		if _, err := s.db.Exec(query); err != nil {
+			return fmt.Errorf("failed to execute schema query: %w", err)
+		}
+	}
+
+	return nil
+}
+
+// SaveCheckpoint saves the checkpoint for a partition
+func (s *SQLOffsetStorage) SaveCheckpoint(namespace, topicName string, partition *schema_pb.Partition, offset int64) error {
+	// Use TopicPartitionKey to ensure each topic has isolated checkpoint storage
+	partitionKey := TopicPartitionKey(namespace, topicName, partition)
+	now := time.Now().UnixNano()
+
+	// TODO: Use UPSERT for better performance
+	// ASSUMPTION: SQLite REPLACE syntax, may need adaptation for other databases
+	query := `
+		REPLACE INTO partition_offset_checkpoints
+		(partition_key, ring_size, range_start, range_stop, unix_time_ns, checkpoint_offset, updated_at)
+		VALUES (?, ?, ?, ?, ?, ?, ?)
+	`
+
+	_, err := s.db.Exec(query,
+		partitionKey,
+		partition.RingSize,
+		partition.RangeStart,
+		partition.RangeStop,
+		partition.UnixTimeNs,
+		offset,
+		now,
+	)
+
+	if err != nil {
+		return fmt.Errorf("failed to save checkpoint: %w", err)
+	}
+
+	return nil
+}
+
+// LoadCheckpoint loads the checkpoint for a partition
+func (s *SQLOffsetStorage) LoadCheckpoint(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+	// Use TopicPartitionKey to match SaveCheckpoint
+	partitionKey := TopicPartitionKey(namespace, topicName, partition)
+
+	query := `
+		SELECT checkpoint_offset
+		FROM partition_offset_checkpoints
+		WHERE partition_key = ?
+	`
+
+	var checkpointOffset int64
+	err := s.db.QueryRow(query, partitionKey).Scan(&checkpointOffset)
+
+	if err == sql.ErrNoRows {
+		return -1, fmt.Errorf("no checkpoint found")
+	}
+
+	if err != nil {
+		return -1, fmt.Errorf("failed to load checkpoint: %w", err)
+	}
+
+	return checkpointOffset, nil
+}
+
+// GetHighestOffset finds the highest offset in storage for a partition
+func (s *SQLOffsetStorage) GetHighestOffset(namespace, topicName string, partition *schema_pb.Partition) (int64, error) {
+	// Use TopicPartitionKey to match SaveCheckpoint
+	partitionKey := TopicPartitionKey(namespace, topicName, partition)
+
+	// TODO: Use _index column for efficient querying
+	// ASSUMPTION: kafka_offset represents the sequential offset we're tracking
+	query := `
+		SELECT MAX(kafka_offset)
+		FROM offset_mappings
+		WHERE partition_key = ?
+	`
+
+	var highestOffset sql.NullInt64
+	err := s.db.QueryRow(query, partitionKey).Scan(&highestOffset)
+
+	if err != nil {
+		return -1, fmt.Errorf("failed to get highest offset: %w", err)
+	}
+
+	if !highestOffset.Valid {
+		return -1, fmt.Errorf("no records found")
+	}
+
+	return highestOffset.Int64, nil
+}
+
+// SaveOffsetMapping stores an offset mapping (extends OffsetStorage interface)
+func (s *SQLOffsetStorage) SaveOffsetMapping(partitionKey string, kafkaOffset, smqTimestamp int64, size int32) error {
+	now := time.Now().UnixNano()
+
+	// TODO: Handle duplicate key conflicts gracefully
+	// ASSUMPTION: Using INSERT OR REPLACE for conflict resolution
+	query := `
+		INSERT OR REPLACE INTO offset_mappings
+		(partition_key, kafka_offset, smq_timestamp, message_size, created_at)
+		VALUES (?, ?, ?, ?, ?)
+	`
+
+	_, err := s.db.Exec(query, partitionKey, kafkaOffset, smqTimestamp, size, now)
+	if err != nil {
+		return fmt.Errorf("failed to save offset mapping: %w", err)
+	}
+
+	return nil
+}
+
+// LoadOffsetMappings retrieves all offset mappings for a partition
+func (s *SQLOffsetStorage) LoadOffsetMappings(partitionKey string) ([]OffsetEntry, error) {
+	// TODO: Add pagination for large result sets
+	// ASSUMPTION: Loading all mappings for now, should be paginated in production
+	query := `
+		SELECT kafka_offset, smq_timestamp, message_size
+		FROM offset_mappings
+		WHERE partition_key = ?
+		ORDER BY kafka_offset ASC
+	`
+
+	rows, err := s.db.Query(query, partitionKey)
+	if err != nil {
+		return nil, fmt.Errorf("failed to query offset mappings: %w", err)
+	}
+	defer rows.Close()
+
+	var entries []OffsetEntry
+	for rows.Next() {
+		var entry OffsetEntry
+		err := rows.Scan(&entry.KafkaOffset, &entry.SMQTimestamp, &entry.MessageSize)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scan offset entry: %w", err)
+		}
+		entries = append(entries, entry)
+	}
+
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("error iterating offset mappings: %w", err)
+	}
+
+	return entries, nil
+}
+
+// GetOffsetMappingsByRange retrieves offset mappings within a specific range
+func (s *SQLOffsetStorage) GetOffsetMappingsByRange(partitionKey string, startOffset, endOffset int64) ([]OffsetEntry, error) {
+	// TODO: Use _index column for efficient range queries
+	query := `
+		SELECT kafka_offset, smq_timestamp, message_size
+		FROM offset_mappings
+		WHERE partition_key = ? AND kafka_offset >= ? AND kafka_offset <= ?
+		ORDER BY kafka_offset ASC
+	`
+
+	rows, err := s.db.Query(query, partitionKey, startOffset, endOffset)
+	if err != nil {
+		return nil, fmt.Errorf("failed to query offset range: %w", err)
+	}
+	defer rows.Close()
+
+	var entries []OffsetEntry
+	for rows.Next() {
+		var entry OffsetEntry
+		err := rows.Scan(&entry.KafkaOffset, &entry.SMQTimestamp, &entry.MessageSize)
+		if err != nil {
+			return nil, fmt.Errorf("failed to scan offset entry: %w", err)
+		}
+		entries = append(entries, entry)
+	}
+
+	return entries, nil
+}
+
+// GetPartitionStats returns statistics about a partition's offset usage
+func (s *SQLOffsetStorage) GetPartitionStats(partitionKey string) (*PartitionStats, error) {
+	query := `
+		SELECT
+			COUNT(*) as record_count,
+			MIN(kafka_offset) as earliest_offset,
+			MAX(kafka_offset) as latest_offset,
+			SUM(message_size) as total_size,
+			MIN(created_at) as first_record_time,
+			MAX(created_at) as last_record_time
+		FROM offset_mappings
+		WHERE partition_key = ?
+	`
+
+	var stats PartitionStats
+	var earliestOffset, latestOffset sql.NullInt64
+	var totalSize sql.NullInt64
+	var firstRecordTime, lastRecordTime sql.NullInt64
+
+	err := s.db.QueryRow(query, partitionKey).Scan(
+		&stats.RecordCount,
+		&earliestOffset,
+		&latestOffset,
+		&totalSize,
+		&firstRecordTime,
+		&lastRecordTime,
+	)
+
+	if err != nil {
+		return nil, fmt.Errorf("failed to get partition stats: %w", err)
+	}
+
+	stats.PartitionKey = partitionKey
+
+	if earliestOffset.Valid {
+		stats.EarliestOffset = earliestOffset.Int64
+	} else {
+		stats.EarliestOffset = -1
+	}
+
+	if latestOffset.Valid {
+		stats.LatestOffset = latestOffset.Int64
+		stats.HighWaterMark = latestOffset.Int64 + 1
+	} else {
+		stats.LatestOffset = -1
+		stats.HighWaterMark = 0
+	}
+
+	if firstRecordTime.Valid {
+		stats.FirstRecordTime = firstRecordTime.Int64
+	}
+
+	if lastRecordTime.Valid {
+		stats.LastRecordTime = lastRecordTime.Int64
+	}
+
+	if totalSize.Valid {
+		stats.TotalSize = totalSize.Int64
+	}
+
+	return &stats, nil
+}
+
+// CleanupOldMappings removes offset mappings older than the specified time
+func (s *SQLOffsetStorage) CleanupOldMappings(olderThanNs int64) error {
+	// TODO: Add configurable cleanup policies
+	// ASSUMPTION: Simple time-based cleanup, could be enhanced with retention policies
+	query := `
+		DELETE FROM offset_mappings
+		WHERE created_at < ?
+	`
+
+	result, err := s.db.Exec(query, olderThanNs)
+	if err != nil {
+		return fmt.Errorf("failed to cleanup old mappings: %w", err)
+	}
+
+	rowsAffected, _ := result.RowsAffected()
+	if rowsAffected > 0 {
+		// Log cleanup activity
+		fmt.Printf("Cleaned up %d old offset mappings\n", rowsAffected)
+	}
+
+	return nil
+}
+
+// Close closes the database connection
+func (s *SQLOffsetStorage) Close() error {
+	if s.db != nil {
+		return s.db.Close()
+	}
+	return nil
+}
+
+// PartitionStats provides statistics about a partition's offset usage
+type PartitionStats struct {
+	PartitionKey    string
+	RecordCount     int64
+	EarliestOffset  int64
+	LatestOffset    int64
+	HighWaterMark   int64
+	TotalSize       int64
+	FirstRecordTime int64
+	LastRecordTime  int64
+}
+
+// GetAllPartitions returns a list of all partitions with offset data
+func (s *SQLOffsetStorage) GetAllPartitions() ([]string, error) {
+	query := `
+		SELECT DISTINCT partition_key
+		FROM offset_mappings
+		ORDER BY partition_key
+	`
+
+	rows, err := s.db.Query(query)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get all partitions: %w", err)
+	}
+	defer rows.Close()
+
+	var partitions []string
+	for rows.Next() {
+		var partitionKey string
+		if err := rows.Scan(&partitionKey); err != nil {
+			return nil, fmt.Errorf("failed to scan partition key: %w", err)
+		}
+		partitions = append(partitions, partitionKey)
+	}
+
+	return partitions, nil
+}
+
+// Vacuum performs database maintenance operations
+func (s *SQLOffsetStorage) Vacuum() error {
+	// TODO: Add database-specific optimization commands
+	// ASSUMPTION: SQLite VACUUM command, may need adaptation for other databases
+	_, err := s.db.Exec("VACUUM")
+	if err != nil {
+		return fmt.Errorf("failed to vacuum database: %w", err)
+	}
+
+	return nil
+}
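
For anyone wanting to exercise the new storage end to end, here is a minimal sketch. It assumes the modernc.org/sqlite driver (any database/sql driver that accepts the SQLite-flavored SQL above would do) and a made-up partition key; real callers derive the key with TopicPartitionKey(namespace, topic, partition), as SaveCheckpoint does. This is an illustration, not code from the commit.

package main

import (
	"database/sql"
	"fmt"
	"log"
	"time"

	_ "modernc.org/sqlite" // assumed driver; mattn/go-sqlite3 ("sqlite3") works the same way

	"github.com/seaweedfs/seaweedfs/weed/mq/offset"
)

func main() {
	// The storage only needs a *sql.DB; an in-memory database is enough for a smoke test.
	db, err := sql.Open("sqlite", ":memory:")
	if err != nil {
		log.Fatal(err)
	}

	// NewSQLOffsetStorage runs initializeSchema, creating tables and indexes up front.
	storage, err := offset.NewSQLOffsetStorage(db)
	if err != nil {
		log.Fatal(err)
	}
	defer storage.Close()

	// "test-ns.test-topic.0000-1000" is a hypothetical partition key for illustration only.
	key := "test-ns.test-topic.0000-1000"
	for i := int64(0); i < 3; i++ {
		if err := storage.SaveOffsetMapping(key, i, time.Now().UnixNano(), 128); err != nil {
			log.Fatal(err)
		}
	}

	entries, err := storage.LoadOffsetMappings(key)
	if err != nil {
		log.Fatal(err)
	}

	stats, err := storage.GetPartitionStats(key)
	if err != nil {
		log.Fatal(err)
	}

	// After offsets 0..2, LatestOffset is 2 and HighWaterMark is 3.
	fmt.Printf("mappings=%d latest=%d hwm=%d\n", len(entries), stats.LatestOffset, stats.HighWaterMark)
}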

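The TODO/ASSUMPTION comments repeatedly flag the SQLite-specific bits (REPLACE INTO, INSERT OR REPLACE, AUTOINCREMENT, "?" placeholders). As a sketch of the adaptation they anticipate, and not part of this commit, SaveCheckpoint's statement in PostgreSQL dialect would use a standard ON CONFLICT upsert with "$n" placeholders:

// Hypothetical PostgreSQL variant of SaveCheckpoint's statement; REPLACE INTO
// and "?" placeholders are SQLite-isms, replaced here by ON CONFLICT and "$n".
const saveCheckpointPostgres = `
	INSERT INTO partition_offset_checkpoints
	(partition_key, ring_size, range_start, range_stop, unix_time_ns, checkpoint_offset, updated_at)
	VALUES ($1, $2, $3, $4, $5, $6, $7)
	ON CONFLICT (partition_key) DO UPDATE SET
		ring_size         = EXCLUDED.ring_size,
		range_start       = EXCLUDED.range_start,
		range_stop        = EXCLUDED.range_stop,
		unix_time_ns      = EXCLUDED.unix_time_ns,
		checkpoint_offset = EXCLUDED.checkpoint_offset,
		updated_at        = EXCLUDED.updated_at
`

The offset_mappings table would similarly swap AUTOINCREMENT for BIGSERIAL (or an identity column), and INSERT OR REPLACE for ON CONFLICT (partition_key, kafka_offset) DO UPDATE.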