aboutsummaryrefslogtreecommitdiff
path: root/weed/cluster/lock_manager/lock_ring.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/cluster/lock_manager/lock_ring.go')
-rw-r--r--weed/cluster/lock_manager/lock_ring.go18
1 files changed, 15 insertions, 3 deletions
diff --git a/weed/cluster/lock_manager/lock_ring.go b/weed/cluster/lock_manager/lock_ring.go
index e7f60e6d2..a7934e1eb 100644
--- a/weed/cluster/lock_manager/lock_ring.go
+++ b/weed/cluster/lock_manager/lock_ring.go
@@ -1,12 +1,13 @@
package lock_manager
import (
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
"sort"
"sync"
"time"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
)
type LockRingSnapshot struct {
@@ -22,6 +23,7 @@ type LockRing struct {
lastCompactTime time.Time
snapshotInterval time.Duration
onTakeSnapshot func(snapshot []pb.ServerAddress)
+ cleanupWg sync.WaitGroup
}
func NewLockRing(snapshotInterval time.Duration) *LockRing {
@@ -87,7 +89,9 @@ func (r *LockRing) SetSnapshot(servers []pb.ServerAddress) {
r.addOneSnapshot(servers)
+ r.cleanupWg.Add(1)
go func() {
+ defer r.cleanupWg.Done()
<-time.After(r.snapshotInterval)
r.compactSnapshots()
}()
@@ -96,7 +100,9 @@ func (r *LockRing) SetSnapshot(servers []pb.ServerAddress) {
func (r *LockRing) takeSnapshotWithDelayedCompaction() {
r.doTakeSnapshot()
+ r.cleanupWg.Add(1)
go func() {
+ defer r.cleanupWg.Done()
<-time.After(r.snapshotInterval)
r.compactSnapshots()
}()
@@ -172,6 +178,12 @@ func (r *LockRing) GetSnapshot() (servers []pb.ServerAddress) {
return r.snapshots[0].servers
}
+// WaitForCleanup waits for all pending cleanup operations to complete
+// This is useful for testing to ensure deterministic behavior
+func (r *LockRing) WaitForCleanup() {
+ r.cleanupWg.Wait()
+}
+
func hashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
if len(servers) == 0 {
return ""