aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/offset/sql_storage_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/offset/sql_storage_test.go')
-rw-r--r--weed/mq/offset/sql_storage_test.go516
1 files changed, 516 insertions, 0 deletions
diff --git a/weed/mq/offset/sql_storage_test.go b/weed/mq/offset/sql_storage_test.go
new file mode 100644
index 000000000..661f317de
--- /dev/null
+++ b/weed/mq/offset/sql_storage_test.go
@@ -0,0 +1,516 @@
+package offset
+
+import (
+ "database/sql"
+ "os"
+ "testing"
+ "time"
+
+ _ "github.com/mattn/go-sqlite3" // SQLite driver
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+)
+
+func createTestDB(t *testing.T) *sql.DB {
+ // Create temporary database file
+ tmpFile, err := os.CreateTemp("", "offset_test_*.db")
+ if err != nil {
+ t.Fatalf("Failed to create temp database file: %v", err)
+ }
+ tmpFile.Close()
+
+ // Clean up the file when test completes
+ t.Cleanup(func() {
+ os.Remove(tmpFile.Name())
+ })
+
+ db, err := sql.Open("sqlite3", tmpFile.Name())
+ if err != nil {
+ t.Fatalf("Failed to open database: %v", err)
+ }
+
+ t.Cleanup(func() {
+ db.Close()
+ })
+
+ return db
+}
+
+func createTestPartitionForSQL() *schema_pb.Partition {
+ return &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 0,
+ RangeStop: 31,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+}
+
+func TestSQLOffsetStorage_InitializeSchema(t *testing.T) {
+ db := createTestDB(t)
+
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ // Verify tables were created
+ tables := []string{
+ "partition_offset_checkpoints",
+ "offset_mappings",
+ }
+
+ for _, table := range tables {
+ var count int
+ err := db.QueryRow("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?", table).Scan(&count)
+ if err != nil {
+ t.Fatalf("Failed to check table %s: %v", table, err)
+ }
+
+ if count != 1 {
+ t.Errorf("Table %s was not created", table)
+ }
+ }
+}
+
+func TestSQLOffsetStorage_SaveLoadCheckpoint(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+
+ // Test saving checkpoint
+ err = storage.SaveCheckpoint("test-namespace", "test-topic", partition, 100)
+ if err != nil {
+ t.Fatalf("Failed to save checkpoint: %v", err)
+ }
+
+ // Test loading checkpoint
+ checkpoint, err := storage.LoadCheckpoint("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to load checkpoint: %v", err)
+ }
+
+ if checkpoint != 100 {
+ t.Errorf("Expected checkpoint 100, got %d", checkpoint)
+ }
+
+ // Test updating checkpoint
+ err = storage.SaveCheckpoint("test-namespace", "test-topic", partition, 200)
+ if err != nil {
+ t.Fatalf("Failed to update checkpoint: %v", err)
+ }
+
+ checkpoint, err = storage.LoadCheckpoint("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to load updated checkpoint: %v", err)
+ }
+
+ if checkpoint != 200 {
+ t.Errorf("Expected updated checkpoint 200, got %d", checkpoint)
+ }
+}
+
+func TestSQLOffsetStorage_LoadCheckpointNotFound(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+
+ // Test loading non-existent checkpoint
+ _, err = storage.LoadCheckpoint("test-namespace", "test-topic", partition)
+ if err == nil {
+ t.Error("Expected error for non-existent checkpoint")
+ }
+}
+
+func TestSQLOffsetStorage_SaveLoadOffsetMappings(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+ partitionKey := partitionKey(partition)
+
+ // Save multiple offset mappings
+ mappings := []struct {
+ offset int64
+ timestamp int64
+ size int32
+ }{
+ {0, 1000, 100},
+ {1, 2000, 150},
+ {2, 3000, 200},
+ }
+
+ for _, mapping := range mappings {
+ err := storage.SaveOffsetMapping(partitionKey, mapping.offset, mapping.timestamp, mapping.size)
+ if err != nil {
+ t.Fatalf("Failed to save offset mapping: %v", err)
+ }
+ }
+
+ // Load offset mappings
+ entries, err := storage.LoadOffsetMappings(partitionKey)
+ if err != nil {
+ t.Fatalf("Failed to load offset mappings: %v", err)
+ }
+
+ if len(entries) != len(mappings) {
+ t.Errorf("Expected %d entries, got %d", len(mappings), len(entries))
+ }
+
+ // Verify entries are sorted by offset
+ for i, entry := range entries {
+ expected := mappings[i]
+ if entry.KafkaOffset != expected.offset {
+ t.Errorf("Entry %d: expected offset %d, got %d", i, expected.offset, entry.KafkaOffset)
+ }
+ if entry.SMQTimestamp != expected.timestamp {
+ t.Errorf("Entry %d: expected timestamp %d, got %d", i, expected.timestamp, entry.SMQTimestamp)
+ }
+ if entry.MessageSize != expected.size {
+ t.Errorf("Entry %d: expected size %d, got %d", i, expected.size, entry.MessageSize)
+ }
+ }
+}
+
+func TestSQLOffsetStorage_GetHighestOffset(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+ partitionKey := TopicPartitionKey("test-namespace", "test-topic", partition)
+
+ // Test empty partition
+ _, err = storage.GetHighestOffset("test-namespace", "test-topic", partition)
+ if err == nil {
+ t.Error("Expected error for empty partition")
+ }
+
+ // Add some offset mappings
+ offsets := []int64{5, 1, 3, 2, 4}
+ for _, offset := range offsets {
+ err := storage.SaveOffsetMapping(partitionKey, offset, offset*1000, 100)
+ if err != nil {
+ t.Fatalf("Failed to save offset mapping: %v", err)
+ }
+ }
+
+ // Get highest offset
+ highest, err := storage.GetHighestOffset("test-namespace", "test-topic", partition)
+ if err != nil {
+ t.Fatalf("Failed to get highest offset: %v", err)
+ }
+
+ if highest != 5 {
+ t.Errorf("Expected highest offset 5, got %d", highest)
+ }
+}
+
+func TestSQLOffsetStorage_GetOffsetMappingsByRange(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+ partitionKey := partitionKey(partition)
+
+ // Add offset mappings
+ for i := int64(0); i < 10; i++ {
+ err := storage.SaveOffsetMapping(partitionKey, i, i*1000, 100)
+ if err != nil {
+ t.Fatalf("Failed to save offset mapping: %v", err)
+ }
+ }
+
+ // Get range of offsets
+ entries, err := storage.GetOffsetMappingsByRange(partitionKey, 3, 7)
+ if err != nil {
+ t.Fatalf("Failed to get offset range: %v", err)
+ }
+
+ expectedCount := 5 // offsets 3, 4, 5, 6, 7
+ if len(entries) != expectedCount {
+ t.Errorf("Expected %d entries, got %d", expectedCount, len(entries))
+ }
+
+ // Verify range
+ for i, entry := range entries {
+ expectedOffset := int64(3 + i)
+ if entry.KafkaOffset != expectedOffset {
+ t.Errorf("Entry %d: expected offset %d, got %d", i, expectedOffset, entry.KafkaOffset)
+ }
+ }
+}
+
+func TestSQLOffsetStorage_GetPartitionStats(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+ partitionKey := partitionKey(partition)
+
+ // Test empty partition stats
+ stats, err := storage.GetPartitionStats(partitionKey)
+ if err != nil {
+ t.Fatalf("Failed to get empty partition stats: %v", err)
+ }
+
+ if stats.RecordCount != 0 {
+ t.Errorf("Expected record count 0, got %d", stats.RecordCount)
+ }
+
+ if stats.EarliestOffset != -1 {
+ t.Errorf("Expected earliest offset -1, got %d", stats.EarliestOffset)
+ }
+
+ // Add some data
+ sizes := []int32{100, 150, 200}
+ for i, size := range sizes {
+ err := storage.SaveOffsetMapping(partitionKey, int64(i), int64(i*1000), size)
+ if err != nil {
+ t.Fatalf("Failed to save offset mapping: %v", err)
+ }
+ }
+
+ // Get stats with data
+ stats, err = storage.GetPartitionStats(partitionKey)
+ if err != nil {
+ t.Fatalf("Failed to get partition stats: %v", err)
+ }
+
+ if stats.RecordCount != 3 {
+ t.Errorf("Expected record count 3, got %d", stats.RecordCount)
+ }
+
+ if stats.EarliestOffset != 0 {
+ t.Errorf("Expected earliest offset 0, got %d", stats.EarliestOffset)
+ }
+
+ if stats.LatestOffset != 2 {
+ t.Errorf("Expected latest offset 2, got %d", stats.LatestOffset)
+ }
+
+ if stats.HighWaterMark != 3 {
+ t.Errorf("Expected high water mark 3, got %d", stats.HighWaterMark)
+ }
+
+ expectedTotalSize := int64(100 + 150 + 200)
+ if stats.TotalSize != expectedTotalSize {
+ t.Errorf("Expected total size %d, got %d", expectedTotalSize, stats.TotalSize)
+ }
+}
+
+func TestSQLOffsetStorage_GetAllPartitions(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ // Test empty database
+ partitions, err := storage.GetAllPartitions()
+ if err != nil {
+ t.Fatalf("Failed to get all partitions: %v", err)
+ }
+
+ if len(partitions) != 0 {
+ t.Errorf("Expected 0 partitions, got %d", len(partitions))
+ }
+
+ // Add data for multiple partitions
+ partition1 := createTestPartitionForSQL()
+ partition2 := &schema_pb.Partition{
+ RingSize: 1024,
+ RangeStart: 32,
+ RangeStop: 63,
+ UnixTimeNs: time.Now().UnixNano(),
+ }
+
+ partitionKey1 := partitionKey(partition1)
+ partitionKey2 := partitionKey(partition2)
+
+ storage.SaveOffsetMapping(partitionKey1, 0, 1000, 100)
+ storage.SaveOffsetMapping(partitionKey2, 0, 2000, 150)
+
+ // Get all partitions
+ partitions, err = storage.GetAllPartitions()
+ if err != nil {
+ t.Fatalf("Failed to get all partitions: %v", err)
+ }
+
+ if len(partitions) != 2 {
+ t.Errorf("Expected 2 partitions, got %d", len(partitions))
+ }
+
+ // Verify partition keys are present
+ partitionMap := make(map[string]bool)
+ for _, p := range partitions {
+ partitionMap[p] = true
+ }
+
+ if !partitionMap[partitionKey1] {
+ t.Errorf("Partition key %s not found", partitionKey1)
+ }
+
+ if !partitionMap[partitionKey2] {
+ t.Errorf("Partition key %s not found", partitionKey2)
+ }
+}
+
+func TestSQLOffsetStorage_CleanupOldMappings(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+ partitionKey := partitionKey(partition)
+
+ // Add mappings with different timestamps
+ now := time.Now().UnixNano()
+
+ // Add old mapping by directly inserting with old timestamp
+ oldTime := now - (24 * time.Hour).Nanoseconds() // 24 hours ago
+ _, err = db.Exec(`
+ INSERT INTO offset_mappings
+ (partition_key, kafka_offset, smq_timestamp, message_size, created_at)
+ VALUES (?, ?, ?, ?, ?)
+ `, partitionKey, 0, oldTime, 100, oldTime)
+ if err != nil {
+ t.Fatalf("Failed to insert old mapping: %v", err)
+ }
+
+ // Add recent mapping
+ storage.SaveOffsetMapping(partitionKey, 1, now, 150)
+
+ // Verify both mappings exist
+ entries, err := storage.LoadOffsetMappings(partitionKey)
+ if err != nil {
+ t.Fatalf("Failed to load mappings: %v", err)
+ }
+
+ if len(entries) != 2 {
+ t.Errorf("Expected 2 mappings before cleanup, got %d", len(entries))
+ }
+
+ // Cleanup old mappings (older than 12 hours)
+ cutoffTime := now - (12 * time.Hour).Nanoseconds()
+ err = storage.CleanupOldMappings(cutoffTime)
+ if err != nil {
+ t.Fatalf("Failed to cleanup old mappings: %v", err)
+ }
+
+ // Verify only recent mapping remains
+ entries, err = storage.LoadOffsetMappings(partitionKey)
+ if err != nil {
+ t.Fatalf("Failed to load mappings after cleanup: %v", err)
+ }
+
+ if len(entries) != 1 {
+ t.Errorf("Expected 1 mapping after cleanup, got %d", len(entries))
+ }
+
+ if entries[0].KafkaOffset != 1 {
+ t.Errorf("Expected remaining mapping offset 1, got %d", entries[0].KafkaOffset)
+ }
+}
+
+func TestSQLOffsetStorage_Vacuum(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ // Vacuum should not fail on empty database
+ err = storage.Vacuum()
+ if err != nil {
+ t.Fatalf("Failed to vacuum database: %v", err)
+ }
+
+ // Add some data and vacuum again
+ partition := createTestPartitionForSQL()
+ partitionKey := partitionKey(partition)
+ storage.SaveOffsetMapping(partitionKey, 0, 1000, 100)
+
+ err = storage.Vacuum()
+ if err != nil {
+ t.Fatalf("Failed to vacuum database with data: %v", err)
+ }
+}
+
+func TestSQLOffsetStorage_ConcurrentAccess(t *testing.T) {
+ db := createTestDB(t)
+ storage, err := NewSQLOffsetStorage(db)
+ if err != nil {
+ t.Fatalf("Failed to create SQL storage: %v", err)
+ }
+ defer storage.Close()
+
+ partition := createTestPartitionForSQL()
+ partitionKey := partitionKey(partition)
+
+ // Test concurrent writes
+ const numGoroutines = 10
+ const offsetsPerGoroutine = 10
+
+ done := make(chan bool, numGoroutines)
+
+ for i := 0; i < numGoroutines; i++ {
+ go func(goroutineID int) {
+ defer func() { done <- true }()
+
+ for j := 0; j < offsetsPerGoroutine; j++ {
+ offset := int64(goroutineID*offsetsPerGoroutine + j)
+ err := storage.SaveOffsetMapping(partitionKey, offset, offset*1000, 100)
+ if err != nil {
+ t.Errorf("Failed to save offset mapping %d: %v", offset, err)
+ return
+ }
+ }
+ }(i)
+ }
+
+ // Wait for all goroutines to complete
+ for i := 0; i < numGoroutines; i++ {
+ <-done
+ }
+
+ // Verify all mappings were saved
+ entries, err := storage.LoadOffsetMappings(partitionKey)
+ if err != nil {
+ t.Fatalf("Failed to load mappings: %v", err)
+ }
+
+ expectedCount := numGoroutines * offsetsPerGoroutine
+ if len(entries) != expectedCount {
+ t.Errorf("Expected %d mappings, got %d", expectedCount, len(entries))
+ }
+}