diff options
Diffstat (limited to 'weed/cluster')
| -rw-r--r-- | weed/cluster/lock_manager/lock_manager.go | 118 | ||||
| -rw-r--r-- | weed/cluster/lock_manager/lock_ring.go | 32 | ||||
| -rw-r--r-- | weed/cluster/lock_manager/lock_ring_test.go | 12 |
3 files changed, 150 insertions, 12 deletions
diff --git a/weed/cluster/lock_manager/lock_manager.go b/weed/cluster/lock_manager/lock_manager.go new file mode 100644 index 000000000..01dc58810 --- /dev/null +++ b/weed/cluster/lock_manager/lock_manager.go @@ -0,0 +1,118 @@ +package lock_manager + +import ( + "fmt" + "github.com/google/uuid" + "github.com/puzpuzpuz/xsync/v2" + "time" +) + +// LockManager lock manager +type LockManager struct { + locks *xsync.MapOf[string, *Lock] +} +type Lock struct { + Token string + ExpirationNs int64 + Key string // only used for moving locks +} + +func NewLockManager() *LockManager { + t := &LockManager{ + locks: xsync.NewMapOf[*Lock](), + } + go t.CleanUp() + return t +} + +func (lm *LockManager) Lock(path string, ttlDuration time.Duration, token string) (renewToken string, err error) { + lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) { + if oldValue != nil { + now := time.Now() + if oldValue.ExpirationNs > 0 && oldValue.ExpirationNs < now.UnixNano() { + // lock is expired, set to a new lock + expirationNs := time.Now().Add(ttlDuration).UnixNano() + return &Lock{Token: token, ExpirationNs: expirationNs}, false + } + if oldValue.Token == token { + expirationNs := time.Now().Add(ttlDuration).UnixNano() + return &Lock{Token: token, ExpirationNs: expirationNs}, false + } else { + err = fmt.Errorf("lock: token mismatch") + return oldValue, false + } + } else { + expirationNs := time.Now().Add(ttlDuration).UnixNano() + if token == "" { + renewToken = uuid.New().String() + return &Lock{Token: renewToken, ExpirationNs: expirationNs}, false + } else { + err = fmt.Errorf("lock: non-empty token on a new lock") + return nil, false + } + return &Lock{Token: token, ExpirationNs: expirationNs}, false + } + }) + return +} + +func (lm *LockManager) Unlock(path string, token string) (isUnlocked bool, err error) { + lm.locks.Compute(path, func(oldValue *Lock, loaded bool) (newValue *Lock, delete bool) { + if oldValue != nil { + now := time.Now() + if oldValue.ExpirationNs > 0 && oldValue.ExpirationNs < now.UnixNano() { + // lock is expired, delete it + isUnlocked = true + return nil, true + } + if oldValue.Token == token { + if oldValue.ExpirationNs <= now.UnixNano() { + isUnlocked = true + return nil, true + } + return oldValue, false + } else { + isUnlocked = false + err = fmt.Errorf("unlock: token mismatch") + return oldValue, false + } + } else { + isUnlocked = true + return nil, true + } + }) + return +} + +func (lm *LockManager) CleanUp() { + for { + time.Sleep(1 * time.Minute) + now := time.Now().UnixNano() + lm.locks.Range(func(key string, value *Lock) bool { + if now > value.ExpirationNs { + lm.locks.Delete(key) + return true + } + return true + }) + } +} + +// TakeOutLocksByKey takes out locks by key +// if keyFn return true, the lock will be taken out +func (lm *LockManager) TakeOutLocksByKey(keyFn func(key string) bool) (locks []*Lock) { + now := time.Now().UnixNano() + lm.locks.Range(func(key string, lock *Lock) bool { + if now > lock.ExpirationNs { + lm.locks.Delete(key) + return true + } + if keyFn(key) { + lm.locks.Delete(key) + lock.Key = key + locks = append(locks, lock) + } + return true + }) + return +} diff --git a/weed/cluster/lock_manager/lock_ring.go b/weed/cluster/lock_manager/lock_ring.go index df5ebebac..eca86618a 100644 --- a/weed/cluster/lock_manager/lock_ring.go +++ b/weed/cluster/lock_manager/lock_ring.go @@ -2,6 +2,7 @@ package lock_manager import ( "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/util" "sort" "sync" "time" @@ -22,14 +23,19 @@ type LockRing struct { onTakeSnapshot func(snapshot []pb.ServerAddress) } -func NewLockRing(snapshotInterval time.Duration, onTakeSnapshot func(snapshot []pb.ServerAddress)) *LockRing { +func NewLockRing(snapshotInterval time.Duration) *LockRing { return &LockRing{ snapshotInterval: snapshotInterval, candidateServers: make(map[pb.ServerAddress]struct{}), - onTakeSnapshot: onTakeSnapshot, } } +func (r *LockRing) SetTakeSnapshotCallback(onTakeSnapshot func(snapshot []pb.ServerAddress)) { + r.Lock() + defer r.Unlock() + r.onTakeSnapshot = onTakeSnapshot +} + // AddServer adds a server to the ring // if the previous snapshot passed the snapshot interval, create a new snapshot func (r *LockRing) AddServer(server pb.ServerAddress) { @@ -144,3 +150,25 @@ func (r *LockRing) getSortedServers() []pb.ServerAddress { }) return sortedServers } + +func (r *LockRing) GetSnapshot() (servers []pb.ServerAddress) { + r.RLock() + defer r.RUnlock() + + if len(r.snapshots) == 0 { + return + } + return r.snapshots[0].servers +} + +func HashKeyToServer(key string, servers []pb.ServerAddress) pb.ServerAddress { + if len(servers) == 0 { + return "" + } + x := util.HashStringToLong(key) + if x < 0 { + x = -x + } + x = x % int64(len(servers)) + return servers[x] +} diff --git a/weed/cluster/lock_manager/lock_ring_test.go b/weed/cluster/lock_manager/lock_ring_test.go index a8c88f4ab..9025309ec 100644 --- a/weed/cluster/lock_manager/lock_ring_test.go +++ b/weed/cluster/lock_manager/lock_ring_test.go @@ -8,15 +8,7 @@ import ( ) func TestAddServer(t *testing.T) { - counter := 0 - r := NewLockRing(100*time.Millisecond, func(snapshot []pb.ServerAddress) { - counter++ - if counter == 1 { - assert.Equal(t, 1, len(snapshot)) - } else if counter == 2 { - assert.Equal(t, 2, len(snapshot)) - } - }) + r := NewLockRing(100 * time.Millisecond) r.AddServer("localhost:8080") assert.Equal(t, 1, len(r.snapshots)) r.AddServer("localhost:8081") @@ -36,7 +28,7 @@ func TestAddServer(t *testing.T) { } func TestLockRing(t *testing.T) { - r := NewLockRing(100*time.Millisecond, nil) + r := NewLockRing(100 * time.Millisecond) r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081"}) assert.Equal(t, 1, len(r.snapshots)) r.SetSnapshot([]pb.ServerAddress{"localhost:8080", "localhost:8081", "localhost:8082"}) |
