aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-10-03 17:54:25 -0700
committerChris Lu <chris.lu@gmail.com>2021-10-03 17:54:25 -0700
commite6196cdc503dbe135c3ac22f4e13f62968d30036 (patch)
tree9c113396d1a2339dee9f2af35446934920b00a2f
parenta481c4a45ef60de22d6dacf83010542ce8c6e1bb (diff)
downloadseaweedfs-e6196cdc503dbe135c3ac22f4e13f62968d30036.tar.xz
seaweedfs-e6196cdc503dbe135c3ac22f4e13f62968d30036.zip
add name list
-rw-r--r--weed/util/skiplist/name_batch.go102
-rw-r--r--weed/util/skiplist/name_list.go303
-rw-r--r--weed/util/skiplist/name_list_test.go73
-rw-r--r--weed/util/skiplist/skiplist.pb.go75
-rw-r--r--weed/util/skiplist/skiplist.proto4
5 files changed, 551 insertions, 6 deletions
diff --git a/weed/util/skiplist/name_batch.go b/weed/util/skiplist/name_batch.go
new file mode 100644
index 000000000..18427d341
--- /dev/null
+++ b/weed/util/skiplist/name_batch.go
@@ -0,0 +1,102 @@
+package skiplist
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/golang/protobuf/proto"
+ "sort"
+ "strings"
+)
+
+// NameBatch is an in-memory set of names together with the smallest name it
+// currently holds.
+type NameBatch struct {
+ // key tracks the lexicographically smallest name in names; it is "" when no key has been recorded.
+ key string
+ // names is the set of names belonging to this batch.
+ names map[string]struct{}
+}
+
+// ContainsName reports whether name is a member of this batch.
+func (nb *NameBatch) ContainsName(name string) (found bool) {
+	_, present := nb.names[name]
+	return present
+}
+// WriteName adds name to the batch. The batch key is replaced by name when no
+// key has been recorded yet or when name sorts before the current key.
+func (nb *NameBatch) WriteName(name string) {
+	keyUnset := nb.key == ""
+	if keyUnset || strings.Compare(name, nb.key) < 0 {
+		nb.key = name
+	}
+	nb.names[name] = struct{}{}
+}
+// DeleteName removes name from the batch. When name was the batch key, the key
+// is recomputed as the smallest remaining name, or "" when nothing remains.
+func (nb *NameBatch) DeleteName(name string) {
+	delete(nb.names, name)
+	if nb.key != name {
+		return
+	}
+	// The key itself was removed: rescan the set for the new smallest name.
+	smallest := ""
+	for candidate := range nb.names {
+		if smallest == "" || strings.Compare(candidate, smallest) < 0 {
+			smallest = candidate
+		}
+	}
+	nb.key = smallest
+}
+func (nb *NameBatch) ListNames(startFrom string, visitNamesFn func(name string) bool) bool {
+ var names []string
+ needFilter := startFrom == ""
+ for n := range nb.names {
+ if !needFilter || strings.Compare(n, startFrom) >= 0 {
+ names = append(names, n)
+ }
+ }
+ sort.Slice(names, func(i, j int) bool {
+ return strings.Compare(names[i], names[j]) < 0
+ })
+ for _, n := range names {
+ if !visitNamesFn(n) {
+ return false
+ }
+ }
+ return true
+}
+
+// NewNameBatch returns an empty batch: the name set is allocated and the key
+// is left as "".
+func NewNameBatch() *NameBatch {
+	batch := new(NameBatch)
+	batch.names = map[string]struct{}{}
+	return batch
+}
+
+// LoadNameBatch rebuilds a NameBatch from a serialized NameBatchData message.
+// Empty data yields an empty batch. When data cannot be unmarshalled the error
+// is logged and nil is returned, so callers must be prepared for a nil batch.
+func LoadNameBatch(data []byte) *NameBatch {
+	decoded := &NameBatchData{}
+	if len(data) > 0 {
+		if err := proto.Unmarshal(data, decoded); err != nil {
+			glog.Errorf("unmarshal into NameBatchData{} : %v", err)
+			return nil
+		}
+	}
+
+	batch := NewNameBatch()
+	for _, raw := range decoded.Names {
+		// WriteName stores the name and keeps batch.key at the smallest one.
+		batch.WriteName(string(raw))
+	}
+	return batch
+}
+
+// ToBytes serializes the batch as a NameBatchData protobuf message containing
+// every name in ascending order. Only the names are stored; the key is not
+// part of the message.
+//
+// The signature has no error result, so a marshalling failure is logged and
+// whatever proto.Marshal returned is passed back to the caller.
+func (nb *NameBatch) ToBytes() []byte {
+	names := make([]string, 0, len(nb.names))
+	for n := range nb.names {
+		names = append(names, n)
+	}
+	// Map iteration order is random; sorting makes equal batches encode to
+	// identical bytes.
+	sort.Strings(names)
+
+	t := &NameBatchData{Names: make([][]byte, 0, len(names))}
+	for _, n := range names {
+		t.Names = append(t.Names, []byte(n))
+	}
+	data, err := proto.Marshal(t)
+	if err != nil {
+		glog.Errorf("marshal NameBatchData{} : %v", err)
+	}
+	return data
+}
+
+// SplitBy partitions the batch around name into two new batches: x receives
+// every entry that sorts at or before name, y receives every entry that sorts
+// after it. The receiver is left unchanged and name itself is not added.
+func (nb *NameBatch) SplitBy(name string) (x, y *NameBatch) {
+	x = NewNameBatch()
+	y = NewNameBatch()
+	for entry := range nb.names {
+		target := y
+		// An entry equal to name is not expected, but it would land in x.
+		if strings.Compare(entry, name) <= 0 {
+			target = x
+		}
+		target.WriteName(entry)
+	}
+	return x, y
+}
diff --git a/weed/util/skiplist/name_list.go b/weed/util/skiplist/name_list.go
new file mode 100644
index 000000000..db328afba
--- /dev/null
+++ b/weed/util/skiplist/name_list.go
@@ -0,0 +1,303 @@
+package skiplist
+
+import (
+ "bytes"
+)
+
+// NameList stores names in a SkipList whose node values are serialized
+// NameBatch values.
+type NameList struct {
+ // skipList holds the nodes; WriteName and DeleteName save batches into it.
+ skipList *SkipList
+ // batchSize is the limit WriteName and DeleteName compare batch lengths against.
+ batchSize int
+}
+
+// NewNameList creates a NameList whose skip list is built on store and whose
+// batches are limited by batchSize.
+func NewNameList(store ListStore, batchSize int) *NameList {
+	list := &NameList{batchSize: batchSize}
+	list.skipList = New(store)
+	return list
+}
+
+/*
+Be reluctant to create new nodes. Try to fit into either previous node or next node.
+Prefer to add to previous node.
+
+There are multiple cases after finding the name for greater or equal node
+ 1. found and node.Key == name
+ The node contains a batch with leading key the same as the name
+ nothing to do
+ 2. no such node found or node.Key > name
+
+ if no such node found
+ prevNode = list.LargestNode
+
+ // case 2.1
+ if previousNode contains name
+ nothing to do
+
+ // prefer to add to previous node
+ if prevNode != nil {
+ // case 2.2
+ if prevNode has capacity
+ prevNode.add name, and save
+ return
+ // case 2.3
+ split prevNode by name
+ }
+
+ // case 2.4
+ // merge into next node. Avoid too many nodes if adding data in reverse order.
+ if nextNode is not nil and nextNode has capacity
+ delete nextNode.Key
+ nextNode.Key = name
+ nextNode.batch.add name
+ insert nextNode.Key
+ return
+
+ // case 2.5
+ if prevNode is nil
+ insert new node with key = name, value = batch{name}
+ return
+
+*/
+// WriteName adds name to the list, preferring to place it in an existing batch
+// over creating a new node. It returns any error reported by the skip list.
+//
+// NOTE(review): LoadNameBatch returns nil when a node value cannot be decoded;
+// this function dereferences the result without a check, so a corrupt node
+// value would panic here.
+func (nl *NameList) WriteName(name string) error {
+	target := []byte(name)
+	prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(target)
+	if err != nil {
+		return err
+	}
+
+	// case 1: the name is already the leading key of a batch
+	if found && bytes.Equal(nextNode.Key, target) {
+		return nil
+	}
+
+	// Locate the node just before the insertion point.
+	if !found {
+		if prevNode, err = nl.skipList.GetLargestNode(); err != nil {
+			return err
+		}
+	}
+	if prevNode == nil && nextNode != nil {
+		if prevNode, err = nl.skipList.loadElement(nextNode.Prev); err != nil {
+			return err
+		}
+	}
+
+	if prevNode == nil {
+		// case 2.4: no previous node; fold the name into the next node when it
+		// has room. Its key changes, so the node is deleted and re-inserted.
+		if nextNode != nil {
+			nextBatch := LoadNameBatch(nextNode.Value)
+			if len(nextBatch.names) < nl.batchSize {
+				if delErr := nl.skipList.Delete(nextNode.Key); delErr != nil {
+					return delErr
+				}
+				nextBatch.WriteName(name)
+				if insErr := nl.skipList.Insert([]byte(nextBatch.key), nextBatch.ToBytes()); insErr != nil {
+					return insErr
+				}
+				return nil
+			}
+		}
+
+		// case 2.5: start a new node holding only this name
+		fresh := NewNameBatch()
+		fresh.WriteName(name)
+		if insErr := nl.skipList.Insert([]byte(fresh.key), fresh.ToBytes()); insErr != nil {
+			return insErr
+		}
+		return nil
+	}
+
+	prevBatch := LoadNameBatch(prevNode.Value)
+
+	// case 2.1: the previous batch already has the name
+	if prevBatch.ContainsName(name) {
+		return nil
+	}
+
+	// case 2.2: the previous batch has room
+	if len(prevBatch.names) < nl.batchSize {
+		prevBatch.WriteName(name)
+		return nl.skipList.ChangeValue(prevNode, prevBatch.ToBytes())
+	}
+
+	// case 2.3: the previous batch is full; split it around name and give the
+	// name to the smaller half (the lower half on a tie).
+	lower, upper := prevBatch.SplitBy(name)
+	fullSize := len(prevBatch.names)
+	lowerTakesName := len(lower.names) <= len(upper.names)
+	halves := [2]struct {
+		batch     *NameBatch
+		takesName bool
+	}{
+		{lower, lowerTakesName},
+		{upper, !lowerTakesName},
+	}
+	for _, half := range halves {
+		// A half that received every name of the previous batch is not rewritten.
+		if len(half.batch.names) == fullSize {
+			continue
+		}
+		if half.takesName {
+			half.batch.WriteName(name)
+		}
+		if half.batch.key == prevBatch.key {
+			// Same leading key as the previous node: update that node in place.
+			if saveErr := nl.skipList.ChangeValue(prevNode, half.batch.ToBytes()); saveErr != nil {
+				return saveErr
+			}
+			continue
+		}
+		if saveErr := nl.skipList.Insert([]byte(half.batch.key), half.batch.ToBytes()); saveErr != nil {
+			return saveErr
+		}
+	}
+	return nil
+}
+
+/*
+// case 1: exists in nextNode
+if nextNode != nil && nextNode.Key == name {
+ remove from nextNode, update nextNode
+ // TODO: merge with prevNode if possible?
+ return
+}
+if nextNode is nil
+ prevNode = list.LargestNode
+if prevNode == nil and nextNode.Prev != nil
+ prevNode = load(nextNode.Prev)
+
+// case 2: does not exist
+// case 2.1
+if prevNode == nil {
+ return
+}
+// case 2.2
+if prevNameBatch does not contain name {
+ return
+}
+
+// case 3
+delete from prevNameBatch
+if prevNameBatch + nextNode < capacityList
+ // case 3.1
+ merge
+else
+ // case 3.2
+ update prevNode
+
+
+*/
+// DeleteName removes name from the list and returns any error reported by the
+// skip list. Deleting a name that is not present is not an error.
+//
+// NOTE(review): LoadNameBatch returns nil when a node value cannot be decoded;
+// the case 1 path and the previous-batch lookup dereference that result
+// without a check, so a corrupt node value would panic here.
+func (nl *NameList) DeleteName(name string) error {
+	lookupKey := []byte(name)
+	prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+	if err != nil {
+		return err
+	}
+
+	var nextNameBatch *NameBatch
+	if nextNode != nil {
+		nextNameBatch = LoadNameBatch(nextNode.Value)
+	}
+
+	// case 1: name is the leading key of nextNode. The node key changes, so
+	// the node is removed and re-inserted under the batch's new key, unless
+	// the batch became empty.
+	if found && bytes.Equal(nextNode.Key, lookupKey) {
+		if err := nl.skipList.Delete(nextNode.Key); err != nil {
+			return err
+		}
+		nextNameBatch.DeleteName(name)
+		if len(nextNameBatch.names) > 0 {
+			if err := nl.skipList.Insert([]byte(nextNameBatch.key), nextNameBatch.ToBytes()); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+
+	// Locate the node just before the lookup position.
+	if !found {
+		prevNode, err = nl.skipList.GetLargestNode()
+		if err != nil {
+			return err
+		}
+	}
+	if nextNode != nil && prevNode == nil {
+		prevNode, err = nl.skipList.loadElement(nextNode.Prev)
+		if err != nil {
+			return err
+		}
+	}
+
+	// case 2.1: there is no previous node, so the name cannot exist
+	if prevNode == nil {
+		return nil
+	}
+	// case 2.2: the previous batch does not hold the name
+	prevNameBatch := LoadNameBatch(prevNode.Value)
+	if !prevNameBatch.ContainsName(name) {
+		return nil
+	}
+
+	// case 3: remove the name from the previous batch
+	prevNameBatch.DeleteName(name)
+	if len(prevNameBatch.names) == 0 {
+		if err := nl.skipList.Delete(prevNode.Key); err != nil {
+			return err
+		}
+		return nil
+	}
+
+	// case 3.1: both batches together stay below batchSize, so fold nextNode
+	// into prevNode and drop nextNode.
+	if nextNameBatch != nil && len(nextNameBatch.names)+len(prevNameBatch.names) < nl.batchSize {
+		if err := nl.skipList.Delete(nextNode.Key); err != nil {
+			return err
+		}
+		for nextName := range nextNameBatch.names {
+			prevNameBatch.WriteName(nextName)
+		}
+	}
+
+	// cases 3.1 and 3.2 both finish by saving the updated previous batch
+	return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes())
+}
+
+// ListNames walks the list starting at the node found for startFrom and hands
+// each batch's names to visitNamesFn via NameBatch.ListNames. Iteration ends
+// when the visitor returns false or the last node has been visited. Only skip
+// list errors are returned.
+//
+// NOTE(review): when a later node is found but no previous node was returned
+// by the lookup, the previous node is not loaded here (WriteName and
+// DeleteName do load it) — confirm names >= startFrom cannot live there.
+func (nl *NameList) ListNames(startFrom string, visitNamesFn func(name string) bool) error {
+	startKey := []byte(startFrom)
+	prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(startKey)
+	if err != nil {
+		return err
+	}
+
+	switch {
+	case !found:
+		// Nothing at or after startFrom: only the largest node can hold matches.
+		if prevNode, err = nl.skipList.GetLargestNode(); err != nil {
+			return err
+		}
+	case bytes.Equal(nextNode.Key, startKey):
+		// Exact key match: the previous node is not consulted.
+		prevNode = nil
+	}
+
+	if prevNode != nil {
+		if !LoadNameBatch(prevNode.Value).ListNames(startFrom, visitNamesFn) {
+			return nil
+		}
+	}
+
+	for node := nextNode; node != nil; {
+		if !LoadNameBatch(node.Value).ListNames(startFrom, visitNamesFn) {
+			return nil
+		}
+		// Follow the level-0 forward link to the next node.
+		if node, err = nl.skipList.loadElement(node.Next[0]); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/weed/util/skiplist/name_list_test.go b/weed/util/skiplist/name_list_test.go
new file mode 100644
index 000000000..811a101f2
--- /dev/null
+++ b/weed/util/skiplist/name_list_test.go
@@ -0,0 +1,73 @@
+package skiplist
+
+import (
+ "math/rand"
+ "strconv"
+ "testing"
+)
+
+const (
+ maxNameCount = 100
+)
+
+// String formats x as a base-10 string.
+func String(x int) string {
+	return strconv.FormatInt(int64(x), 10)
+}
+
+// TestNameList writes, deletes and lists names through a NameList with a batch
+// size of 7, first with sequential names and then with a random permutation.
+// Every error returned by the list is checked, and each count mismatch is
+// reported with the observed and expected values.
+func TestNameList(t *testing.T) {
+	// countNames lists every name in l and returns how many were visited.
+	countNames := func(l *NameList) int {
+		t.Helper()
+		n := 0
+		err := l.ListNames("", func(name string) bool {
+			n++
+			return true
+		})
+		if err != nil {
+			t.Fatalf("list names: %v", err)
+		}
+		return n
+	}
+
+	list := NewNameList(memStore, 7)
+	for i := 0; i < maxNameCount; i++ {
+		if err := list.WriteName(String(i)); err != nil {
+			t.Fatalf("write name %d: %v", i, err)
+		}
+	}
+	if got := countNames(list); got != maxNameCount {
+		t.Errorf("after sequential writes: listed %d names, want %d", got, maxNameCount)
+	}
+
+	// Delete a contiguous range, leaving names at both ends.
+	deleteBase := 5
+	deleteCount := maxNameCount - 3*deleteBase
+	for i := deleteBase; i < deleteBase+deleteCount; i++ {
+		if err := list.DeleteName(String(i)); err != nil {
+			t.Fatalf("delete name %d: %v", i, err)
+		}
+	}
+	if got, want := countNames(list), maxNameCount-deleteCount; got != want {
+		t.Errorf("after range delete: listed %d names, want %d", got, want)
+	}
+
+	// Randomized insertion and deletion: write a random permutation, delete
+	// every name again, and expect an empty list.
+	// maxN is not declared in this file; it is expected to come from another
+	// test file of the package.
+	list = NewNameList(memStore, 7)
+	rList := rand.Perm(maxN)
+	for _, i := range rList {
+		if err := list.WriteName(String(i)); err != nil {
+			t.Fatalf("write name %d: %v", i, err)
+		}
+	}
+	for _, i := range rList {
+		if err := list.DeleteName(String(i)); err != nil {
+			t.Fatalf("delete name %d: %v", i, err)
+		}
+	}
+	if got := countNames(list); got != 0 {
+		t.Errorf("after deleting every name: listed %d names, want 0", got)
+	}
+}
diff --git a/weed/util/skiplist/skiplist.pb.go b/weed/util/skiplist/skiplist.pb.go
index 82afec453..adb121bfc 100644
--- a/weed/util/skiplist/skiplist.pb.go
+++ b/weed/util/skiplist/skiplist.pb.go
@@ -238,6 +238,53 @@ func (x *SkipListElement) GetPrev() *SkipListElementReference {
return nil
}
+// NameBatchData is the generated Go type for the skiplist.NameBatchData
+// protobuf message.
+type NameBatchData struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // Names is the repeated bytes field number 1 ("names").
+ Names [][]byte `protobuf:"bytes,1,rep,name=names,proto3" json:"names,omitempty"`
+}
+
+// Reset overwrites *x with a zero NameBatchData. When protoimpl.UnsafeEnabled
+// is set it also stores the message info of message type index 3 on x.
+func (x *NameBatchData) Reset() {
+ *x = NameBatchData{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_skiplist_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+// String returns the result of protoimpl.X.MessageStringOf for x.
+func (x *NameBatchData) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+// ProtoMessage is an empty method with no behavior of its own.
+func (*NameBatchData) ProtoMessage() {}
+
+// ProtoReflect returns a protoreflect.Message for x. When
+// protoimpl.UnsafeEnabled is set and x is non-nil, it returns x's message
+// state, storing the message info of type index 3 first if none is loaded;
+// otherwise it returns mi.MessageOf(x).
+func (x *NameBatchData) ProtoReflect() protoreflect.Message {
+ mi := &file_skiplist_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Descriptor returns the gzipped raw descriptor of skiplist.proto together
+// with the index path {3} identifying this message.
+//
+// Deprecated: Use NameBatchData.ProtoReflect.Descriptor instead.
+func (*NameBatchData) Descriptor() ([]byte, []int) {
+ return file_skiplist_proto_rawDescGZIP(), []int{3}
+}
+
+// GetNames returns x.Names, or nil when x is nil.
+func (x *NameBatchData) GetNames() [][]byte {
+ if x != nil {
+ return x.Names
+ }
+ return nil
+}
+
var File_skiplist_proto protoreflect.FileDescriptor
var file_skiplist_proto_rawDesc = []byte{
@@ -275,10 +322,13 @@ var file_skiplist_proto_rawDesc = []byte{
0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73,
0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x04, 0x70, 0x72, 0x65, 0x76,
- 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63,
- 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64,
- 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x75, 0x74, 0x69, 0x6c, 0x2f, 0x73, 0x6b, 0x69,
- 0x70, 0x6c, 0x69, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x22, 0x25, 0x0a, 0x0d, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x61, 0x74, 0x63, 0x68, 0x44, 0x61, 0x74,
+ 0x61, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c,
+ 0x52, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75,
+ 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f,
+ 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x75,
+ 0x74, 0x69, 0x6c, 0x2f, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -293,11 +343,12 @@ func file_skiplist_proto_rawDescGZIP() []byte {
return file_skiplist_proto_rawDescData
}
-var file_skiplist_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
+var file_skiplist_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_skiplist_proto_goTypes = []interface{}{
(*SkipListProto)(nil), // 0: skiplist.SkipListProto
(*SkipListElementReference)(nil), // 1: skiplist.SkipListElementReference
(*SkipListElement)(nil), // 2: skiplist.SkipListElement
+ (*NameBatchData)(nil), // 3: skiplist.NameBatchData
}
var file_skiplist_proto_depIdxs = []int32{
1, // 0: skiplist.SkipListProto.start_levels:type_name -> skiplist.SkipListElementReference
@@ -353,6 +404,18 @@ func file_skiplist_proto_init() {
return nil
}
}
+ file_skiplist_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*NameBatchData); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -360,7 +423,7 @@ func file_skiplist_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_skiplist_proto_rawDesc,
NumEnums: 0,
- NumMessages: 3,
+ NumMessages: 4,
NumExtensions: 0,
NumServices: 0,
},
diff --git a/weed/util/skiplist/skiplist.proto b/weed/util/skiplist/skiplist.proto
index bfb190b33..2991ad830 100644
--- a/weed/util/skiplist/skiplist.proto
+++ b/weed/util/skiplist/skiplist.proto
@@ -24,3 +24,7 @@ message SkipListElement {
bytes value = 5;
SkipListElementReference prev = 6;
}
+
+// NameBatchData carries the names of one batch as a repeated bytes field.
+message NameBatchData {
+ repeated bytes names = 1;
+} \ No newline at end of file