aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/cluster/lock_manager/lock_ring.go146
-rw-r--r--weed/cluster/lock_manager/lock_ring_test.go51
2 files changed, 197 insertions, 0 deletions
diff --git a/weed/cluster/lock_manager/lock_ring.go b/weed/cluster/lock_manager/lock_ring.go
new file mode 100644
index 000000000..df5ebebac
--- /dev/null
+++ b/weed/cluster/lock_manager/lock_ring.go
@@ -0,0 +1,146 @@
+package lock_manager
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "sort"
+ "sync"
+ "time"
+)
+
+type LockRingSnapshot struct {
+ servers []pb.ServerAddress
+ ts time.Time
+}
+
+type LockRing struct {
+ sync.RWMutex
+ snapshots []*LockRingSnapshot
+ candidateServers map[pb.ServerAddress]struct{}
+ lastUpdateTime time.Time
+ lastCompactTime time.Time
+ snapshotInterval time.Duration
+ onTakeSnapshot func(snapshot []pb.ServerAddress)
+}
+
+func NewLockRing(snapshotInterval time.Duration, onTakeSnapshot func(snapshot []pb.ServerAddress)) *LockRing {
+ return &LockRing{
+ snapshotInterval: snapshotInterval,
+ candidateServers: make(map[pb.ServerAddress]struct{}),
+ onTakeSnapshot: onTakeSnapshot,
+ }
+}
+
+// AddServer adds a server to the ring
+// if the previous snapshot passed the snapshot interval, create a new snapshot
+func (r *LockRing) AddServer(server pb.ServerAddress) {
+ r.Lock()
+
+ if _, found := r.candidateServers[server]; found {
+ r.Unlock()
+ return
+ }
+ r.lastUpdateTime = time.Now()
+ r.candidateServers[server] = struct{}{}
+ r.Unlock()
+
+ r.takeSnapshotWithDelayedCompaction()
+}
+
+func (r *LockRing) RemoveServer(server pb.ServerAddress) {
+ r.Lock()
+
+ if _, found := r.candidateServers[server]; !found {
+ r.Unlock()
+ return
+ }
+ r.lastUpdateTime = time.Now()
+ delete(r.candidateServers, server)
+ r.Unlock()
+
+ r.takeSnapshotWithDelayedCompaction()
+}
+
+func (r *LockRing) SetSnapshot(servers []pb.ServerAddress) {
+
+ sort.Slice(servers, func(i, j int) bool {
+ return servers[i] < servers[j]
+ })
+
+ r.lastUpdateTime = time.Now()
+
+ r.addOneSnapshot(servers)
+
+ go func() {
+ <-time.After(r.snapshotInterval)
+ r.compactSnapshots()
+ }()
+}
+
+func (r *LockRing) takeSnapshotWithDelayedCompaction() {
+ r.doTakeSnapshot()
+
+ go func() {
+ <-time.After(r.snapshotInterval)
+ r.compactSnapshots()
+ }()
+}
+
+func (r *LockRing) doTakeSnapshot() {
+ servers := r.getSortedServers()
+
+ r.addOneSnapshot(servers)
+}
+
+func (r *LockRing) addOneSnapshot(servers []pb.ServerAddress) {
+ r.Lock()
+ defer r.Unlock()
+
+ ts := time.Now()
+ t := &LockRingSnapshot{
+ servers: servers,
+ ts: ts,
+ }
+ r.snapshots = append(r.snapshots, t)
+ for i := len(r.snapshots) - 2; i >= 0; i-- {
+ r.snapshots[i+1] = r.snapshots[i]
+ }
+ r.snapshots[0] = t
+
+ if r.onTakeSnapshot != nil {
+ r.onTakeSnapshot(t.servers)
+ }
+}
+
+func (r *LockRing) compactSnapshots() {
+ r.Lock()
+ defer r.Unlock()
+
+ if r.lastCompactTime.After(r.lastUpdateTime) {
+ return
+ }
+
+ ts := time.Now()
+ // remove old snapshots
+ recentSnapshotIndex := 1
+ for ; recentSnapshotIndex < len(r.snapshots); recentSnapshotIndex++ {
+ if ts.Sub(r.snapshots[recentSnapshotIndex].ts) > r.snapshotInterval {
+ break
+ }
+ }
+ // keep the one that has been running for a while
+ if recentSnapshotIndex+1 <= len(r.snapshots) {
+ r.snapshots = r.snapshots[:recentSnapshotIndex+1]
+ }
+ r.lastCompactTime = ts
+}
+
+func (r *LockRing) getSortedServers() []pb.ServerAddress {
+ sortedServers := make([]pb.ServerAddress, 0, len(r.candidateServers))
+ for server := range r.candidateServers {
+ sortedServers = append(sortedServers, server)
+ }
+ sort.Slice(sortedServers, func(i, j int) bool {
+ return sortedServers[i] < sortedServers[j]
+ })
+ return sortedServers
+}
diff --git a/weed/cluster/lock_manager/lock_ring_test.go b/weed/cluster/lock_manager/lock_ring_test.go
new file mode 100644
index 000000000..a8c88f4ab
--- /dev/null
+++ b/weed/cluster/lock_manager/lock_ring_test.go
@@ -0,0 +1,51 @@
+package lock_manager
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func TestAddServer(t *testing.T) {
+ counter := 0
+ r := NewLockRing(100*time.Millisecond, func(snapshot []pb.ServerAddress) {
+ counter++
+ if counter == 1 {
+ assert.Equal(t, 1, len(snapshot))
+ } else if counter == 2 {
+ assert.Equal(t, 2, len(snapshot))
+ }
+ })
+ r.AddServer("localhost:8080")
+ assert.Equal(t, 1, len(r.snapshots))
+ r.AddServer("localhost:8081")
+ r.AddServer("localhost:8082")
+ r.AddServer("localhost:8083")
+ r.AddServer("localhost:8084")
+ r.RemoveServer("localhost:8084")
+ r.RemoveServer("localhost:8082")
+ r.RemoveServer("localhost:8080")
+
+ assert.Equal(t, 8, len(r.snapshots))
+
+ time.Sleep(110 * time.Millisecond)
+
+ assert.Equal(t, 2, len(r.snapshots))
+
+}
+
+func TestLockRing(t *testing.T) {
+ r := NewLockRing(100*time.Millisecond, nil)
+ r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081"})
+ assert.Equal(t, 1, len(r.snapshots))
+ r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081", "localhost:8082"})
+ assert.Equal(t, 2, len(r.snapshots))
+ time.Sleep(110 * time.Millisecond)
+ r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081", "localhost:8082", "localhost:8083"})
+ assert.Equal(t, 3, len(r.snapshots))
+ time.Sleep(110 * time.Millisecond)
+ assert.Equal(t, 2, len(r.snapshots))
+ r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081", "localhost:8082", "localhost:8083", "localhost:8084"})
+ assert.Equal(t, 3, len(r.snapshots))
+}