aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/redis3/ItemList.go27
-rw-r--r--weed/filer/redis3/kv_directory_children_test.go14
2 files changed, 31 insertions, 10 deletions
diff --git a/weed/filer/redis3/ItemList.go b/weed/filer/redis3/ItemList.go
index ae4e61cfb..bad3d2efb 100644
--- a/weed/filer/redis3/ItemList.go
+++ b/weed/filer/redis3/ItemList.go
@@ -66,6 +66,24 @@ There are multiple cases after finding the name for greater or equal node
return
*/
+
+func (nl *ItemList) canAddMember(node *skiplist.SkipListElementReference, name string) (alreadyContains bool, nodeSize int, err error) {
+ ctx := context.Background()
+ pipe := nl.client.TxPipeline()
+ key := fmt.Sprintf("%s%dm", nl.prefix, node.ElementPointer)
+ countOperation := pipe.ZLexCount(ctx, key, "-", "+")
+ scoreOperationt := pipe.ZScore(ctx, key, name)
+ if _, err = pipe.Exec(ctx); err != nil && err != redis.Nil{
+ return false, 0, err
+ }
+ if err == redis.Nil {
+ err = nil
+ }
+ alreadyContains = scoreOperationt.Err() == nil
+ nodeSize = int(countOperation.Val())
+ return
+}
+
func (nl *ItemList) WriteName(name string) error {
lookupKey := []byte(name)
@@ -93,13 +111,16 @@ func (nl *ItemList) WriteName(name string) error {
}
if prevNode != nil {
- // case 2.1
- if nl.NodeContainsItem(prevNode.Reference(), name) {
+ alreadyContains, nodeSize, err := nl.canAddMember(prevNode.Reference(), name)
+ if err != nil {
+ return err
+ }
+ if alreadyContains {
+ // case 2.1
return nil
}
// case 2.2
- nodeSize := nl.NodeSize(prevNode.Reference())
if nodeSize < nl.batchSize {
return nl.NodeAddMember(prevNode.Reference(), name)
}
diff --git a/weed/filer/redis3/kv_directory_children_test.go b/weed/filer/redis3/kv_directory_children_test.go
index 77988e1a3..03c15ec35 100644
--- a/weed/filer/redis3/kv_directory_children_test.go
+++ b/weed/filer/redis3/kv_directory_children_test.go
@@ -58,14 +58,14 @@ func TestNameList(t *testing.T) {
nameList.WriteName(name)
nameList.ListNames("", func(name string) bool {
- // println(name)
+ println(name)
return true
})
if nameList.HasChanges() {
data = nameList.ToBytes()
}
- // println()
+ println()
}
nameList := LoadItemList(data, "/yyy/bin", client, store, maxNameBatchSizeLimit)
@@ -76,7 +76,7 @@ func TestNameList(t *testing.T) {
}
-func xBenchmarkNameList(b *testing.B) {
+func BenchmarkNameList(b *testing.B) {
server, err := tempredis.Start(tempredis.Config{})
if err != nil {
@@ -102,7 +102,7 @@ func xBenchmarkNameList(b *testing.B) {
}
}
-func xBenchmarkRedis(b *testing.B) {
+func BenchmarkRedis(b *testing.B) {
server, err := tempredis.Start(tempredis.Config{})
if err != nil {
@@ -120,7 +120,7 @@ func xBenchmarkRedis(b *testing.B) {
}
}
-func TestNameListAdd(t *testing.T) {
+func xTestNameListAdd(t *testing.T) {
server, err := tempredis.Start(tempredis.Config{})
if err != nil {
@@ -169,7 +169,7 @@ func TestNameListAdd(t *testing.T) {
*/
}
-func BenchmarkNameList(b *testing.B) {
+func xBenchmarkNameList(b *testing.B) {
server, err := tempredis.Start(tempredis.Config{})
if err != nil {
@@ -196,7 +196,7 @@ func BenchmarkNameList(b *testing.B) {
}
}
-func BenchmarkRedis(b *testing.B) {
+func xBenchmarkRedis(b *testing.B) {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",