aboutsummaryrefslogtreecommitdiff
path: root/weed/cluster
diff options
context:
space:
mode:
Diffstat (limited to 'weed/cluster')
-rw-r--r--weed/cluster/lock_manager/lock_manager.go118
-rw-r--r--weed/cluster/lock_manager/lock_ring.go32
-rw-r--r--weed/cluster/lock_manager/lock_ring_test.go12
3 files changed, 150 insertions, 12 deletions
diff --git a/weed/cluster/lock_manager/lock_manager.go b/weed/cluster/lock_manager/lock_manager.go
new file mode 100644
index 000000000..01dc58810
--- /dev/null
+++ b/weed/cluster/lock_manager/lock_manager.go
@@ -0,0 +1,118 @@
+package lock_manager
+
+import (
+ "fmt"
+ "github.com/google/uuid"
+ "github.com/puzpuzpuz/xsync/v2"
+ "time"
+)
+
+// LockManager lock manager
+type LockManager struct {
+ locks *xsync.MapOf[string, *Lock]
+}
+type Lock struct {
+ Token string
+ ExpirationNs int64
+ Key string // only used for moving locks
+}
+
+func NewLockManager() *LockManager {
+ t := &LockManager{
+ locks: xsync.NewMapOf[*Lock](),
+ }
+ go t.CleanUp()
+ return t
+}
+
+func (lm *LockManager) Lock(path string, ttlDuration time.Duration, token string) (renewToken string, err error) {
+ lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) {
+ if oldValue != nil {
+ now := time.Now()
+ if oldValue.ExpirationNs > 0 && oldValue.ExpirationNs < now.UnixNano() {
+ // lock is expired, set to a new lock
+ expirationNs := time.Now().Add(ttlDuration).UnixNano()
+ return &Lock{Token: token, ExpirationNs: expirationNs}, false
+ }
+ if oldValue.Token == token {
+ expirationNs := time.Now().Add(ttlDuration).UnixNano()
+ return &Lock{Token: token, ExpirationNs: expirationNs}, false
+ } else {
+ err = fmt.Errorf("lock: token mismatch")
+ return oldValue, false
+ }
+ } else {
+ expirationNs := time.Now().Add(ttlDuration).UnixNano()
+ if token == "" {
+ renewToken = uuid.New().String()
+ return &Lock{Token: renewToken, ExpirationNs: expirationNs}, false
+ } else {
+ err = fmt.Errorf("lock: non-empty token on a new lock")
+ return nil, false
+ }
+ return &Lock{Token: token, ExpirationNs: expirationNs}, false
+ }
+ })
+ return
+}
+
+func (lm *LockManager) Unlock(path string, token string) (isUnlocked bool, err error) {
+ lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) {
+ if oldValue != nil {
+ now := time.Now()
+ if oldValue.ExpirationNs > 0 && oldValue.ExpirationNs < now.UnixNano() {
+ // lock is expired, delete it
+ isUnlocked = true
+ return nil, true
+ }
+ if oldValue.Token == token {
+ if oldValue.ExpirationNs <= now.UnixNano() {
+ isUnlocked = true
+ return nil, true
+ }
+ return oldValue, false
+ } else {
+ isUnlocked = false
+ err = fmt.Errorf("unlock: token mismatch")
+ return oldValue, false
+ }
+ } else {
+ isUnlocked = true
+ return nil, true
+ }
+ })
+ return
+}
+
+func (lm *LockManager) CleanUp() {
+ for {
+ time.Sleep(1 * time.Minute)
+ now := time.Now().UnixNano()
+ lm.locks.Range(func(key string, value *Lock) bool {
+ if now > value.ExpirationNs {
+ lm.locks.Delete(key)
+ return true
+ }
+ return true
+ })
+ }
+}
+
+// TakeOutLocksByKey takes out locks by key
+// if keyFn return true, the lock will be taken out
+func (lm *LockManager) TakeOutLocksByKey(keyFn func(key string) bool) (locks []*Lock) {
+ now := time.Now().UnixNano()
+ lm.locks.Range(func(key string, lock *Lock) bool {
+ if now > lock.ExpirationNs {
+ lm.locks.Delete(key)
+ return true
+ }
+ if keyFn(key) {
+ lm.locks.Delete(key)
+ lock.Key = key
+ locks = append(locks, lock)
+ }
+ return true
+ })
+ return
+}
diff --git a/weed/cluster/lock_manager/lock_ring.go b/weed/cluster/lock_manager/lock_ring.go
index df5ebebac..eca86618a 100644
--- a/weed/cluster/lock_manager/lock_ring.go
+++ b/weed/cluster/lock_manager/lock_ring.go
@@ -2,6 +2,7 @@ package lock_manager
import (
"github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"sort"
"sync"
"time"
@@ -22,14 +23,19 @@ type LockRing struct {
onTakeSnapshot func(snapshot []pb.ServerAddress)
}
-func NewLockRing(snapshotInterval time.Duration, onTakeSnapshot func(snapshot []pb.ServerAddress)) *LockRing {
+func NewLockRing(snapshotInterval time.Duration) *LockRing {
return &LockRing{
snapshotInterval: snapshotInterval,
candidateServers: make(map[pb.ServerAddress]struct{}),
- onTakeSnapshot: onTakeSnapshot,
}
}
+func (r *LockRing) SetTakeSnapshotCallback(onTakeSnapshot func(snapshot []pb.ServerAddress)) {
+ r.Lock()
+ defer r.Unlock()
+ r.onTakeSnapshot = onTakeSnapshot
+}
+
// AddServer adds a server to the ring
// if the previous snapshot passed the snapshot interval, create a new snapshot
func (r *LockRing) AddServer(server pb.ServerAddress) {
@@ -144,3 +150,25 @@ func (r *LockRing) getSortedServers() []pb.ServerAddress {
})
return sortedServers
}
+
+func (r *LockRing) GetSnapshot() (servers []pb.ServerAddress) {
+ r.RLock()
+ defer r.RUnlock()
+
+ if len(r.snapshots) == 0 {
+ return
+ }
+ return r.snapshots[0].servers
+}
+
+func HashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress {
+ if len(servers) == 0 {
+ return ""
+ }
+ x := util.HashStringToLong(key)
+ if x < 0 {
+ x = -x
+ }
+ x = x % int64(len(servers))
+ return servers[x]
+}
diff --git a/weed/cluster/lock_manager/lock_ring_test.go b/weed/cluster/lock_manager/lock_ring_test.go
index a8c88f4ab..9025309ec 100644
--- a/weed/cluster/lock_manager/lock_ring_test.go
+++ b/weed/cluster/lock_manager/lock_ring_test.go
@@ -8,15 +8,7 @@ import (
)
func TestAddServer(t *testing.T) {
- counter := 0
- r := NewLockRing(100*time.Millisecond, func(snapshot []pb.ServerAddress) {
- counter++
- if counter == 1 {
- assert.Equal(t, 1, len(snapshot))
- } else if counter == 2 {
- assert.Equal(t, 2, len(snapshot))
- }
- })
+ r := NewLockRing(100 * time.Millisecond)
r.AddServer("localhost:8080")
assert.Equal(t, 1, len(r.snapshots))
r.AddServer("localhost:8081")
@@ -36,7 +28,7 @@ func TestAddServer(t *testing.T) {
}
func TestLockRing(t *testing.T) {
- r := NewLockRing(100*time.Millisecond, nil)
+ r := NewLockRing(100 * time.Millisecond)
r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081"})
assert.Equal(t, 1, len(r.snapshots))
r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081", "localhost:8082"})