aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/redis3
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-10-06 18:18:24 -0700
committerChris Lu <chris.lu@gmail.com>2021-10-06 18:18:24 -0700
commit371fead8a5c7453ff5c49a01cfbe2bdcb46bb8b8 (patch)
tree5960108ec6a2448a7a3948caaae315b6a57d895d /weed/filer/redis3
parent8668d49c9d14b063d8c363e0abd3e0bf7e0120e8 (diff)
downloadseaweedfs-371fead8a5c7453ff5c49a01cfbe2bdcb46bb8b8.tar.xz
seaweedfs-371fead8a5c7453ff5c49a01cfbe2bdcb46bb8b8.zip
redis3 using redis native sorted set
Diffstat (limited to 'weed/filer/redis3')
-rw-r--r--weed/filer/redis3/ItemList.go483
-rw-r--r--weed/filer/redis3/item_list_serde.go75
-rw-r--r--weed/filer/redis3/kv_directory_children.go11
-rw-r--r--weed/filer/redis3/kv_directory_children_test.go78
-rw-r--r--weed/filer/redis3/skiplist_element_store.go10
5 files changed, 632 insertions, 25 deletions
diff --git a/weed/filer/redis3/ItemList.go b/weed/filer/redis3/ItemList.go
new file mode 100644
index 000000000..ae4e61cfb
--- /dev/null
+++ b/weed/filer/redis3/ItemList.go
@@ -0,0 +1,483 @@
+package redis3
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/skiplist"
+ "github.com/go-redis/redis/v8"
+)
+
+type ItemList struct {
+ skipList *skiplist.SkipList
+ batchSize int
+ client redis.UniversalClient
+ prefix string
+}
+
+func newItemList(client redis.UniversalClient, prefix string, store skiplist.ListStore, batchSize int) *ItemList {
+ return &ItemList{
+ skipList: skiplist.New(store),
+ batchSize: batchSize,
+ client: client,
+ prefix: prefix,
+ }
+}
+
+/*
+Be reluctant to create new nodes. Try to fit into either previous node or next node.
+Prefer to add to previous node.
+
+There are multiple cases after finding the name for greater or equal node
+ 1. found and node.Key == name
+ The node contains a batch with leading key the same as the name
+ nothing to do
+ 2. no such node found or node.Key > name
+
+ if no such node found
+ prevNode = list.LargestNode
+
+ // case 2.1
+ if previousNode contains name
+ nothing to do
+
+ // prefer to add to previous node
+ if prevNode != nil {
+ // case 2.2
+ if prevNode has capacity
+ prevNode.add name, and save
+ return
+ // case 2.3
+ split prevNode by name
+ }
+
+ // case 2.4
+ // merge into next node. Avoid too many nodes if adding data in reverse order.
+ if nextNode is not nil and nextNode has capacity
+ delete nextNode.Key
+ nextNode.Key = name
+ nextNode.batch.add name
+ insert nodeNode.Key
+ return
+
+ // case 2.5
+ if prevNode is nil
+ insert new node with key = name, value = batch{name}
+ return
+
+*/
+func (nl *ItemList) WriteName(name string) error {
+
+ lookupKey := []byte(name)
+ prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+ if err != nil {
+ return err
+ }
+ // case 1: the name already exists as one leading key in the batch
+ if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+ return nil
+ }
+
+ if !found {
+ prevNode, err = nl.skipList.GetLargestNode()
+ if err != nil {
+ return err
+ }
+ }
+
+ if nextNode != nil && prevNode == nil {
+ prevNode, err = nl.skipList.LoadElement(nextNode.Prev)
+ if err != nil {
+ return err
+ }
+ }
+
+ if prevNode != nil {
+ // case 2.1
+ if nl.NodeContainsItem(prevNode.Reference(), name) {
+ return nil
+ }
+
+ // case 2.2
+ nodeSize := nl.NodeSize(prevNode.Reference())
+ if nodeSize < nl.batchSize {
+ return nl.NodeAddMember(prevNode.Reference(), name)
+ }
+
+ // case 2.3
+ x := nl.NodeInnerPosition(prevNode.Reference(), name)
+ y := nodeSize - x
+ addToX := x <= y
+ // add to a new node
+ if x == 0 || y == 0 {
+ if err := nl.ItemAdd(lookupKey, 0, name); err != nil {
+ return err
+ }
+ return nil
+ }
+ if addToX {
+ // collect names before name, add them to X
+ namesToX, err := nl.NodeRangeBeforeExclusive(prevNode.Reference(), name)
+ if err != nil {
+ return nil
+ }
+ // delete skiplist reference to old node
+ if _, err := nl.skipList.DeleteByKey(prevNode.Key); err != nil {
+ return err
+ }
+ // add namesToY and name to a new X
+ namesToX = append(namesToX, name)
+ if err := nl.ItemAdd([]byte(namesToX[0]), 0, namesToX...); err != nil {
+ return nil
+ }
+ // remove names less than name from current Y
+ if err := nl.NodeDeleteBeforeExclusive(prevNode.Reference(), name); err != nil {
+ return nil
+ }
+
+ // point skip list to current Y
+ if err := nl.ItemAdd(lookupKey, prevNode.Id); err != nil {
+ return nil
+ }
+ return nil
+ } else {
+ // collect names after name, add them to Y
+ namesToY, err := nl.NodeRangeAfterExclusive(prevNode.Reference(), name)
+ if err != nil {
+ return nil
+ }
+ // add namesToY and name to a new Y
+ namesToY = append(namesToY, name)
+ if err := nl.ItemAdd(lookupKey, 0, namesToY...); err != nil {
+ return nil
+ }
+ // remove names after name from current X
+ if err := nl.NodeDeleteAfterExclusive(prevNode.Reference(), name); err != nil {
+ return nil
+ }
+ return nil
+ }
+
+ }
+
+ // case 2.4
+ if nextNode != nil {
+ nodeSize := nl.NodeSize(nextNode.Reference())
+ if nodeSize < nl.batchSize {
+ if id, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
+ return err
+ } else {
+ if err := nl.ItemAdd(lookupKey, id, name); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ }
+
+ // case 2.5
+ // now prevNode is nil
+ return nl.ItemAdd(lookupKey, 0, name)
+}
+
+/*
+// case 1: exists in nextNode
+if nextNode != nil && nextNode.Key == name {
+ remove from nextNode, update nextNode
+ // TODO: merge with prevNode if possible?
+ return
+}
+if nextNode is nil
+ prevNode = list.Largestnode
+if prevNode == nil and nextNode.Prev != nil
+ prevNode = load(nextNode.Prev)
+
+// case 2: does not exist
+// case 2.1
+if prevNode == nil {
+ return
+}
+// case 2.2
+if prevNameBatch does not contain name {
+ return
+}
+
+// case 3
+delete from prevNameBatch
+if prevNameBatch + nextNode < capacityList
+ // case 3.1
+ merge
+else
+ // case 3.2
+ update prevNode
+
+
+*/
+func (nl *ItemList) DeleteName(name string) error {
+ lookupKey := []byte(name)
+ prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+ if err != nil {
+ return err
+ }
+
+ // case 1
+ if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+ if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
+ return err
+ }
+ if err := nl.NodeDeleteMember(nextNode.Reference(), name); err != nil {
+ return err
+ }
+ minName := nl.NodeMin(nextNode.Reference())
+ if minName == "" {
+ return nl.NodeDelete(nextNode.Reference())
+ }
+ return nl.ItemAdd([]byte(minName), nextNode.Id)
+ }
+
+ if !found {
+ prevNode, err = nl.skipList.GetLargestNode()
+ if err != nil {
+ return err
+ }
+ }
+
+ if nextNode != nil && prevNode == nil {
+ prevNode, err = nl.skipList.LoadElement(nextNode.Prev)
+ if err != nil {
+ return err
+ }
+ }
+
+ // case 2
+ if prevNode == nil {
+ // case 2.1
+ return nil
+ }
+ if !nl.NodeContainsItem(prevNode.Reference(), name) {
+ return nil
+ }
+
+ // case 3
+ if err := nl.NodeDeleteMember(prevNode.Reference(), name); err != nil {
+ return err
+ }
+ prevSize := nl.NodeSize(prevNode.Reference())
+ if prevSize == 0 {
+ if _, err := nl.skipList.DeleteByKey(prevNode.Key); err != nil {
+ return err
+ }
+ return nil
+ }
+ nextSize := nl.NodeSize(nextNode.Reference())
+ if nextSize > 0 && prevSize + nextSize < nl.batchSize {
+ // case 3.1 merge nextNode and prevNode
+ if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
+ return err
+ }
+ nextNames, err := nl.NodeRangeBeforeExclusive(nextNode.Reference(), "")
+ if err != nil {
+ return err
+ }
+ if err := nl.NodeAddMember(prevNode.Reference(), nextNames...); err != nil {
+ return err
+ }
+ return nl.NodeDelete(nextNode.Reference())
+ } else {
+ // case 3.2 update prevNode
+ // no action to take
+ return nil
+ }
+
+ return nil
+}
+
+func (nl *ItemList) ListNames(startFrom string, visitNamesFn func(name string) bool) error {
+ lookupKey := []byte(startFrom)
+ prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+ if err != nil {
+ return err
+ }
+ if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+ prevNode = nil
+ }
+ if !found {
+ prevNode, err = nl.skipList.GetLargestNode()
+ if err != nil {
+ return err
+ }
+ }
+
+ if prevNode != nil {
+ if !nl.NodeScanIncluseiveAfter(prevNode.Reference(), startFrom, visitNamesFn) {
+ return nil
+ }
+ }
+
+ for nextNode != nil {
+ if !nl.NodeScanIncluseiveAfter(nextNode.Reference(), startFrom, visitNamesFn) {
+ return nil
+ }
+ nextNode, err = nl.skipList.LoadElement(nextNode.Next[0])
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (nl *ItemList) RemoteAllListElement() error {
+
+ t := nl.skipList
+
+ nodeRef := t.StartLevels[0]
+ for nodeRef != nil {
+ node, err := t.LoadElement(nodeRef)
+ if err != nil {
+ return err
+ }
+ if node == nil {
+ return nil
+ }
+ if err := t.DeleteElement(node); err != nil {
+ return err
+ }
+ if err := nl.NodeDelete(node.Reference()); err != nil {
+ return err
+ }
+ nodeRef = node.Next[0]
+ }
+ return nil
+
+}
+
+func (nl *ItemList) NodeContainsItem(node *skiplist.SkipListElementReference, item string) bool {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ _, err := nl.client.ZScore(context.Background(), key, item).Result()
+ if err == redis.Nil {
+ return false
+ }
+ if err == nil {
+ return true
+ }
+ return false
+}
+
+func (nl *ItemList) NodeSize(node *skiplist.SkipListElementReference) int {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ return int(nl.client.ZLexCount(context.Background(), key, "-", "+").Val())
+}
+
+func (nl *ItemList) NodeAddMember(node *skiplist.SkipListElementReference, names ...string) error {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ var members []*redis.Z
+ for _, name := range names {
+ members = append(members, &redis.Z{
+ Score: 0,
+ Member: name,
+ })
+ }
+ return nl.client.ZAddNX(context.Background(), key, members...).Err()
+}
+func (nl *ItemList) NodeDeleteMember(node *skiplist.SkipListElementReference, name string) error {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ return nl.client.ZRem(context.Background(), key, name).Err()
+}
+
+func (nl *ItemList) NodeDelete(node *skiplist.SkipListElementReference) error {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ return nl.client.Del(context.Background(), key).Err()
+}
+
+func (nl *ItemList) NodeInnerPosition(node *skiplist.SkipListElementReference, name string) int {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ return int(nl.client.ZLexCount(context.Background(), key, "-", "("+name).Val())
+}
+
+func (nl *ItemList) NodeMin(node *skiplist.SkipListElementReference) string {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ slice := nl.client.ZPopMin(context.Background(), key).Val()
+ if len(slice)>0{
+ s := slice[0].Member.(string)
+ return s
+ }
+ return ""
+}
+
+func (nl *ItemList) NodeScanIncluseiveAfter(node *skiplist.SkipListElementReference, startFrom string, visitNamesFn func(name string) bool) bool {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ if startFrom == "" {
+ startFrom = "-"
+ } else {
+ startFrom = "[" + startFrom
+ }
+ names := nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
+ Min: startFrom,
+ Max: "+",
+ }).Val()
+ for _, n := range names {
+ if !visitNamesFn(n) {
+ return false
+ }
+ }
+ return true
+}
+
+func (nl *ItemList) NodeRangeBeforeExclusive(node *skiplist.SkipListElementReference, stopAt string) ([]string, error) {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ if stopAt == "" {
+ stopAt = "+"
+ } else {
+ stopAt = "(" + stopAt
+ }
+ return nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
+ Min: "-",
+ Max: stopAt,
+ }).Result()
+}
+func (nl *ItemList) NodeRangeAfterExclusive(node *skiplist.SkipListElementReference, startFrom string) ([]string, error) {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ if startFrom == "" {
+ startFrom = "-"
+ } else {
+ startFrom = "(" + startFrom
+ }
+ return nl.client.ZRangeByLex(context.Background(), key, &redis.ZRangeBy{
+ Min: startFrom,
+ Max: "+",
+ }).Result()
+}
+
+func (nl *ItemList) NodeDeleteBeforeExclusive(node *skiplist.SkipListElementReference, stopAt string) error {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ if stopAt == "" {
+ stopAt = "+"
+ } else {
+ stopAt = "(" + stopAt
+ }
+ return nl.client.ZRemRangeByLex(context.Background(), key, "-", stopAt).Err()
+}
+func (nl *ItemList) NodeDeleteAfterExclusive(node *skiplist.SkipListElementReference, startFrom string) error {
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ if startFrom == "" {
+ startFrom = "-"
+ } else {
+ startFrom = "(" + startFrom
+ }
+ return nl.client.ZRemRangeByLex(context.Background(), key, startFrom, "+").Err()
+}
+
+func (nl *ItemList) ItemAdd(lookupKey []byte, idIfKnown int64, names ...string) error {
+ if id, err := nl.skipList.InsertByKey(lookupKey, idIfKnown, nil); err != nil {
+ return err
+ } else {
+ if len(names) > 0 {
+ return nl.NodeAddMember(&skiplist.SkipListElementReference{
+ ElementPointer: id,
+ Key: lookupKey,
+ }, names...)
+ }
+ }
+ return nil
+}
diff --git a/weed/filer/redis3/item_list_serde.go b/weed/filer/redis3/item_list_serde.go
new file mode 100644
index 000000000..d0310ce40
--- /dev/null
+++ b/weed/filer/redis3/item_list_serde.go
@@ -0,0 +1,75 @@
+package redis3
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util/skiplist"
+ "github.com/go-redis/redis/v8"
+ "github.com/golang/protobuf/proto"
+)
+
+func LoadItemList(data []byte, prefix string, client redis.UniversalClient, store skiplist.ListStore, batchSize int) *ItemList {
+
+ nl := &ItemList{
+ skipList: skiplist.New(store),
+ batchSize: batchSize,
+ client: client,
+ prefix: prefix,
+ }
+
+ if len(data) == 0 {
+ return nl
+ }
+
+ message := &skiplist.SkipListProto{}
+ if err := proto.Unmarshal(data, message); err != nil {
+ glog.Errorf("loading skiplist: %v", err)
+ }
+ nl.skipList.MaxNewLevel = int(message.MaxNewLevel)
+ nl.skipList.MaxLevel = int(message.MaxLevel)
+ for i, ref := range message.StartLevels {
+ nl.skipList.StartLevels[i] = &skiplist.SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ }
+ }
+ for i, ref := range message.EndLevels {
+ nl.skipList.EndLevels[i] = &skiplist.SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ }
+ }
+ return nl
+}
+
+func (nl *ItemList) HasChanges() bool {
+ return nl.skipList.HasChanges
+}
+
+func (nl *ItemList) ToBytes() []byte {
+ message := &skiplist.SkipListProto{}
+ message.MaxNewLevel = int32(nl.skipList.MaxNewLevel)
+ message.MaxLevel = int32(nl.skipList.MaxLevel)
+ for _, ref := range nl.skipList.StartLevels {
+ if ref == nil {
+ break
+ }
+ message.StartLevels = append(message.StartLevels, &skiplist.SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ })
+ }
+ for _, ref := range nl.skipList.EndLevels {
+ if ref == nil {
+ break
+ }
+ message.EndLevels = append(message.EndLevels, &skiplist.SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ })
+ }
+ data, err := proto.Marshal(message)
+ if err != nil {
+ glog.Errorf("marshal skiplist: %v", err)
+ }
+ return data
+}
diff --git a/weed/filer/redis3/kv_directory_children.go b/weed/filer/redis3/kv_directory_children.go
index 797e7797c..624d17374 100644
--- a/weed/filer/redis3/kv_directory_children.go
+++ b/weed/filer/redis3/kv_directory_children.go
@@ -4,11 +4,10 @@ import (
"context"
"fmt"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util/skiplist"
"github.com/go-redis/redis/v8"
)
-const maxNameBatchSizeLimit = 1000
+const maxNameBatchSizeLimit = 1000000
func insertChild(ctx context.Context, redisStore *UniversalRedis3Store, key string, name string) error {
@@ -29,7 +28,7 @@ func insertChild(ctx context.Context, redisStore *UniversalRedis3Store, key stri
}
}
store := newSkipListElementStore(key, client)
- nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
+ nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
if err := nameList.WriteName(name); err != nil {
glog.Errorf("add %s %s: %v", key, name, err)
@@ -64,7 +63,7 @@ func removeChild(ctx context.Context, redisStore *UniversalRedis3Store, key stri
}
}
store := newSkipListElementStore(key, client)
- nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
+ nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
if err := nameList.DeleteName(name); err != nil {
return err
@@ -97,7 +96,7 @@ func removeChildren(ctx context.Context, redisStore *UniversalRedis3Store, key s
}
}
store := newSkipListElementStore(key, client)
- nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
+ nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
if err = nameList.ListNames("", func(name string) bool {
if err := onDeleteFn(name); err != nil {
@@ -126,7 +125,7 @@ func listChildren(ctx context.Context, redisStore *UniversalRedis3Store, key str
}
}
store := newSkipListElementStore(key, client)
- nameList := skiplist.LoadNameList([]byte(data), store, maxNameBatchSizeLimit)
+ nameList := LoadItemList([]byte(data), key, client, store, maxNameBatchSizeLimit)
if err = nameList.ListNames(startFileName, func(name string) bool {
return eachFn(name)
diff --git a/weed/filer/redis3/kv_directory_children_test.go b/weed/filer/redis3/kv_directory_children_test.go
index 5c1cff2bb..77988e1a3 100644
--- a/weed/filer/redis3/kv_directory_children_test.go
+++ b/weed/filer/redis3/kv_directory_children_test.go
@@ -2,11 +2,12 @@ package redis3
import (
"context"
- "github.com/chrislusf/seaweedfs/weed/util/skiplist"
+ "fmt"
"github.com/go-redis/redis/v8"
"github.com/stvp/tempredis"
"strconv"
"testing"
+ "time"
)
var names = []string{
@@ -53,20 +54,21 @@ func TestNameList(t *testing.T) {
store := newSkipListElementStore("/yyy/bin", client)
var data []byte
for _, name := range names {
- nameList := skiplist.LoadNameList(data, store, maxNameBatchSizeLimit)
+ nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)
nameList.WriteName(name)
nameList.ListNames("", func(name string) bool {
+ // println(name)
return true
})
if nameList.HasChanges() {
data = nameList.ToBytes()
}
- println()
+ // println()
}
- nameList := skiplist.LoadNameList(data, store, maxNameBatchSizeLimit)
+ nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)
nameList.ListNames("", func(name string) bool {
println(name)
return true
@@ -74,7 +76,7 @@ func TestNameList(t *testing.T) {
}
-func BenchmarkNameList(b *testing.B) {
+func xBenchmarkNameList(b *testing.B) {
server, err := tempredis.Start(tempredis.Config{})
if err != nil {
@@ -90,9 +92,9 @@ func BenchmarkNameList(b *testing.B) {
store := newSkipListElementStore("/yyy/bin", client)
var data []byte
for i := 0; i < b.N; i++ {
- nameList := skiplist.LoadNameList(data, store, maxNameBatchSizeLimit)
+ nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)
- nameList.WriteName("name"+strconv.Itoa(i))
+ nameList.WriteName(strconv.Itoa(i)+"namexxxxxxxxxxxxxxxxxxx")
if nameList.HasChanges() {
data = nameList.ToBytes()
@@ -100,7 +102,7 @@ func BenchmarkNameList(b *testing.B) {
}
}
-func BenchmarkRedis(b *testing.B) {
+func xBenchmarkRedis(b *testing.B) {
server, err := tempredis.Start(tempredis.Config{})
if err != nil {
@@ -114,12 +116,60 @@ func BenchmarkRedis(b *testing.B) {
})
for i := 0; i < b.N; i++ {
- client.ZAddNX(context.Background(),"/yyy/bin", &redis.Z{Score: 0, Member: "name"+strconv.Itoa(i)})
+ client.ZAddNX(context.Background(),"/yyy/bin", &redis.Z{Score: 0, Member: strconv.Itoa(i)+"namexxxxxxxxxxxxxxxxxxx"})
}
}
+func TestNameListAdd(t *testing.T) {
-func xBenchmarkNameList(b *testing.B) {
+ server, err := tempredis.Start(tempredis.Config{})
+ if err != nil {
+ panic(err)
+ }
+ defer server.Term()
+
+ client := redis.NewClient(&redis.Options{
+ Addr: "localhost:6379",
+ Password: "",
+ DB: 0,
+ })
+
+ client.FlushAll(context.Background())
+
+ N := 364800
+
+ ts0 := time.Now()
+ store := newSkipListElementStore("/y", client)
+ var data []byte
+ nameList := LoadItemList(data, "/y", client, store, 100000)
+ for i := 0; i < N; i++ {
+ nameList.WriteName(fmt.Sprintf("%8d", i))
+ }
+
+ ts1 := time.Now()
+
+ for i := 0; i < N; i++ {
+ client.ZAddNX(context.Background(),"/x", &redis.Z{Score: 0, Member: fmt.Sprintf("name %8d", i)})
+ }
+ ts2 := time.Now()
+
+ fmt.Printf("%v %v", ts1.Sub(ts0), ts2.Sub(ts1))
+
+ /*
+ keys := client.Keys(context.Background(), "/*m").Val()
+ for _, k := range keys {
+ println("key", k)
+ for i, v := range client.ZRangeByLex(context.Background(), k, &redis.ZRangeBy{
+ Min: "-",
+ Max: "+",
+ }).Val() {
+ println(" ", i, v)
+ }
+ }
+ */
+}
+
+func BenchmarkNameList(b *testing.B) {
server, err := tempredis.Start(tempredis.Config{})
if err != nil {
@@ -136,9 +186,9 @@ func xBenchmarkNameList(b *testing.B) {
store := newSkipListElementStore("/yyy/bin", client)
var data []byte
for i := 0; i < b.N; i++ {
- nameList := skiplist.LoadNameList(data, store, maxNameBatchSizeLimit)
+ nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)
- nameList.WriteName("name"+strconv.Itoa(i))
+ nameList.WriteName(fmt.Sprintf("name %8d", i))
if nameList.HasChanges() {
data = nameList.ToBytes()
@@ -146,7 +196,7 @@ func xBenchmarkNameList(b *testing.B) {
}
}
-func xBenchmarkRedis(b *testing.B) {
+func BenchmarkRedis(b *testing.B) {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
@@ -155,6 +205,6 @@ func xBenchmarkRedis(b *testing.B) {
})
for i := 0; i < b.N; i++ {
- client.ZAddNX(context.Background(),"/xxx/bin", &redis.Z{Score: 0, Member: "name"+strconv.Itoa(i)})
+ client.ZAddNX(context.Background(),"/xxx/bin", &redis.Z{Score: 0, Member: fmt.Sprintf("name %8d", i)})
}
}
diff --git a/weed/filer/redis3/skiplist_element_store.go b/weed/filer/redis3/skiplist_element_store.go
index 66a5408d6..bcad356dd 100644
--- a/weed/filer/redis3/skiplist_element_store.go
+++ b/weed/filer/redis3/skiplist_element_store.go
@@ -10,7 +10,7 @@ import (
)
type SkipListElementStore struct {
- prefix string
+ Prefix string
client redis.UniversalClient
}
@@ -18,13 +18,13 @@ var _ = skiplist.ListStore(&SkipListElementStore{})
func newSkipListElementStore(prefix string, client redis.UniversalClient) *SkipListElementStore {
return &SkipListElementStore{
- prefix: prefix,
+ Prefix: prefix,
client: client,
}
}
func (m *SkipListElementStore) SaveElement(id int64, element *skiplist.SkipListElement) error {
- key := fmt.Sprintf("%s%d", m.prefix, id)
+ key := fmt.Sprintf("%s%d", m.Prefix, id)
data, err := proto.Marshal(element)
if err != nil {
glog.Errorf("marshal %s: %v", key, err)
@@ -33,12 +33,12 @@ func (m *SkipListElementStore) SaveElement(id int64, element *skiplist.SkipListE
}
func (m *SkipListElementStore) DeleteElement(id int64) error {
- key := fmt.Sprintf("%s%d", m.prefix, id)
+ key := fmt.Sprintf("%s%d", m.Prefix, id)
return m.client.Del(context.Background(), key).Err()
}
func (m *SkipListElementStore) LoadElement(id int64) (*skiplist.SkipListElement, error) {
- key := fmt.Sprintf("%s%d", m.prefix, id)
+ key := fmt.Sprintf("%s%d", m.Prefix, id)
data, err := m.client.Get(context.Background(), key).Result()
if err != nil {
if err == redis.Nil {