aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/imports.go1
-rw-r--r--weed/command/scaffold/filer.toml22
-rw-r--r--weed/filer.toml5
-rw-r--r--weed/filer/redis3/kv_directory_children.go111
-rw-r--r--weed/filer/redis3/redis_cluster_store.go42
-rw-r--r--weed/filer/redis3/redis_store.go36
-rw-r--r--weed/filer/redis3/skiplist_element_store.go62
-rw-r--r--weed/filer/redis3/universal_redis_store.go177
-rw-r--r--weed/filer/redis3/universal_redis_store_kv.go42
-rw-r--r--weed/server/filer_server.go1
-rw-r--r--weed/util/skiplist/Makefile6
-rw-r--r--weed/util/skiplist/list_store.go32
-rw-r--r--weed/util/skiplist/name_batch.go102
-rw-r--r--weed/util/skiplist/name_list.go326
-rw-r--r--weed/util/skiplist/name_list_serde.go71
-rw-r--r--weed/util/skiplist/name_list_test.go73
-rw-r--r--weed/util/skiplist/skiplist.go563
-rw-r--r--weed/util/skiplist/skiplist.pb.go438
-rw-r--r--weed/util/skiplist/skiplist.proto30
-rw-r--r--weed/util/skiplist/skiplist_serde.go51
-rw-r--r--weed/util/skiplist/skiplist_test.go295
21 files changed, 2486 insertions, 0 deletions
diff --git a/weed/command/imports.go b/weed/command/imports.go
index a2f59189f..48cda5f90 100644
--- a/weed/command/imports.go
+++ b/weed/command/imports.go
@@ -29,6 +29,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
_ "github.com/chrislusf/seaweedfs/weed/filer/tikv"
)
diff --git a/weed/command/scaffold/filer.toml b/weed/command/scaffold/filer.toml
index caf9d173d..aeb8a5b67 100644
--- a/weed/command/scaffold/filer.toml
+++ b/weed/command/scaffold/filer.toml
@@ -185,6 +185,28 @@ routeByLatency = false
# This changes the data layout. Only add new directories. Removing/Updating will cause data loss.
superLargeDirectories = []
+[redis3] # beta
+enabled = false
+address = "localhost:6379"
+password = ""
+database = 0
+
+[redis_cluster3] # beta
+enabled = false
+addresses = [
+ "localhost:30001",
+ "localhost:30002",
+ "localhost:30003",
+ "localhost:30004",
+ "localhost:30005",
+ "localhost:30006",
+]
+password = ""
+# allows reads from slave servers or the master, but all writes still go to the master
+readOnly = false
+# automatically use the closest Redis server for reads
+routeByLatency = false
+
[etcd]
enabled = false
servers = "localhost:2379"
diff --git a/weed/filer.toml b/weed/filer.toml
new file mode 100644
index 000000000..a0af38d95
--- /dev/null
+++ b/weed/filer.toml
@@ -0,0 +1,5 @@
+[redis3]
+enabled = true
+address = "localhost:6379"
+password = ""
+database = 0
diff --git a/weed/filer/redis3/kv_directory_children.go b/weed/filer/redis3/kv_directory_children.go
new file mode 100644
index 000000000..16d921d03
--- /dev/null
+++ b/weed/filer/redis3/kv_directory_children.go
@@ -0,0 +1,111 @@
+package redis3
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/skiplist"
+ "github.com/go-redis/redis/v8"
+)
+
+const maxNameBatchSizeLimit = 1000
+
+func insertChild(ctx context.Context, client redis.UniversalClient, key string, name string) error {
+ data, err := client.Get(ctx, key).Result()
+ if err != nil {
+ if err != redis.Nil {
+ return fmt.Errorf("read %s: %v", key, err)
+ }
+ }
+ store := newSkipListElementStore(key, client)
+ nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
+
+ // println("add", key, name)
+ if err := nameList.WriteName(name); err != nil {
+ glog.Errorf("add %s %s: %v", key, name, err)
+ return err
+ }
+ if !nameList.HasChanges() {
+ return nil
+ }
+
+ if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func removeChild(ctx context.Context, client redis.UniversalClient, key string, name string) error {
+ data, err := client.Get(ctx, key).Result()
+ if err != nil {
+ if err != redis.Nil {
+ return fmt.Errorf("read %s: %v", key, err)
+ }
+ }
+ store := newSkipListElementStore(key, client)
+ nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
+
+ if err := nameList.DeleteName(name); err != nil {
+ return err
+ }
+ if !nameList.HasChanges() {
+ return nil
+ }
+
+ if err := client.Set(ctx, key, nameList.ToBytes(), 0).Err(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func removeChildren(ctx context.Context, client redis.UniversalClient, key string, onDeleteFn func(name string) error) error {
+
+ data, err := client.Get(ctx, key).Result()
+ if err != nil {
+ if err != redis.Nil {
+ return fmt.Errorf("read %s: %v", key, err)
+ }
+ }
+ store := newSkipListElementStore(key, client)
+ nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
+
+ if err = nameList.ListNames("", func(name string) bool {
+ if err := onDeleteFn(name); err != nil {
+ glog.Errorf("delete %s child %s: %v", key, name, err)
+ return false
+ }
+ return true
+ }); err != nil {
+ return err
+ }
+
+ if err = nameList.RemoteAllListElement(); err != nil {
+ return err
+ }
+
+ return nil
+
+}
+
+func listChildren(ctx context.Context, client redis.UniversalClient, key string, startFileName string, eachFn func(name string) bool) error {
+
+ data, err := client.Get(ctx, key).Result()
+ if err != nil {
+ if err != redis.Nil {
+ return fmt.Errorf("read %s: %v", key, err)
+ }
+ }
+ store := newSkipListElementStore(key, client)
+ nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
+
+ if err = nameList.ListNames(startFileName, func(name string) bool {
+ return eachFn(name)
+ }); err != nil {
+ return err
+ }
+
+ return nil
+
+}
diff --git a/weed/filer/redis3/redis_cluster_store.go b/weed/filer/redis3/redis_cluster_store.go
new file mode 100644
index 000000000..e0c620450
--- /dev/null
+++ b/weed/filer/redis3/redis_cluster_store.go
@@ -0,0 +1,42 @@
+package redis3
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis/v8"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &RedisCluster3Store{})
+}
+
+type RedisCluster3Store struct {
+ UniversalRedis3Store
+}
+
+func (store *RedisCluster3Store) GetName() string {
+ return "redis_cluster3"
+}
+
+func (store *RedisCluster3Store) Initialize(configuration util.Configuration, prefix string) (err error) {
+
+ configuration.SetDefault(prefix+"useReadOnly", false)
+ configuration.SetDefault(prefix+"routeByLatency", false)
+
+ return store.initialize(
+ configuration.GetStringSlice(prefix+"addresses"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetBool(prefix+"useReadOnly"),
+ configuration.GetBool(prefix+"routeByLatency"),
+ )
+}
+
+func (store *RedisCluster3Store) initialize(addresses []string, password string, readOnly, routeByLatency bool) (err error) {
+ store.Client = redis.NewClusterClient(&redis.ClusterOptions{
+ Addrs: addresses,
+ Password: password,
+ ReadOnly: readOnly,
+ RouteByLatency: routeByLatency,
+ })
+ return
+}
diff --git a/weed/filer/redis3/redis_store.go b/weed/filer/redis3/redis_store.go
new file mode 100644
index 000000000..fdbf994ec
--- /dev/null
+++ b/weed/filer/redis3/redis_store.go
@@ -0,0 +1,36 @@
+package redis3
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/go-redis/redis/v8"
+)
+
+func init() {
+ filer.Stores = append(filer.Stores, &Redis3Store{})
+}
+
+type Redis3Store struct {
+ UniversalRedis3Store
+}
+
+func (store *Redis3Store) GetName() string {
+ return "redis3"
+}
+
+func (store *Redis3Store) Initialize(configuration util.Configuration, prefix string) (err error) {
+ return store.initialize(
+ configuration.GetString(prefix+"address"),
+ configuration.GetString(prefix+"password"),
+ configuration.GetInt(prefix+"database"),
+ )
+}
+
+func (store *Redis3Store) initialize(hostPort string, password string, database int) (err error) {
+ store.Client = redis.NewClient(&redis.Options{
+ Addr: hostPort,
+ Password: password,
+ DB: database,
+ })
+ return
+}
diff --git a/weed/filer/redis3/skiplist_element_store.go b/weed/filer/redis3/skiplist_element_store.go
new file mode 100644
index 000000000..66a5408d6
--- /dev/null
+++ b/weed/filer/redis3/skiplist_element_store.go
@@ -0,0 +1,62 @@
+package redis3
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/skiplist"
+ "github.com/go-redis/redis/v8"
+ "github.com/golang/protobuf/proto"
+)
+
+type SkipListElementStore struct {
+ prefix string
+ client redis.UniversalClient
+}
+
+var _ = skiplist.ListStore(&SkipListElementStore{})
+
+func newSkipListElementStore(prefix string, client redis.UniversalClient) *SkipListElementStore {
+ return &SkipListElementStore{
+ prefix: prefix,
+ client: client,
+ }
+}
+
+func (m *SkipListElementStore) SaveElement(id int64, element *skiplist.SkipListElement) error {
+ key := fmt.Sprintf("%s%d", m.prefix, id)
+ data, err := proto.Marshal(element)
+ if err != nil {
+ glog.Errorf("marshal %s: %v", key, err)
+ }
+ return m.client.Set(context.Background(), key, data, 0).Err()
+}
+
+func (m *SkipListElementStore) DeleteElement(id int64) error {
+ key := fmt.Sprintf("%s%d", m.prefix, id)
+ return m.client.Del(context.Background(), key).Err()
+}
+
+func (m *SkipListElementStore) LoadElement(id int64) (*skiplist.SkipListElement, error) {
+ key := fmt.Sprintf("%s%d", m.prefix, id)
+ data, err := m.client.Get(context.Background(), key).Result()
+ if err != nil {
+ if err == redis.Nil {
+ return nil, nil
+ }
+ return nil, err
+ }
+ t := &skiplist.SkipListElement{}
+ err = proto.Unmarshal([]byte(data), t)
+ if err == nil {
+ for i:=0;i<len(t.Next);i++{
+ if t.Next[i].IsNil() {
+ t.Next[i] = nil
+ }
+ }
+ if t.Prev.IsNil() {
+ t.Prev = nil
+ }
+ }
+ return t, err
+}
diff --git a/weed/filer/redis3/universal_redis_store.go b/weed/filer/redis3/universal_redis_store.go
new file mode 100644
index 000000000..8a89e7c48
--- /dev/null
+++ b/weed/filer/redis3/universal_redis_store.go
@@ -0,0 +1,177 @@
+package redis3
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/go-redis/redis/v8"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ DIR_LIST_MARKER = "\x00"
+)
+
+type UniversalRedis3Store struct {
+ Client redis.UniversalClient
+}
+
+func (store *UniversalRedis3Store) BeginTransaction(ctx context.Context) (context.Context, error) {
+ return ctx, nil
+}
+func (store *UniversalRedis3Store) CommitTransaction(ctx context.Context) error {
+ return nil
+}
+func (store *UniversalRedis3Store) RollbackTransaction(ctx context.Context) error {
+ return nil
+}
+
+func (store *UniversalRedis3Store) InsertEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ value, err := entry.EncodeAttributesAndChunks()
+ if err != nil {
+ return fmt.Errorf("encoding %s %+v: %v", entry.FullPath, entry.Attr, err)
+ }
+
+ if len(entry.Chunks) > 50 {
+ value = util.MaybeGzipData(value)
+ }
+
+ if err = store.Client.Set(ctx, string(entry.FullPath), value, time.Duration(entry.TtlSec)*time.Second).Err(); err != nil {
+ return fmt.Errorf("persisting %s : %v", entry.FullPath, err)
+ }
+
+ dir, name := entry.FullPath.DirAndName()
+
+ if name != "" {
+ if err = insertChild(ctx, store.Client, genDirectoryListKey(dir), name); err != nil {
+ return fmt.Errorf("persisting %s in parent dir: %v", entry.FullPath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis3Store) UpdateEntry(ctx context.Context, entry *filer.Entry) (err error) {
+
+ return store.InsertEntry(ctx, entry)
+}
+
+func (store *UniversalRedis3Store) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
+
+ data, err := store.Client.Get(ctx, string(fullpath)).Result()
+ if err == redis.Nil {
+ return nil, filer_pb.ErrNotFound
+ }
+
+ if err != nil {
+ return nil, fmt.Errorf("get %s : %v", fullpath, err)
+ }
+
+ entry = &filer.Entry{
+ FullPath: fullpath,
+ }
+ err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData([]byte(data)))
+ if err != nil {
+ return entry, fmt.Errorf("decode %s : %v", entry.FullPath, err)
+ }
+
+ return entry, nil
+}
+
+func (store *UniversalRedis3Store) DeleteEntry(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ _, err = store.Client.Del(ctx, genDirectoryListKey(string(fullpath))).Result()
+ if err != nil {
+ return fmt.Errorf("delete dir list %s : %v", fullpath, err)
+ }
+
+ _, err = store.Client.Del(ctx, string(fullpath)).Result()
+ if err != nil {
+ return fmt.Errorf("delete %s : %v", fullpath, err)
+ }
+
+ dir, name := fullpath.DirAndName()
+
+ if name != "" {
+ if err = removeChild(ctx, store.Client, genDirectoryListKey(dir), name); err != nil {
+ return fmt.Errorf("DeleteEntry %s in parent dir: %v", fullpath, err)
+ }
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis3Store) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) (err error) {
+
+ return removeChildren(ctx, store.Client, genDirectoryListKey(string(fullpath)), func(name string) error {
+ path := util.NewFullPath(string(fullpath), name)
+ _, err = store.Client.Del(ctx, string(path)).Result()
+ if err != nil {
+ return fmt.Errorf("DeleteFolderChildren %s in parent dir: %v", fullpath, err)
+ }
+ // not efficient, but need to remove if it is a directory
+ store.Client.Del(ctx, genDirectoryListKey(string(path)))
+ return nil
+ })
+
+}
+
+func (store *UniversalRedis3Store) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+ return lastFileName, filer.ErrUnsupportedListDirectoryPrefixed
+}
+
+func (store *UniversalRedis3Store) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
+
+ dirListKey := genDirectoryListKey(string(dirPath))
+ counter := int64(0)
+
+ err = listChildren(ctx, store.Client, dirListKey, startFileName, func(fileName string) bool {
+ if startFileName != "" {
+ if !includeStartFile && startFileName == fileName {
+ return true
+ }
+ }
+
+ path := util.NewFullPath(string(dirPath), fileName)
+ entry, err := store.FindEntry(ctx, path)
+ lastFileName = fileName
+ if err != nil {
+ glog.V(0).Infof("list %s : %v", path, err)
+ if err == filer_pb.ErrNotFound {
+ return true
+ }
+ } else {
+ if entry.TtlSec > 0 {
+ if entry.Attr.Crtime.Add(time.Duration(entry.TtlSec) * time.Second).Before(time.Now()) {
+ store.Client.Del(ctx, string(path)).Result()
+ store.Client.ZRem(ctx, dirListKey, fileName).Result()
+ return true
+ }
+ }
+ counter++
+ if !eachEntryFunc(entry) {
+ return false
+ }
+ if counter >= limit {
+ return false
+ }
+ }
+ return true
+ })
+
+ return lastFileName, err
+}
+
+func genDirectoryListKey(dir string) (dirList string) {
+ return dir + DIR_LIST_MARKER
+}
+
+func (store *UniversalRedis3Store) Shutdown() {
+ store.Client.Close()
+}
diff --git a/weed/filer/redis3/universal_redis_store_kv.go b/weed/filer/redis3/universal_redis_store_kv.go
new file mode 100644
index 000000000..a9c440a37
--- /dev/null
+++ b/weed/filer/redis3/universal_redis_store_kv.go
@@ -0,0 +1,42 @@
+package redis3
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/go-redis/redis/v8"
+)
+
+func (store *UniversalRedis3Store) KvPut(ctx context.Context, key []byte, value []byte) (err error) {
+
+ _, err = store.Client.Set(ctx, string(key), value, 0).Result()
+
+ if err != nil {
+ return fmt.Errorf("kv put: %v", err)
+ }
+
+ return nil
+}
+
+func (store *UniversalRedis3Store) KvGet(ctx context.Context, key []byte) (value []byte, err error) {
+
+ data, err := store.Client.Get(ctx, string(key)).Result()
+
+ if err == redis.Nil {
+ return nil, filer.ErrKvNotFound
+ }
+
+ return []byte(data), err
+}
+
+func (store *UniversalRedis3Store) KvDelete(ctx context.Context, key []byte) (err error) {
+
+ _, err = store.Client.Del(ctx, string(key)).Result()
+
+ if err != nil {
+ return fmt.Errorf("kv delete: %v", err)
+ }
+
+ return nil
+}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index b886bf641..aa66b4187 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -34,6 +34,7 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
diff --git a/weed/util/skiplist/Makefile b/weed/util/skiplist/Makefile
new file mode 100644
index 000000000..af4afe639
--- /dev/null
+++ b/weed/util/skiplist/Makefile
@@ -0,0 +1,6 @@
+all: gen
+
+.PHONY : gen
+
+gen:
+ protoc skiplist.proto --go_out=plugins=grpc:. --go_opt=paths=source_relative
diff --git a/weed/util/skiplist/list_store.go b/weed/util/skiplist/list_store.go
new file mode 100644
index 000000000..0eb1106bc
--- /dev/null
+++ b/weed/util/skiplist/list_store.go
@@ -0,0 +1,32 @@
+package skiplist
+
+type ListStore interface {
+ SaveElement(id int64, element *SkipListElement) error
+ DeleteElement(id int64) error
+ LoadElement(id int64) (*SkipListElement, error)
+}
+
+type MemStore struct {
+ m map[int64]*SkipListElement
+}
+
+func newMemStore() *MemStore {
+ return &MemStore{
+ m: make(map[int64]*SkipListElement),
+ }
+}
+
+func (m *MemStore) SaveElement(id int64, element *SkipListElement) error {
+ m.m[id] = element
+ return nil
+}
+
+func (m *MemStore) DeleteElement(id int64) error {
+ delete(m.m, id)
+ return nil
+}
+
+func (m *MemStore) LoadElement(id int64) (*SkipListElement, error) {
+ element := m.m[id]
+ return element, nil
+}
diff --git a/weed/util/skiplist/name_batch.go b/weed/util/skiplist/name_batch.go
new file mode 100644
index 000000000..71e5aeeba
--- /dev/null
+++ b/weed/util/skiplist/name_batch.go
@@ -0,0 +1,102 @@
+package skiplist
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/golang/protobuf/proto"
+ "sort"
+ "strings"
+)
+
+type NameBatch struct {
+ key string
+ names map[string]struct{}
+}
+
+func (nb *NameBatch) ContainsName(name string) (found bool) {
+ _, found = nb.names[name]
+ return
+}
+func (nb *NameBatch) WriteName(name string) {
+ if nb.key == "" || strings.Compare(nb.key, name) > 0 {
+ nb.key = name
+ }
+ nb.names[name] = struct{}{}
+}
+func (nb *NameBatch) DeleteName(name string) {
+ delete(nb.names, name)
+ if nb.key == name {
+ nb.key = ""
+ for n := range nb.names {
+ if nb.key == "" || strings.Compare(nb.key, n) > 0 {
+ nb.key = n
+ }
+ }
+ }
+}
+func (nb *NameBatch) ListNames(startFrom string, visitNamesFn func(name string) bool) bool {
+ var names []string
+ needFilter := startFrom != ""
+ for n := range nb.names {
+ if !needFilter || strings.Compare(n, startFrom) >= 0 {
+ names = append(names, n)
+ }
+ }
+ sort.Slice(names, func(i, j int) bool {
+ return strings.Compare(names[i], names[j]) < 0
+ })
+ for _, n := range names {
+ if !visitNamesFn(n) {
+ return false
+ }
+ }
+ return true
+}
+
+func NewNameBatch() *NameBatch {
+ return &NameBatch{
+ names: make(map[string]struct{}),
+ }
+}
+
+func LoadNameBatch(data []byte) *NameBatch {
+ t := &NameBatchData{}
+ if len(data) > 0 {
+ err := proto.Unmarshal(data, t)
+ if err != nil {
+ glog.Errorf("unmarshal into NameBatchData{} : %v", err)
+ return nil
+ }
+ }
+ nb := NewNameBatch()
+ for _, n := range t.Names {
+ name := string(n)
+ if nb.key == "" || strings.Compare(nb.key, name) > 0 {
+ nb.key = name
+ }
+ nb.names[name] = struct{}{}
+ }
+ return nb
+}
+
+func (nb *NameBatch) ToBytes() []byte {
+ t := &NameBatchData{}
+ for n := range nb.names {
+ t.Names = append(t.Names, []byte(n))
+ }
+ data, _ := proto.Marshal(t)
+ return data
+}
+
+func (nb *NameBatch) SplitBy(name string) (x, y *NameBatch) {
+ x, y = NewNameBatch(), NewNameBatch()
+
+ for n := range nb.names {
+ // there should be no equal case though
+ if strings.Compare(n, name) <= 0 {
+ x.WriteName(n)
+ } else {
+ y.WriteName(n)
+ }
+ }
+ return
+}
diff --git a/weed/util/skiplist/name_list.go b/weed/util/skiplist/name_list.go
new file mode 100644
index 000000000..4ba26665a
--- /dev/null
+++ b/weed/util/skiplist/name_list.go
@@ -0,0 +1,326 @@
+package skiplist
+
+import (
+ "bytes"
+)
+
+type NameList struct {
+ skipList *SkipList
+ batchSize int
+}
+
+func newNameList(store ListStore, batchSize int) *NameList {
+ return &NameList{
+ skipList: New(store),
+ batchSize: batchSize,
+ }
+}
+
+/*
+Be reluctant to create new nodes. Try to fit into either previous node or next node.
+Prefer to add to previous node.
+
+There are multiple cases after finding the name for greater or equal node
+ 1. found and node.Key == name
+ The node contains a batch with leading key the same as the name
+ nothing to do
+ 2. no such node found or node.Key > name
+
+ if no such node found
+ prevNode = list.LargestNode
+
+ // case 2.1
+ if previousNode contains name
+ nothing to do
+
+ // prefer to add to previous node
+ if prevNode != nil {
+ // case 2.2
+ if prevNode has capacity
+ prevNode.add name, and save
+ return
+ // case 2.3
+ split prevNode by name
+ }
+
+ // case 2.4
+ // merge into next node. Avoid too many nodes if adding data in reverse order.
+ if nextNode is not nil and nextNode has capacity
+ delete nextNode.Key
+ nextNode.Key = name
+ nextNode.batch.add name
+ insert nodeNode.Key
+ return
+
+ // case 2.5
+ if prevNode is nil
+ insert new node with key = name, value = batch{name}
+ return
+
+*/
+func (nl *NameList) WriteName(name string) error {
+
+ lookupKey := []byte(name)
+ prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+ if err != nil {
+ return err
+ }
+ // case 1: the name already exists as one leading key in the batch
+ if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+ return nil
+ }
+
+ if !found {
+ prevNode, err = nl.skipList.GetLargestNode()
+ if err != nil {
+ return err
+ }
+ }
+
+ if nextNode != nil && prevNode == nil {
+ prevNode, err = nl.skipList.loadElement(nextNode.Prev)
+ if err != nil {
+ return err
+ }
+ }
+
+ if prevNode != nil {
+ prevNameBatch := LoadNameBatch(prevNode.Value)
+ // case 2.1
+ if prevNameBatch.ContainsName(name) {
+ return nil
+ }
+
+ // case 2.2
+ if len(prevNameBatch.names) < nl.batchSize {
+ prevNameBatch.WriteName(name)
+ return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes())
+ }
+
+ // case 2.3
+ x, y := prevNameBatch.SplitBy(name)
+ addToX := len(x.names) <= len(y.names)
+ if len(x.names) != len(prevNameBatch.names) {
+ if addToX {
+ x.WriteName(name)
+ }
+ if x.key == prevNameBatch.key {
+ if err := nl.skipList.ChangeValue(prevNode, x.ToBytes()); err != nil {
+ return err
+ }
+ } else {
+ if err := nl.skipList.Insert([]byte(x.key), x.ToBytes()); err != nil {
+ return err
+ }
+ }
+ }
+ if len(y.names) != len(prevNameBatch.names) {
+ if !addToX {
+ y.WriteName(name)
+ }
+ if y.key == prevNameBatch.key {
+ if err := nl.skipList.ChangeValue(prevNode, y.ToBytes()); err != nil {
+ return err
+ }
+ } else {
+ if err := nl.skipList.Insert([]byte(y.key), y.ToBytes()); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+
+ }
+
+ // case 2.4
+ if nextNode != nil {
+ nextNameBatch := LoadNameBatch(nextNode.Value)
+ if len(nextNameBatch.names) < nl.batchSize {
+ if err := nl.skipList.Delete(nextNode.Key); err != nil {
+ return err
+ }
+ nextNameBatch.WriteName(name)
+ if err := nl.skipList.Insert([]byte(nextNameBatch.key), nextNameBatch.ToBytes()); err != nil {
+ return err
+ }
+ return nil
+ }
+ }
+
+ // case 2.5
+ // now prevNode is nil
+ newNameBatch := NewNameBatch()
+ newNameBatch.WriteName(name)
+ if err := nl.skipList.Insert([]byte(newNameBatch.key), newNameBatch.ToBytes()); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+/*
+// case 1: exists in nextNode
+if nextNode != nil && nextNode.Key == name {
+ remove from nextNode, update nextNode
+ // TODO: merge with prevNode if possible?
+ return
+}
+if nextNode is nil
+ prevNode = list.Largestnode
+if prevNode == nil and nextNode.Prev != nil
+ prevNode = load(nextNode.Prev)
+
+// case 2: does not exist
+// case 2.1
+if prevNode == nil {
+ return
+}
+// case 2.2
+if prevNameBatch does not contain name {
+ return
+}
+
+// case 3
+delete from prevNameBatch
+if prevNameBatch + nextNode < capacityList
+ // case 3.1
+ merge
+else
+ // case 3.2
+ update prevNode
+
+
+*/
+func (nl *NameList) DeleteName(name string) error {
+ lookupKey := []byte(name)
+ prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+ if err != nil {
+ return err
+ }
+
+ // case 1
+ var nextNameBatch *NameBatch
+ if nextNode != nil {
+ nextNameBatch = LoadNameBatch(nextNode.Value)
+ }
+ if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+ if err := nl.skipList.Delete(nextNode.Key); err != nil {
+ return err
+ }
+ nextNameBatch.DeleteName(name)
+ if len(nextNameBatch.names) > 0 {
+ if err := nl.skipList.Insert([]byte(nextNameBatch.key), nextNameBatch.ToBytes()); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+
+ if !found {
+ prevNode, err = nl.skipList.GetLargestNode()
+ if err != nil {
+ return err
+ }
+ }
+
+ if nextNode != nil && prevNode == nil {
+ prevNode, err = nl.skipList.loadElement(nextNode.Prev)
+ if err != nil {
+ return err
+ }
+ }
+
+ // case 2
+ if prevNode == nil {
+ // case 2.1
+ return nil
+ }
+ prevNameBatch := LoadNameBatch(prevNode.Value)
+ if !prevNameBatch.ContainsName(name) {
+ // case 2.2
+ return nil
+ }
+
+ // case 3
+ prevNameBatch.DeleteName(name)
+ if len(prevNameBatch.names) == 0 {
+ if err := nl.skipList.Delete(prevNode.Key); err != nil {
+ return err
+ }
+ return nil
+ }
+ if nextNameBatch != nil && len(nextNameBatch.names) + len(prevNameBatch.names) < nl.batchSize {
+ // case 3.1 merge nextNode and prevNode
+ if err := nl.skipList.Delete(nextNode.Key); err != nil {
+ return err
+ }
+ for nextName := range nextNameBatch.names {
+ prevNameBatch.WriteName(nextName)
+ }
+ return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes())
+ } else {
+ // case 3.2 update prevNode
+ return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes())
+ }
+
+ return nil
+}
+
+func (nl *NameList) ListNames(startFrom string, visitNamesFn func(name string) bool) error {
+ lookupKey := []byte(startFrom)
+ prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+ if err != nil {
+ return err
+ }
+ if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+ prevNode = nil
+ }
+ if !found {
+ prevNode, err = nl.skipList.GetLargestNode()
+ if err != nil {
+ return err
+ }
+ }
+
+ if prevNode != nil {
+ prevNameBatch := LoadNameBatch(prevNode.Value)
+ if !prevNameBatch.ListNames(startFrom, visitNamesFn) {
+ return nil
+ }
+ }
+
+ for nextNode != nil {
+ nextNameBatch := LoadNameBatch(nextNode.Value)
+ if !nextNameBatch.ListNames(startFrom, visitNamesFn) {
+ return nil
+ }
+ nextNode, err = nl.skipList.loadElement(nextNode.Next[0])
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (nl *NameList) RemoteAllListElement() error {
+
+ t := nl.skipList
+
+ nodeRef := t.startLevels[0]
+ for nodeRef != nil {
+ node, err := t.loadElement(nodeRef)
+ if err != nil {
+ return err
+ }
+ if node == nil {
+ return nil
+ }
+ if err := t.deleteElement(node); err != nil {
+ return err
+ }
+ nodeRef = node.Next[0]
+ }
+ return nil
+
+} \ No newline at end of file
diff --git a/weed/util/skiplist/name_list_serde.go b/weed/util/skiplist/name_list_serde.go
new file mode 100644
index 000000000..be9f06698
--- /dev/null
+++ b/weed/util/skiplist/name_list_serde.go
@@ -0,0 +1,71 @@
+package skiplist
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/golang/protobuf/proto"
+)
+
+func LoadNameList(data []byte, store ListStore, batchSize int) *NameList {
+
+ nl := &NameList{
+ skipList: New(store),
+ batchSize: batchSize,
+ }
+
+ if len(data) == 0 {
+ return nl
+ }
+
+ message := &SkipListProto{}
+ if err := proto.Unmarshal(data, message); err != nil {
+ glog.Errorf("loading skiplist: %v", err)
+ }
+ nl.skipList.maxNewLevel = int(message.MaxNewLevel)
+ nl.skipList.maxLevel = int(message.MaxLevel)
+ for i, ref := range message.StartLevels {
+ nl.skipList.startLevels[i] = &SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ }
+ }
+ for i, ref := range message.EndLevels {
+ nl.skipList.endLevels[i] = &SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ }
+ }
+ return nl
+}
+
+func (nl *NameList) HasChanges() bool {
+ return nl.skipList.hasChanges
+}
+
+func (nl *NameList) ToBytes() []byte {
+ message := &SkipListProto{}
+ message.MaxNewLevel = int32(nl.skipList.maxNewLevel)
+ message.MaxLevel = int32(nl.skipList.maxLevel)
+ for _, ref := range nl.skipList.startLevels {
+ if ref == nil {
+ break
+ }
+ message.StartLevels = append(message.StartLevels, &SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ })
+ }
+ for _, ref := range nl.skipList.endLevels {
+ if ref == nil {
+ break
+ }
+ message.EndLevels = append(message.EndLevels, &SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ })
+ }
+ data, err := proto.Marshal(message)
+ if err != nil {
+ glog.Errorf("marshal skiplist: %v", err)
+ }
+ return data
+} \ No newline at end of file
diff --git a/weed/util/skiplist/name_list_test.go b/weed/util/skiplist/name_list_test.go
new file mode 100644
index 000000000..b3a686553
--- /dev/null
+++ b/weed/util/skiplist/name_list_test.go
@@ -0,0 +1,73 @@
+package skiplist
+
+import (
+ "math/rand"
+ "strconv"
+ "testing"
+)
+
+const (
+ maxNameCount = 100
+)
+
+func String(x int) string {
+ return strconv.Itoa(x)
+}
+
+func TestNameList(t *testing.T) {
+ list := newNameList(memStore, 7)
+
+ for i := 0; i < maxNameCount; i++ {
+ list.WriteName(String(i))
+ }
+
+ counter := 0
+ list.ListNames("", func(name string) bool {
+ counter++
+ print(name, " ")
+ return true
+ })
+ if counter != maxNameCount {
+ t.Fail()
+ }
+
+ // list.skipList.println()
+
+ deleteBase := 5
+ deleteCount := maxNameCount - 3 * deleteBase
+
+ for i := deleteBase; i < deleteBase+deleteCount; i++ {
+ list.DeleteName(String(i))
+ }
+
+ counter = 0
+ list.ListNames("", func(name string) bool {
+ counter++
+ return true
+ })
+ // list.skipList.println()
+ if counter != maxNameCount-deleteCount {
+ t.Fail()
+ }
+
+ // randomized deletion
+ list = newNameList(memStore, 7)
+ // Delete elements at random positions in the list.
+ rList := rand.Perm(maxN)
+ for _, i := range rList {
+ list.WriteName(String(i))
+ }
+ for _, i := range rList {
+ list.DeleteName(String(i))
+ }
+ counter = 0
+ list.ListNames("", func(name string) bool {
+ counter++
+ print(name, " ")
+ return true
+ })
+ if counter != 0 {
+ t.Fail()
+ }
+
+}
diff --git a/weed/util/skiplist/skiplist.go b/weed/util/skiplist/skiplist.go
new file mode 100644
index 000000000..52e6c606a
--- /dev/null
+++ b/weed/util/skiplist/skiplist.go
@@ -0,0 +1,563 @@
+package skiplist
+
+// adapted from https://github.com/MauriceGit/skiplist/blob/master/skiplist.go
+
+import (
+ "bytes"
+ "fmt"
+ "math/bits"
+ "math/rand"
+ "time"
+)
+
+const (
+ // maxLevel denotes the maximum height of the skiplist. This height will keep the skiplist
+ // efficient for up to 34m entries. If there is a need for much more, please adjust this constant accordingly.
+ maxLevel = 25
+)
+
+type SkipList struct {
+ startLevels [maxLevel]*SkipListElementReference
+ endLevels [maxLevel]*SkipListElementReference
+ maxNewLevel int
+ maxLevel int
+ listStore ListStore
+ hasChanges bool
+ // elementCount int
+}
+
+// NewSeedEps returns a new empty, initialized Skiplist.
+// Given a seed, a deterministic height/list behaviour can be achieved.
+// Eps is used to compare keys given by the ExtractKey() function on equality.
+func NewSeed(seed int64, listStore ListStore) *SkipList {
+
+ // Initialize random number generator.
+ rand.Seed(seed)
+ //fmt.Printf("SkipList seed: %v\n", seed)
+
+ list := &SkipList{
+ maxNewLevel: maxLevel,
+ maxLevel: 0,
+ listStore: listStore,
+ // elementCount: 0,
+ }
+
+ return list
+}
+
+// New returns a new empty, initialized Skiplist.
+func New(listStore ListStore) *SkipList {
+ return NewSeed(time.Now().UTC().UnixNano(), listStore)
+}
+
+// IsEmpty checks, if the skiplist is empty.
+func (t *SkipList) IsEmpty() bool {
+ return t.startLevels[0] == nil
+}
+
+func (t *SkipList) generateLevel(maxLevel int) int {
+ level := maxLevel - 1
+ // First we apply some mask which makes sure that we don't get a level
+ // above our desired level. Then we find the first set bit.
+ var x = rand.Uint64() & ((1 << uint(maxLevel-1)) - 1)
+ zeroes := bits.TrailingZeros64(x)
+ if zeroes <= maxLevel {
+ level = zeroes
+ }
+
+ return level
+}
+
+func (t *SkipList) findEntryIndex(key []byte, minLevel int) int {
+ // Find good entry point so we don't accidentally skip half the list.
+ for i := t.maxLevel; i >= 0; i-- {
+ if t.startLevels[i] != nil && bytes.Compare(t.startLevels[i].Key, key) < 0 || i <= minLevel {
+ return i
+ }
+ }
+ return 0
+}
+
+func (t *SkipList) findExtended(key []byte, findGreaterOrEqual bool) (prevElementIfVisited *SkipListElement, foundElem *SkipListElement, ok bool, err error) {
+
+ foundElem = nil
+ ok = false
+
+ if t.IsEmpty() {
+ return
+ }
+
+ index := t.findEntryIndex(key, 0)
+ var currentNode *SkipListElement
+
+ currentNode, err = t.loadElement(t.startLevels[index])
+ if err != nil {
+ return
+ }
+ if currentNode == nil {
+ return
+ }
+
+ // In case, that our first element is already greater-or-equal!
+ if findGreaterOrEqual && compareElement(currentNode, key) > 0 {
+ foundElem = currentNode
+ ok = true
+ return
+ }
+
+ for {
+ if compareElement(currentNode, key) == 0 {
+ foundElem = currentNode
+ ok = true
+ return
+ }
+
+ // Which direction are we continuing next time?
+ if currentNode.Next[index] != nil && bytes.Compare(currentNode.Next[index].Key, key) <= 0 {
+ // Go right
+ currentNode, err = t.loadElement(currentNode.Next[index])
+ if err != nil {
+ return
+ }
+ if currentNode == nil {
+ return
+ }
+ } else {
+ if index > 0 {
+
+ // Early exit
+ if currentNode.Next[0] != nil && bytes.Compare(currentNode.Next[0].Key, key) == 0 {
+ prevElementIfVisited = currentNode
+ var currentNodeNext *SkipListElement
+ currentNodeNext, err = t.loadElement(currentNode.Next[0])
+ if err != nil {
+ return
+ }
+ if currentNodeNext == nil {
+ return
+ }
+ foundElem = currentNodeNext
+ ok = true
+ return
+ }
+ // Go down
+ index--
+ } else {
+ // Element is not found and we reached the bottom.
+ if findGreaterOrEqual {
+ foundElem, err = t.loadElement(currentNode.Next[index])
+ if err != nil {
+ return
+ }
+ ok = foundElem != nil
+ }
+
+ return
+ }
+ }
+ }
+}
+
+// Find tries to find an element in the skiplist based on the key from the given ListElement.
+// elem can be used, if ok is true.
+// Find runs in approx. O(log(n))
+func (t *SkipList) Find(key []byte) (prevIfVisited *SkipListElement, elem *SkipListElement, ok bool, err error) {
+
+ if t == nil || key == nil {
+ return
+ }
+
+ prevIfVisited, elem, ok, err = t.findExtended(key, false)
+ return
+}
+
+// FindGreaterOrEqual finds the first element, that is greater or equal to the given ListElement e.
+// The comparison is done on the keys (So on ExtractKey()).
+// FindGreaterOrEqual runs in approx. O(log(n))
+func (t *SkipList) FindGreaterOrEqual(key []byte) (prevIfVisited *SkipListElement, elem *SkipListElement, ok bool, err error) {
+
+ if t == nil || key == nil {
+ return
+ }
+
+ prevIfVisited, elem, ok, err = t.findExtended(key, true)
+ return
+}
+
+// Delete removes an element equal to e from the skiplist, if there is one.
+// If there are multiple entries with the same value, Delete will remove one of them
+// (Which one will change based on the actual skiplist layout)
+// Delete runs in approx. O(log(n))
+func (t *SkipList) Delete(key []byte) (err error) {
+
+ if t == nil || t.IsEmpty() || key == nil {
+ return
+ }
+
+ index := t.findEntryIndex(key, t.maxLevel)
+
+ var currentNode *SkipListElement
+ var nextNode *SkipListElement
+
+ for {
+
+ if currentNode == nil {
+ nextNode, err = t.loadElement(t.startLevels[index])
+ } else {
+ nextNode, err = t.loadElement(currentNode.Next[index])
+ }
+ if err != nil {
+ return err
+ }
+
+ // Found and remove!
+ if nextNode != nil && compareElement(nextNode, key) == 0 {
+
+ if currentNode != nil {
+ currentNode.Next[index] = nextNode.Next[index]
+ if err = t.saveElement(currentNode); err != nil {
+ return err
+ }
+ }
+
+ if index == 0 {
+ if nextNode.Next[index] != nil {
+ nextNextNode, err := t.loadElement(nextNode.Next[index])
+ if err != nil {
+ return err
+ }
+ if nextNextNode != nil {
+ nextNextNode.Prev = currentNode.Reference()
+ if err = t.saveElement(nextNextNode); err != nil {
+ return err
+ }
+ }
+ }
+ // t.elementCount--
+ if err = t.deleteElement(nextNode); err != nil {
+ return err
+ }
+ }
+
+ // Link from start needs readjustments.
+ startNextKey := t.startLevels[index].Key
+ if compareElement(nextNode, startNextKey) == 0 {
+ t.hasChanges = true
+ t.startLevels[index] = nextNode.Next[index]
+ // This was our currently highest node!
+ if t.startLevels[index] == nil {
+ t.maxLevel = index - 1
+ }
+ }
+
+ // Link from end needs readjustments.
+ if nextNode.Next[index] == nil {
+ t.endLevels[index] = currentNode.Reference()
+ t.hasChanges = true
+ }
+ nextNode.Next[index] = nil
+ }
+
+ if nextNode != nil && compareElement(nextNode, key) < 0 {
+ // Go right
+ currentNode = nextNode
+ } else {
+ // Go down
+ index--
+ if index < 0 {
+ break
+ }
+ }
+ }
+ return
+}
+
+// Insert inserts the given ListElement into the skiplist.
+// Insert runs in approx. O(log(n))
+func (t *SkipList) Insert(key, value []byte) (err error) {
+
+ if t == nil || key == nil {
+ return
+ }
+
+ level := t.generateLevel(t.maxNewLevel)
+
+ // Only grow the height of the skiplist by one at a time!
+ if level > t.maxLevel {
+ level = t.maxLevel + 1
+ t.maxLevel = level
+ t.hasChanges = true
+ }
+
+ elem := &SkipListElement{
+ Id: rand.Int63(),
+ Next: make([]*SkipListElementReference, t.maxNewLevel, t.maxNewLevel),
+ Level: int32(level),
+ Key: key,
+ Value: value,
+ }
+
+ // t.elementCount++
+
+ newFirst := true
+ newLast := true
+ if !t.IsEmpty() {
+ newFirst = compareElement(elem, t.startLevels[0].Key) < 0
+ newLast = compareElement(elem, t.endLevels[0].Key) > 0
+ }
+
+ normallyInserted := false
+ if !newFirst && !newLast {
+
+ normallyInserted = true
+
+ index := t.findEntryIndex(key, level)
+
+ var currentNode *SkipListElement
+ var nextNodeRef *SkipListElementReference
+
+ for {
+
+ if currentNode == nil {
+ nextNodeRef = t.startLevels[index]
+ } else {
+ nextNodeRef = currentNode.Next[index]
+ }
+
+ var nextNode *SkipListElement
+
+ // Connect node to next
+ if index <= level && (nextNodeRef == nil || bytes.Compare(nextNodeRef.Key, key) > 0) {
+ elem.Next[index] = nextNodeRef
+ if currentNode != nil {
+ currentNode.Next[index] = elem.Reference()
+ if err = t.saveElement(currentNode); err != nil {
+ return
+ }
+ }
+ if index == 0 {
+ elem.Prev = currentNode.Reference()
+ if nextNodeRef != nil {
+ if nextNode, err = t.loadElement(nextNodeRef); err != nil {
+ return
+ }
+ if nextNode != nil {
+ nextNode.Prev = elem.Reference()
+ if err = t.saveElement(nextNode); err != nil {
+ return
+ }
+ }
+ }
+ }
+ }
+
+ if nextNodeRef != nil && bytes.Compare(nextNodeRef.Key, key) <= 0 {
+ // Go right
+ if nextNode == nil {
+ // reuse nextNode when index == 0
+ if nextNode, err = t.loadElement(nextNodeRef); err != nil {
+ return
+ }
+ }
+ currentNode = nextNode
+ if currentNode == nil {
+ return
+ }
+ } else {
+ // Go down
+ index--
+ if index < 0 {
+ break
+ }
+ }
+ }
+ }
+
+ // Where we have a left-most position that needs to be referenced!
+ for i := level; i >= 0; i-- {
+
+ didSomething := false
+
+ if newFirst || normallyInserted {
+
+ if t.startLevels[i] == nil || bytes.Compare(t.startLevels[i].Key, key) > 0 {
+ if i == 0 && t.startLevels[i] != nil {
+ startLevelElement, err := t.loadElement(t.startLevels[i])
+ if err != nil {
+ return err
+ }
+ if startLevelElement != nil {
+ startLevelElement.Prev = elem.Reference()
+ if err = t.saveElement(startLevelElement); err != nil {
+ return err
+ }
+ }
+ }
+ elem.Next[i] = t.startLevels[i]
+ t.startLevels[i] = elem.Reference()
+ t.hasChanges = true
+ }
+
+ // link the endLevels to this element!
+ if elem.Next[i] == nil {
+ t.endLevels[i] = elem.Reference()
+ t.hasChanges = true
+ }
+
+ didSomething = true
+ }
+
+ if newLast {
+ // Places the element after the very last element on this level!
+ // This is very important, so we are not linking the very first element (newFirst AND newLast) to itself!
+ if !newFirst {
+ if t.endLevels[i] != nil {
+ endLevelElement, err := t.loadElement(t.endLevels[i])
+ if err != nil {
+ return err
+ }
+ if endLevelElement != nil {
+ endLevelElement.Next[i] = elem.Reference()
+ if err = t.saveElement(endLevelElement); err != nil {
+ return err
+ }
+ }
+ }
+ if i == 0 {
+ elem.Prev = t.endLevels[i]
+ }
+ t.endLevels[i] = elem.Reference()
+ t.hasChanges = true
+ }
+
+ // Link the startLevels to this element!
+ if t.startLevels[i] == nil || bytes.Compare(t.startLevels[i].Key, key) > 0 {
+ t.startLevels[i] = elem.Reference()
+ t.hasChanges = true
+ }
+
+ didSomething = true
+ }
+
+ if !didSomething {
+ break
+ }
+ }
+
+ if err = t.saveElement(elem); err != nil {
+ return err
+ }
+ return nil
+
+}
+
+// GetSmallestNode returns the very first/smallest node in the skiplist.
+// GetSmallestNode runs in O(1)
+func (t *SkipList) GetSmallestNode() (*SkipListElement, error) {
+ return t.loadElement(t.startLevels[0])
+}
+
+// GetLargestNode returns the very last/largest node in the skiplist.
+// GetLargestNode runs in O(1)
+func (t *SkipList) GetLargestNode() (*SkipListElement, error) {
+ return t.loadElement(t.endLevels[0])
+}
+
+// Next returns the next element based on the given node.
+// Next will loop around to the first node, if you call it on the last!
+func (t *SkipList) Next(e *SkipListElement) (*SkipListElement, error) {
+ if e.Next[0] == nil {
+ return t.loadElement(t.startLevels[0])
+ }
+ return t.loadElement(e.Next[0])
+}
+
+// Prev returns the previous element based on the given node.
+// Prev will loop around to the last node, if you call it on the first!
+func (t *SkipList) Prev(e *SkipListElement) (*SkipListElement, error) {
+ if e.Prev == nil {
+ return t.loadElement(t.endLevels[0])
+ }
+ return t.loadElement(e.Prev)
+}
+
+// ChangeValue can be used to change the actual value of a node in the skiplist
+// without the need of Deleting and reinserting the node again.
+// Be advised, that ChangeValue only works, if the actual key from ExtractKey() will stay the same!
+// ok is an indicator, wether the value is actually changed.
+func (t *SkipList) ChangeValue(e *SkipListElement, newValue []byte) (err error) {
+ // The key needs to stay correct, so this is very important!
+ e.Value = newValue
+ return t.saveElement(e)
+}
+
+// String returns a string format of the skiplist. Useful to get a graphical overview and/or debugging.
+func (t *SkipList) println() {
+
+ print("start --> ")
+ for i, l := range t.startLevels {
+ if l == nil {
+ break
+ }
+ if i > 0 {
+ print(" -> ")
+ }
+ next := "---"
+ if l != nil {
+ next = string(l.Key)
+ }
+ print(fmt.Sprintf("[%v]", next))
+ }
+ println()
+
+ nodeRef := t.startLevels[0]
+ for nodeRef != nil {
+ print(fmt.Sprintf("%v: ", string(nodeRef.Key)))
+ node, _ := t.loadElement(nodeRef)
+ if node == nil {
+ break
+ }
+ for i := 0; i <= int(node.Level); i++ {
+
+ l := node.Next[i]
+
+ next := "---"
+ if l != nil {
+ next = string(l.Key)
+ }
+
+ if i == 0 {
+ prev := "---"
+
+ if node.Prev != nil {
+ prev = string(node.Prev.Key)
+ }
+ print(fmt.Sprintf("[%v|%v]", prev, next))
+ } else {
+ print(fmt.Sprintf("[%v]", next))
+ }
+ if i < int(node.Level) {
+ print(" -> ")
+ }
+
+ }
+ nodeRef = node.Next[0]
+ println()
+ }
+
+ print("end --> ")
+ for i, l := range t.endLevels {
+ if l == nil {
+ break
+ }
+ if i > 0 {
+ print(" -> ")
+ }
+ next := "---"
+ if l != nil {
+ next = string(l.Key)
+ }
+ print(fmt.Sprintf("[%v]", next))
+ }
+ println()
+}
diff --git a/weed/util/skiplist/skiplist.pb.go b/weed/util/skiplist/skiplist.pb.go
new file mode 100644
index 000000000..adb121bfc
--- /dev/null
+++ b/weed/util/skiplist/skiplist.pb.go
@@ -0,0 +1,438 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.25.0
+// protoc v3.12.3
+// source: skiplist.proto
+
+package skiplist
+
+import (
+ proto "github.com/golang/protobuf/proto"
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// This is a compile-time assertion that a sufficiently up-to-date version
+// of the legacy proto package is being used.
+const _ = proto.ProtoPackageIsVersion4
+
+type SkipListProto struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ StartLevels []*SkipListElementReference `protobuf:"bytes,1,rep,name=start_levels,json=startLevels,proto3" json:"start_levels,omitempty"`
+ EndLevels []*SkipListElementReference `protobuf:"bytes,2,rep,name=end_levels,json=endLevels,proto3" json:"end_levels,omitempty"`
+ MaxNewLevel int32 `protobuf:"varint,3,opt,name=max_new_level,json=maxNewLevel,proto3" json:"max_new_level,omitempty"`
+ MaxLevel int32 `protobuf:"varint,4,opt,name=max_level,json=maxLevel,proto3" json:"max_level,omitempty"`
+}
+
+func (x *SkipListProto) Reset() {
+ *x = SkipListProto{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_skiplist_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *SkipListProto) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SkipListProto) ProtoMessage() {}
+
+func (x *SkipListProto) ProtoReflect() protoreflect.Message {
+ mi := &file_skiplist_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SkipListProto.ProtoReflect.Descriptor instead.
+func (*SkipListProto) Descriptor() ([]byte, []int) {
+ return file_skiplist_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *SkipListProto) GetStartLevels() []*SkipListElementReference {
+ if x != nil {
+ return x.StartLevels
+ }
+ return nil
+}
+
+func (x *SkipListProto) GetEndLevels() []*SkipListElementReference {
+ if x != nil {
+ return x.EndLevels
+ }
+ return nil
+}
+
+func (x *SkipListProto) GetMaxNewLevel() int32 {
+ if x != nil {
+ return x.MaxNewLevel
+ }
+ return 0
+}
+
+func (x *SkipListProto) GetMaxLevel() int32 {
+ if x != nil {
+ return x.MaxLevel
+ }
+ return 0
+}
+
+type SkipListElementReference struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ ElementPointer int64 `protobuf:"varint,1,opt,name=element_pointer,json=elementPointer,proto3" json:"element_pointer,omitempty"`
+ Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
+}
+
+func (x *SkipListElementReference) Reset() {
+ *x = SkipListElementReference{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_skiplist_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *SkipListElementReference) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SkipListElementReference) ProtoMessage() {}
+
+func (x *SkipListElementReference) ProtoReflect() protoreflect.Message {
+ mi := &file_skiplist_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SkipListElementReference.ProtoReflect.Descriptor instead.
+func (*SkipListElementReference) Descriptor() ([]byte, []int) {
+ return file_skiplist_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *SkipListElementReference) GetElementPointer() int64 {
+ if x != nil {
+ return x.ElementPointer
+ }
+ return 0
+}
+
+func (x *SkipListElementReference) GetKey() []byte {
+ if x != nil {
+ return x.Key
+ }
+ return nil
+}
+
+type SkipListElement struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
+ Next []*SkipListElementReference `protobuf:"bytes,2,rep,name=next,proto3" json:"next,omitempty"`
+ Level int32 `protobuf:"varint,3,opt,name=level,proto3" json:"level,omitempty"`
+ Key []byte `protobuf:"bytes,4,opt,name=key,proto3" json:"key,omitempty"`
+ Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"`
+ Prev *SkipListElementReference `protobuf:"bytes,6,opt,name=prev,proto3" json:"prev,omitempty"`
+}
+
+func (x *SkipListElement) Reset() {
+ *x = SkipListElement{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_skiplist_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *SkipListElement) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SkipListElement) ProtoMessage() {}
+
+func (x *SkipListElement) ProtoReflect() protoreflect.Message {
+ mi := &file_skiplist_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SkipListElement.ProtoReflect.Descriptor instead.
+func (*SkipListElement) Descriptor() ([]byte, []int) {
+ return file_skiplist_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *SkipListElement) GetId() int64 {
+ if x != nil {
+ return x.Id
+ }
+ return 0
+}
+
+func (x *SkipListElement) GetNext() []*SkipListElementReference {
+ if x != nil {
+ return x.Next
+ }
+ return nil
+}
+
+func (x *SkipListElement) GetLevel() int32 {
+ if x != nil {
+ return x.Level
+ }
+ return 0
+}
+
+func (x *SkipListElement) GetKey() []byte {
+ if x != nil {
+ return x.Key
+ }
+ return nil
+}
+
+func (x *SkipListElement) GetValue() []byte {
+ if x != nil {
+ return x.Value
+ }
+ return nil
+}
+
+func (x *SkipListElement) GetPrev() *SkipListElementReference {
+ if x != nil {
+ return x.Prev
+ }
+ return nil
+}
+
+type NameBatchData struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Names [][]byte `protobuf:"bytes,1,rep,name=names,proto3" json:"names,omitempty"`
+}
+
+func (x *NameBatchData) Reset() {
+ *x = NameBatchData{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_skiplist_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *NameBatchData) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*NameBatchData) ProtoMessage() {}
+
+func (x *NameBatchData) ProtoReflect() protoreflect.Message {
+ mi := &file_skiplist_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use NameBatchData.ProtoReflect.Descriptor instead.
+func (*NameBatchData) Descriptor() ([]byte, []int) {
+ return file_skiplist_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *NameBatchData) GetNames() [][]byte {
+ if x != nil {
+ return x.Names
+ }
+ return nil
+}
+
+var File_skiplist_proto protoreflect.FileDescriptor
+
+var file_skiplist_proto_rawDesc = []byte{
+ 0x0a, 0x0e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+ 0x12, 0x08, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x22, 0xda, 0x01, 0x0a, 0x0d, 0x53,
+ 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x45, 0x0a, 0x0c,
+ 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03,
+ 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x2e, 0x53, 0x6b,
+ 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66,
+ 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x0b, 0x73, 0x74, 0x61, 0x72, 0x74, 0x4c, 0x65, 0x76,
+ 0x65, 0x6c, 0x73, 0x12, 0x41, 0x0a, 0x0a, 0x65, 0x6e, 0x64, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c,
+ 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69,
+ 0x73, 0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65,
+ 0x6e, 0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x09, 0x65, 0x6e, 0x64,
+ 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x73, 0x12, 0x22, 0x0a, 0x0d, 0x6d, 0x61, 0x78, 0x5f, 0x6e, 0x65,
+ 0x77, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x6d,
+ 0x61, 0x78, 0x4e, 0x65, 0x77, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x6d, 0x61,
+ 0x78, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x6d,
+ 0x61, 0x78, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x22, 0x55, 0x0a, 0x18, 0x53, 0x6b, 0x69, 0x70, 0x4c,
+ 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65,
+ 0x6e, 0x63, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x70,
+ 0x6f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x65, 0x6c,
+ 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x12, 0x10, 0x0a, 0x03,
+ 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0xcf,
+ 0x01, 0x0a, 0x0f, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65,
+ 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02,
+ 0x69, 0x64, 0x12, 0x36, 0x0a, 0x04, 0x6e, 0x65, 0x78, 0x74, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b,
+ 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70,
+ 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66, 0x65, 0x72,
+ 0x65, 0x6e, 0x63, 0x65, 0x52, 0x04, 0x6e, 0x65, 0x78, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x65,
+ 0x76, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c,
+ 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b,
+ 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28,
+ 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x36, 0x0a, 0x04, 0x70, 0x72, 0x65, 0x76,
+ 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73,
+ 0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
+ 0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x04, 0x70, 0x72, 0x65, 0x76,
+ 0x22, 0x25, 0x0a, 0x0d, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x61, 0x74, 0x63, 0x68, 0x44, 0x61, 0x74,
+ 0x61, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c,
+ 0x52, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75,
+ 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f,
+ 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x75,
+ 0x74, 0x69, 0x6c, 0x2f, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_skiplist_proto_rawDescOnce sync.Once
+ file_skiplist_proto_rawDescData = file_skiplist_proto_rawDesc
+)
+
+func file_skiplist_proto_rawDescGZIP() []byte {
+ file_skiplist_proto_rawDescOnce.Do(func() {
+ file_skiplist_proto_rawDescData = protoimpl.X.CompressGZIP(file_skiplist_proto_rawDescData)
+ })
+ return file_skiplist_proto_rawDescData
+}
+
+var file_skiplist_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
+var file_skiplist_proto_goTypes = []interface{}{
+ (*SkipListProto)(nil), // 0: skiplist.SkipListProto
+ (*SkipListElementReference)(nil), // 1: skiplist.SkipListElementReference
+ (*SkipListElement)(nil), // 2: skiplist.SkipListElement
+ (*NameBatchData)(nil), // 3: skiplist.NameBatchData
+}
+var file_skiplist_proto_depIdxs = []int32{
+ 1, // 0: skiplist.SkipListProto.start_levels:type_name -> skiplist.SkipListElementReference
+ 1, // 1: skiplist.SkipListProto.end_levels:type_name -> skiplist.SkipListElementReference
+ 1, // 2: skiplist.SkipListElement.next:type_name -> skiplist.SkipListElementReference
+ 1, // 3: skiplist.SkipListElement.prev:type_name -> skiplist.SkipListElementReference
+ 4, // [4:4] is the sub-list for method output_type
+ 4, // [4:4] is the sub-list for method input_type
+ 4, // [4:4] is the sub-list for extension type_name
+ 4, // [4:4] is the sub-list for extension extendee
+ 0, // [0:4] is the sub-list for field type_name
+}
+
+func init() { file_skiplist_proto_init() }
+func file_skiplist_proto_init() {
+ if File_skiplist_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_skiplist_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SkipListProto); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_skiplist_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SkipListElementReference); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_skiplist_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SkipListElement); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_skiplist_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*NameBatchData); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_skiplist_proto_rawDesc,
+ NumEnums: 0,
+ NumMessages: 4,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_skiplist_proto_goTypes,
+ DependencyIndexes: file_skiplist_proto_depIdxs,
+ MessageInfos: file_skiplist_proto_msgTypes,
+ }.Build()
+ File_skiplist_proto = out.File
+ file_skiplist_proto_rawDesc = nil
+ file_skiplist_proto_goTypes = nil
+ file_skiplist_proto_depIdxs = nil
+}
diff --git a/weed/util/skiplist/skiplist.proto b/weed/util/skiplist/skiplist.proto
new file mode 100644
index 000000000..2991ad830
--- /dev/null
+++ b/weed/util/skiplist/skiplist.proto
@@ -0,0 +1,30 @@
+syntax = "proto3";
+
+package skiplist;
+
+option go_package = "github.com/chrislusf/seaweedfs/weed/util/skiplist";
+
+message SkipListProto {
+ repeated SkipListElementReference start_levels = 1;
+ repeated SkipListElementReference end_levels = 2;
+ int32 max_new_level = 3;
+ int32 max_level = 4;
+}
+
+message SkipListElementReference {
+ int64 element_pointer = 1;
+ bytes key = 2;
+}
+
+message SkipListElement {
+ int64 id = 1;
+ repeated SkipListElementReference next = 2;
+ int32 level = 3;
+ bytes key = 4;
+ bytes value = 5;
+ SkipListElementReference prev = 6;
+}
+
+message NameBatchData {
+ repeated bytes names = 1;
+} \ No newline at end of file
diff --git a/weed/util/skiplist/skiplist_serde.go b/weed/util/skiplist/skiplist_serde.go
new file mode 100644
index 000000000..e528b8a3d
--- /dev/null
+++ b/weed/util/skiplist/skiplist_serde.go
@@ -0,0 +1,51 @@
+package skiplist
+
+import "bytes"
+
+func compareElement(a *SkipListElement, key []byte) int {
+ if len(a.Key) == 0 {
+ return -1
+ }
+ return bytes.Compare(a.Key, key)
+}
+
+func (node *SkipListElement) Reference() *SkipListElementReference {
+ if node == nil {
+ return nil
+ }
+ return &SkipListElementReference{
+ ElementPointer: node.Id,
+ Key: node.Key,
+ }
+}
+
+func (t *SkipList) saveElement(element *SkipListElement) error {
+ if element == nil {
+ return nil
+ }
+ return t.listStore.SaveElement(element.Id, element)
+}
+
+func (t *SkipList) deleteElement(element *SkipListElement) error {
+ if element == nil {
+ return nil
+ }
+ return t.listStore.DeleteElement(element.Id)
+}
+
+func (t *SkipList) loadElement(ref *SkipListElementReference) (*SkipListElement, error) {
+ if ref.IsNil() {
+ return nil, nil
+ }
+ return t.listStore.LoadElement(ref.ElementPointer)
+}
+
+func (ref *SkipListElementReference) IsNil() bool {
+ if ref == nil {
+ return true
+ }
+ if len(ref.Key) == 0 {
+ return true
+ }
+ return false
+} \ No newline at end of file
diff --git a/weed/util/skiplist/skiplist_test.go b/weed/util/skiplist/skiplist_test.go
new file mode 100644
index 000000000..a35bef6f3
--- /dev/null
+++ b/weed/util/skiplist/skiplist_test.go
@@ -0,0 +1,295 @@
+package skiplist
+
+import (
+ "bytes"
+ "fmt"
+ "math/rand"
+ "strconv"
+ "testing"
+)
+
+const (
+ maxN = 10000
+)
+
+var (
+ memStore = newMemStore()
+)
+
+func TestReverseInsert(t *testing.T) {
+ list := NewSeed(100, memStore)
+
+ list.Insert([]byte("zzz"), []byte("zzz"))
+ list.Delete([]byte("zzz"))
+
+ list.Insert([]byte("aaa"), []byte("aaa"))
+
+ if list.IsEmpty() {
+ t.Fail()
+ }
+
+}
+
+
+func TestInsertAndFind(t *testing.T) {
+
+ k0 := []byte("0")
+ var list *SkipList
+
+ var listPointer *SkipList
+ listPointer.Insert(k0, k0)
+ if _, _, ok, _ := listPointer.Find(k0); ok {
+ t.Fail()
+ }
+
+ list = New(memStore)
+ if _, _, ok, _ := list.Find(k0); ok {
+ t.Fail()
+ }
+ if !list.IsEmpty() {
+ t.Fail()
+ }
+
+ // Test at the beginning of the list.
+ for i := 0; i < maxN; i++ {
+ key := []byte(strconv.Itoa(maxN - i))
+ list.Insert(key, key)
+ }
+ for i := 0; i < maxN; i++ {
+ key := []byte(strconv.Itoa(maxN - i))
+ if _, _, ok, _ := list.Find(key); !ok {
+ t.Fail()
+ }
+ }
+
+ list = New(memStore)
+ // Test at the end of the list.
+ for i := 0; i < maxN; i++ {
+ key := []byte(strconv.Itoa(i))
+ list.Insert(key, key)
+ }
+ for i := 0; i < maxN; i++ {
+ key := []byte(strconv.Itoa(i))
+ if _, _, ok, _ := list.Find(key); !ok {
+ t.Fail()
+ }
+ }
+
+ list = New(memStore)
+ // Test at random positions in the list.
+ rList := rand.Perm(maxN)
+ for _, e := range rList {
+ key := []byte(strconv.Itoa(e))
+ // println("insert", e)
+ list.Insert(key, key)
+ }
+ for _, e := range rList {
+ key := []byte(strconv.Itoa(e))
+ // println("find", e)
+ if _, _, ok, _ := list.Find(key); !ok {
+ t.Fail()
+ }
+ }
+ // println("print list")
+ list.println()
+
+}
+
+func Element(x int) []byte {
+ return []byte(strconv.Itoa(x))
+}
+
+func TestDelete(t *testing.T) {
+
+ k0 := []byte("0")
+
+ var list *SkipList
+
+ // Delete on empty list
+ list.Delete(k0)
+
+ list = New(memStore)
+
+ list.Delete(k0)
+ if !list.IsEmpty() {
+ t.Fail()
+ }
+
+ list.Insert(k0, k0)
+ list.Delete(k0)
+ if !list.IsEmpty() {
+ t.Fail()
+ }
+
+ // Delete elements at the beginning of the list.
+ for i := 0; i < maxN; i++ {
+ list.Insert(Element(i), Element(i))
+ }
+ for i := 0; i < maxN; i++ {
+ list.Delete(Element(i))
+ }
+ if !list.IsEmpty() {
+ t.Fail()
+ }
+
+ list = New(memStore)
+ // Delete elements at the end of the list.
+ for i := 0; i < maxN; i++ {
+ list.Insert(Element(i), Element(i))
+ }
+ for i := 0; i < maxN; i++ {
+ list.Delete(Element(maxN - i - 1))
+ }
+ if !list.IsEmpty() {
+ t.Fail()
+ }
+
+ list = New(memStore)
+ // Delete elements at random positions in the list.
+ rList := rand.Perm(maxN)
+ for _, e := range rList {
+ list.Insert(Element(e), Element(e))
+ }
+ for _, e := range rList {
+ list.Delete(Element(e))
+ }
+ if !list.IsEmpty() {
+ t.Fail()
+ }
+}
+
+func TestNext(t *testing.T) {
+ list := New(memStore)
+
+ for i := 0; i < maxN; i++ {
+ list.Insert(Element(i), Element(i))
+ }
+
+ smallest, _ := list.GetSmallestNode()
+ largest, _ := list.GetLargestNode()
+
+ lastNode := smallest
+ node := lastNode
+ for node != largest {
+ node, _ = list.Next(node)
+ // Must always be incrementing here!
+ if bytes.Compare(node.Key, lastNode.Key) <= 0 {
+ t.Fail()
+ }
+ // Next.Prev must always point to itself!
+ prevNode, _ := list.Prev(node)
+ nextNode, _ := list.Next(prevNode)
+ if nextNode != node {
+ t.Fail()
+ }
+ lastNode = node
+ }
+
+ if nextNode, _ := list.Next(largest); nextNode != smallest {
+ t.Fail()
+ }
+}
+
+func TestPrev(t *testing.T) {
+ list := New(memStore)
+
+ for i := 0; i < maxN; i++ {
+ list.Insert(Element(i), Element(i))
+ }
+
+ smallest, _ := list.GetSmallestNode()
+ largest, _ := list.GetLargestNode()
+
+ lastNode := largest
+ node := lastNode
+ for node != smallest {
+ node, _ = list.Prev(node)
+ // Must always be incrementing here!
+ if bytes.Compare(node.Key, lastNode.Key) >= 0 {
+ t.Fail()
+ }
+ // Next.Prev must always point to itself!
+ nextNode, _ := list.Next(node)
+ prevNode, _ := list.Prev(nextNode)
+ if prevNode != node {
+ t.Fail()
+ }
+ lastNode = node
+ }
+
+ if prevNode, _ := list.Prev(smallest); prevNode != largest {
+ t.Fail()
+ }
+}
+
+func TestFindGreaterOrEqual(t *testing.T) {
+
+ maxNumber := maxN * 100
+
+ var list *SkipList
+ var listPointer *SkipList
+
+ // Test on empty list.
+ if _, _, ok, _ := listPointer.FindGreaterOrEqual(Element(0)); ok {
+ t.Fail()
+ }
+
+ list = New(memStore)
+
+ for i := 0; i < maxN; i++ {
+ list.Insert(Element(rand.Intn(maxNumber)), Element(i))
+ }
+
+ for i := 0; i < maxN; i++ {
+ key := Element(rand.Intn(maxNumber))
+ if _, v, ok, _ := list.FindGreaterOrEqual(key); ok {
+ // if f is v should be bigger than the element before
+ if v.Prev != nil && bytes.Compare(v.Prev.Key, key) >= 0 {
+ fmt.Printf("PrevV: %s\n key: %s\n\n", string(v.Prev.Key), string(key))
+ t.Fail()
+ }
+ // v should be bigger or equal to f
+ // If we compare directly, we get an equal key with a difference on the 10th decimal point, which fails.
+ if bytes.Compare(v.Key, key) < 0 {
+ fmt.Printf("v: %s\n key: %s\n\n", string(v.Key), string(key))
+ t.Fail()
+ }
+ } else {
+ lastNode, _ := list.GetLargestNode()
+ lastV := lastNode.GetValue()
+ // It is OK, to fail, as long as f is bigger than the last element.
+ if bytes.Compare(key, lastV) <= 0 {
+ fmt.Printf("lastV: %s\n key: %s\n\n", string(lastV), string(key))
+ t.Fail()
+ }
+ }
+ }
+
+}
+
+func TestChangeValue(t *testing.T) {
+ list := New(memStore)
+
+ for i := 0; i < maxN; i++ {
+ list.Insert(Element(i), []byte("value"))
+ }
+
+ for i := 0; i < maxN; i++ {
+ // The key only looks at the int so the string doesn't matter here!
+ _, f1, ok, _ := list.Find(Element(i))
+ if !ok {
+ t.Fail()
+ }
+ err := list.ChangeValue(f1, []byte("different value"))
+ if err != nil {
+ t.Fail()
+ }
+ _, f2, ok, _ := list.Find(Element(i))
+ if !ok {
+ t.Fail()
+ }
+ if bytes.Compare(f2.GetValue(), []byte("different value")) != 0 {
+ t.Fail()
+ }
+ }
+}