aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/cluster/lock_manager/lock_ring.go7
-rw-r--r--weed/cluster/lock_manager/lock_ring_test.go67
2 files changed, 63 insertions, 11 deletions
diff --git a/weed/cluster/lock_manager/lock_ring.go b/weed/cluster/lock_manager/lock_ring.go
index a7934e1eb..a36c8e222 100644
--- a/weed/cluster/lock_manager/lock_ring.go
+++ b/weed/cluster/lock_manager/lock_ring.go
@@ -184,6 +184,13 @@ func (r *LockRing) WaitForCleanup() {
r.cleanupWg.Wait()
}
+// GetSnapshotCount safely returns the number of snapshots for testing
+func (r *LockRing) GetSnapshotCount() int {
+ r.RLock()
+ defer r.RUnlock()
+ return len(r.snapshots)
+}
+
func hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
if len(servers) == 0 {
return ""
diff --git a/weed/cluster/lock_manager/lock_ring_test.go b/weed/cluster/lock_manager/lock_ring_test.go
index cb5c5c8d9..f82a5ffe4 100644
--- a/weed/cluster/lock_manager/lock_ring_test.go
+++ b/weed/cluster/lock_manager/lock_ring_test.go
@@ -10,37 +10,82 @@ import (
func TestAddServer(t *testing.T) {
r := NewLockRing(100 * time.Millisecond)
+
+ // Add servers
r.AddServer("localhost:8080")
- assert.Equal(t, 1, len(r.snapshots))
r.AddServer("localhost:8081")
r.AddServer("localhost:8082")
r.AddServer("localhost:8083")
r.AddServer("localhost:8084")
+
+ // Verify all servers are present
+ servers := r.GetSnapshot()
+ assert.Equal(t, 5, len(servers))
+ assert.Contains(t, servers, pb.ServerAddress("localhost:8080"))
+ assert.Contains(t, servers, pb.ServerAddress("localhost:8081"))
+ assert.Contains(t, servers, pb.ServerAddress("localhost:8082"))
+ assert.Contains(t, servers, pb.ServerAddress("localhost:8083"))
+ assert.Contains(t, servers, pb.ServerAddress("localhost:8084"))
+
+ // Remove servers
r.RemoveServer("localhost:8084")
r.RemoveServer("localhost:8082")
r.RemoveServer("localhost:8080")
- assert.Equal(t, 8, len(r.snapshots))
+ // Wait for all cleanup operations to complete
+ r.WaitForCleanup()
- // Wait for all cleanup operations to complete instead of using time.Sleep
- time.Sleep(110 * time.Millisecond) // Still need to wait for the cleanup interval
- r.WaitForCleanup() // Ensure all cleanup goroutines have finished
+ // Verify only 2 servers remain (localhost:8081 and localhost:8083)
+ servers = r.GetSnapshot()
+ assert.Equal(t, 2, len(servers))
+ assert.Contains(t, servers, pb.ServerAddress("localhost:8081"))
+ assert.Contains(t, servers, pb.ServerAddress("localhost:8083"))
- assert.Equal(t, 2, len(r.snapshots))
+ // Verify cleanup has happened - wait for snapshot interval and check snapshots are compacted
+ time.Sleep(110 * time.Millisecond)
+ r.WaitForCleanup()
+ // Verify snapshot history is cleaned up properly (should have at most 2 snapshots after compaction)
+ snapshotCount := r.GetSnapshotCount()
+ assert.LessOrEqual(t, snapshotCount, 2, "Snapshot history should be compacted")
}
func TestLockRing(t *testing.T) {
r := NewLockRing(100 * time.Millisecond)
+
+ // Test initial snapshot
r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081"})
- assert.Equal(t, 1, len(r.snapshots))
+ assert.Equal(t, 1, r.GetSnapshotCount())
+ servers := r.GetSnapshot()
+ assert.Equal(t, 2, len(servers))
+ assert.Contains(t, servers, pb.ServerAddress("localhost:8080"))
+ assert.Contains(t, servers, pb.ServerAddress("localhost:8081"))
+
+ // Add another server
r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081", "localhost:8082"})
- assert.Equal(t, 2, len(r.snapshots))
+ assert.Equal(t, 2, r.GetSnapshotCount())
+ servers = r.GetSnapshot()
+ assert.Equal(t, 3, len(servers))
+ assert.Contains(t, servers, pb.ServerAddress("localhost:8082"))
+
+ // Wait for cleanup interval and add another server
time.Sleep(110 * time.Millisecond)
+ r.WaitForCleanup()
r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081", "localhost:8082", "localhost:8083"})
- assert.Equal(t, 3, len(r.snapshots))
+ assert.LessOrEqual(t, r.GetSnapshotCount(), 3)
+ servers = r.GetSnapshot()
+ assert.Equal(t, 4, len(servers))
+ assert.Contains(t, servers, pb.ServerAddress("localhost:8083"))
+
+ // Wait for cleanup and verify compaction
time.Sleep(110 * time.Millisecond)
- assert.Equal(t, 2, len(r.snapshots))
+ r.WaitForCleanup()
+ assert.LessOrEqual(t, r.GetSnapshotCount(), 2, "Snapshots should be compacted")
+
+ // Add final server
r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081", "localhost:8082", "localhost:8083", "localhost:8084"})
- assert.Equal(t, 3, len(r.snapshots))
+ servers = r.GetSnapshot()
+ assert.Equal(t, 5, len(servers))
+ assert.Contains(t, servers, pb.ServerAddress("localhost:8084"))
+ assert.LessOrEqual(t, r.GetSnapshotCount(), 3)
}