| author | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
|---|---|---|
| committer | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| commit | 46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch) | |
| tree | 734125b48b6d96f8796a2b89b924312cd169ef0e /weed/util | |
| parent | a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff) | |
| parent | dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff) | |
| download | seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip | |
Update TiKV client version and add 1PC (one-phase commit) support
Diffstat (limited to 'weed/util')
39 files changed, 2442 insertions, 479 deletions
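The largest change under weed/util reworks the chunk cache read path: the old GetChunk/GetChunkSlice methods are replaced by a single ReadChunkAt call that fills a caller-supplied buffer, and callers are expected to obtain those buffers from the new weed/util/mem slot pool. The following is an illustrative sketch only, not part of the diff, showing how that reworked interface is used; it mirrors the updated on-disk cache test below. The constructor arguments, ReadChunkAt signature, and mem.Allocate/mem.Free helpers are taken from the diff, while the file id and payload are made up for the example.

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
	"github.com/chrislusf/seaweedfs/weed/util/mem"
)

func main() {
	// 2 in-memory entries, disk cache under /tmp/c, 32 KB total disk size, 1 KB unit
	// (same shape of arguments as the updated chunk_cache_on_disk_test.go).
	cache := chunk_cache.NewTieredChunkCache(2, "/tmp/c", 32, 1024)
	defer cache.Shutdown()

	original := []byte("hello chunk cache")
	fileId := "1,1aabbccdd" // hypothetical file id in the same form the test uses

	cache.SetChunk(fileId, original)

	// ReadChunkAt fills a caller-provided buffer instead of returning a new slice,
	// so the buffer can be recycled through the mem slot pool.
	buf := mem.Allocate(len(original))
	defer mem.Free(buf)

	if n, err := cache.ReadChunkAt(buf, fileId, 0); err == nil && bytes.Equal(buf[:n], original) {
		fmt.Printf("read back %d bytes from cache\n", n)
	}
}
```

Passing the destination buffer in, rather than allocating and returning a fresh slice on every read, is what allows callers to reuse pooled buffers and cut per-read allocations.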
diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go deleted file mode 100644 index 58911df75..000000000 --- a/weed/util/bounded_tree/bounded_tree.go +++ /dev/null @@ -1,179 +0,0 @@ -package bounded_tree - -import ( - "sync" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type Node struct { - Parent *Node - Name string - Children map[string]*Node -} - -type BoundedTree struct { - root *Node - sync.RWMutex - baseDir util.FullPath -} - -func NewBoundedTree(baseDir util.FullPath) *BoundedTree { - return &BoundedTree{ - root: &Node{ - Name: "/", - }, - baseDir: baseDir, - } -} - -type VisitNodeFunc func(path util.FullPath) (childDirectories []string, err error) - -// If the path is not visited, call the visitFn for each level of directory -// No action if the directory has been visited before or does not exist. -// A leaf node, which has no children, represents a directory not visited. -// A non-leaf node or a non-existing node represents a directory already visited, or does not need to visit. -func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) (visitErr error) { - t.Lock() - defer t.Unlock() - - if t.root == nil { - return - } - components := p.Split() - // fmt.Printf("components %v %d\n", components, len(components)) - canDelete, err := t.ensureVisited(t.root, util.FullPath("/"), components, 0, visitFn) - if err != nil { - return err - } - if canDelete { - t.root = nil - } - return nil -} - -func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool, visitErr error) { - - // println("ensureVisited", currentPath, i) - - if n == nil { - // fmt.Printf("%s null\n", currentPath) - return - } - - if n.isVisited() { - // fmt.Printf("%s visited %v\n", currentPath, n.Name) - } else { - // fmt.Printf("ensure %v\n", currentPath) - - children, err := visitFn(currentPath) - if err != nil { - glog.V(0).Infof("failed to visit %s: %v", currentPath, err) - return false, err - } - - if len(children) == 0 { - // fmt.Printf(" canDelete %v without children\n", currentPath) - return true, nil - } - - n.Children = make(map[string]*Node) - for _, child := range children { - // fmt.Printf(" add child %v %v\n", currentPath, child) - n.Children[child] = &Node{ - Name: child, - } - } - } - - if i >= len(components) { - return - } - - // fmt.Printf(" check child %v %v\n", currentPath, components[i]) - - toVisitNode, found := n.Children[components[i]] - if !found { - // fmt.Printf(" did not find child %v %v\n", currentPath, components[i]) - return - } - - // fmt.Printf(" ensureVisited %v %v\n", currentPath, toVisitNode.Name) - canDelete, childVisitErr := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn) - if childVisitErr != nil { - return false, childVisitErr - } - if canDelete { - - // fmt.Printf(" delete %v %v\n", currentPath, components[i]) - delete(n.Children, components[i]) - - if len(n.Children) == 0 { - // fmt.Printf(" canDelete %v\n", currentPath) - return true, nil - } - } - - return false, nil - -} - -func (n *Node) isVisited() bool { - if n == nil { - return true - } - if len(n.Children) > 0 { - return true - } - return false -} - -func (n *Node) getChild(childName string) *Node { - if n == nil { - return nil - } - if len(n.Children) > 0 { - return n.Children[childName] - } - return nil -} - -func (t *BoundedTree) HasVisited(p util.FullPath) bool { - - t.RLock() - defer 
t.RUnlock() - - if t.root == nil { - return true - } - - components := p.Split() - // fmt.Printf("components %v %d\n", components, len(components)) - return t.hasVisited(t.root, util.FullPath("/"), components, 0) -} - -func (t *BoundedTree) hasVisited(n *Node, currentPath util.FullPath, components []string, i int) bool { - - if n == nil { - return true - } - - if !n.isVisited() { - return false - } - - // fmt.Printf(" hasVisited child %v %+v %d\n", currentPath, components, i) - - if i >= len(components) { - return true - } - - toVisitNode, found := n.Children[components[i]] - if !found { - return true - } - - return t.hasVisited(toVisitNode, currentPath.Child(components[i]), components, i+1) - -} diff --git a/weed/util/bounded_tree/bounded_tree_test.go b/weed/util/bounded_tree/bounded_tree_test.go deleted file mode 100644 index 465f1cc9c..000000000 --- a/weed/util/bounded_tree/bounded_tree_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package bounded_tree - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/chrislusf/seaweedfs/weed/util" -) - -var ( - visitFn = func(path util.FullPath) (childDirectories []string, err error) { - fmt.Printf(" visit %v ...\n", path) - switch path { - case "/": - return []string{"a", "g", "h"}, nil - case "/a": - return []string{"b", "f"}, nil - case "/a/b": - return []string{"c", "e"}, nil - case "/a/b/c": - return []string{"d"}, nil - case "/a/b/c/d": - return []string{"i", "j"}, nil - case "/a/b/c/d/i": - return []string{}, nil - case "/a/b/c/d/j": - return []string{}, nil - case "/a/b/e": - return []string{}, nil - case "/a/f": - return []string{}, nil - } - return nil, nil - } - - printMap = func(m map[string]*Node) { - for k := range m { - println(" >", k) - } - } -) - -func TestBoundedTree(t *testing.T) { - - // a/b/c/d/i - // a/b/c/d/j - // a/b/c/d - // a/b/e - // a/f - // g - // h - - tree := NewBoundedTree(util.FullPath("/")) - - tree.EnsureVisited(util.FullPath("/a/b/c"), visitFn) - - assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b"))) - assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b/c"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/c/d"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/e"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/f"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/g"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/h"))) - assert.Equal(t, true, tree.HasVisited(util.FullPath("/"))) - assert.Equal(t, true, tree.HasVisited(util.FullPath("/x"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/e/x"))) - - printMap(tree.root.Children) - - a := tree.root.getChild("a") - - b := a.getChild("b") - if !b.isVisited() { - t.Errorf("expect visited /a/b") - } - c := b.getChild("c") - if !c.isVisited() { - t.Errorf("expect visited /a/b/c") - } - - d := c.getChild("d") - if d.isVisited() { - t.Errorf("expect unvisited /a/b/c/d") - } - - tree.EnsureVisited(util.FullPath("/a/b/c/d"), visitFn) - tree.EnsureVisited(util.FullPath("/a/b/c/d/i"), visitFn) - tree.EnsureVisited(util.FullPath("/a/b/c/d/j"), visitFn) - tree.EnsureVisited(util.FullPath("/a/b/e"), visitFn) - tree.EnsureVisited(util.FullPath("/a/f"), visitFn) - - printMap(tree.root.Children) - -} - -func TestEmptyBoundedTree(t *testing.T) { - - // g - // h - - tree := NewBoundedTree(util.FullPath("/")) - - visitFn := func(path util.FullPath) (childDirectories []string, err error) { - fmt.Printf(" visit %v ...\n", path) - switch path { - case "/": - return 
[]string{"g", "h"}, nil - } - t.Fatalf("expected visit %s", path) - return nil, nil - } - - tree.EnsureVisited(util.FullPath("/a/b"), visitFn) - - tree.EnsureVisited(util.FullPath("/a/b"), visitFn) - - printMap(tree.root.Children) - - assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b"))) - assert.Equal(t, true, tree.HasVisited(util.FullPath("/a"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/g"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/g/x"))) - -} diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index 40d24b322..3f3b264b1 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -11,8 +11,7 @@ import ( var ErrorOutOfBounds = errors.New("attempt to read out of bounds") type ChunkCache interface { - GetChunk(fileId string, minSize uint64) (data []byte) - GetChunkSlice(fileId string, offset, length uint64) []byte + ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) SetChunk(fileId string, data []byte) } @@ -44,105 +43,52 @@ func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, uni return c } -func (c *TieredChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) { +func (c *TieredChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) { if c == nil { - return - } - - c.RLock() - defer c.RUnlock() - - return c.doGetChunk(fileId, minSize) -} - -func (c *TieredChunkCache) doGetChunk(fileId string, minSize uint64) (data []byte) { - - if minSize <= c.onDiskCacheSizeLimit0 { - data = c.memCache.GetChunk(fileId) - if len(data) >= int(minSize) { - return data - } - } - - fid, err := needle.ParseFileIdFromString(fileId) - if err != nil { - glog.Errorf("failed to parse file id %s", fileId) - return nil - } - - if minSize <= c.onDiskCacheSizeLimit0 { - data = c.diskCaches[0].getChunk(fid.Key) - if len(data) >= int(minSize) { - return data - } - } - if minSize <= c.onDiskCacheSizeLimit1 { - data = c.diskCaches[1].getChunk(fid.Key) - if len(data) >= int(minSize) { - return data - } - } - { - data = c.diskCaches[2].getChunk(fid.Key) - if len(data) >= int(minSize) { - return data - } - } - - return nil - -} - -func (c *TieredChunkCache) GetChunkSlice(fileId string, offset, length uint64) []byte { - if c == nil { - return nil + return 0, nil } c.RLock() defer c.RUnlock() - return c.doGetChunkSlice(fileId, offset, length) -} - -func (c *TieredChunkCache) doGetChunkSlice(fileId string, offset, length uint64) (data []byte) { - - minSize := offset + length + minSize := offset + uint64(len(data)) if minSize <= c.onDiskCacheSizeLimit0 { - data, err := c.memCache.getChunkSlice(fileId, offset, length) + n, err = c.memCache.readChunkAt(data, fileId, offset) if err != nil { glog.Errorf("failed to read from memcache: %s", err) } - if len(data) >= int(minSize) { - return data + if n >= int(minSize) { + return n, nil } } fid, err := needle.ParseFileIdFromString(fileId) if err != nil { glog.Errorf("failed to parse file id %s", fileId) - return nil + return n, nil } if minSize <= c.onDiskCacheSizeLimit0 { - data = c.diskCaches[0].getChunkSlice(fid.Key, offset, length) - if len(data) >= int(minSize) { - return data + n, err = c.diskCaches[0].readChunkAt(data, fid.Key, offset) + if n >= int(minSize) { + return } } if minSize <= c.onDiskCacheSizeLimit1 { - data = c.diskCaches[1].getChunkSlice(fid.Key, offset, length) - if len(data) >= int(minSize) { - return data + n, err = c.diskCaches[1].readChunkAt(data, fid.Key, offset) + 
if n >= int(minSize) { + return } } { - data = c.diskCaches[2].getChunkSlice(fid.Key, offset, length) - if len(data) >= int(minSize) { - return data + n, err = c.diskCaches[2].readChunkAt(data, fid.Key, offset) + if n >= int(minSize) { + return } } - return nil + return 0, nil + } func (c *TieredChunkCache) SetChunk(fileId string, data []byte) { diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go index d725f8a16..2982d0979 100644 --- a/weed/util/chunk_cache/chunk_cache_in_memory.go +++ b/weed/util/chunk_cache/chunk_cache_in_memory.go @@ -1,9 +1,8 @@ package chunk_cache import ( - "time" - "github.com/karlseguin/ccache/v2" + "time" ) // a global cache for recently accessed file chunks @@ -45,6 +44,21 @@ func (c *ChunkCacheInMemory) getChunkSlice(fileId string, offset, length uint64) return data[offset : int(offset)+wanted], nil } +func (c *ChunkCacheInMemory) readChunkAt(buffer []byte, fileId string, offset uint64) (int, error) { + item := c.cache.Get(fileId) + if item == nil { + return 0, nil + } + data := item.Value().([]byte) + item.Extend(time.Hour) + wanted := min(len(buffer), len(data)-int(offset)) + if wanted < 0 { + return 0, ErrorOutOfBounds + } + n := copy(buffer, data[offset:int(offset)+wanted]) + return n, nil +} + func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) { localCopy := make([]byte, len(data)) copy(localCopy, data) diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go index 36de5c972..100b5919e 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -144,6 +144,28 @@ func (v *ChunkCacheVolume) getNeedleSlice(key types.NeedleId, offset, length uin return data, nil } +func (v *ChunkCacheVolume) readNeedleSliceAt(data []byte, key types.NeedleId, offset uint64) (n int, err error) { + nv, ok := v.nm.Get(key) + if !ok { + return 0, storage.ErrorNotFound + } + wanted := min(len(data), int(nv.Size)-int(offset)) + if wanted < 0 { + // should never happen, but better than panicing + return 0, ErrorOutOfBounds + } + if n, err = v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); err != nil { + return n, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, err) + } else { + if n != wanted { + return n, fmt.Errorf("read %d, expected %d", n, wanted) + } + } + + return n, nil +} + func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error { offset := v.fileSize diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go index f8325276e..8c7880eee 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk_test.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go @@ -3,16 +3,13 @@ package chunk_cache import ( "bytes" "fmt" - "io/ioutil" + "github.com/chrislusf/seaweedfs/weed/util/mem" "math/rand" - "os" "testing" ) func TestOnDisk(t *testing.T) { - - tmpDir, _ := ioutil.TempDir("", "c") - defer os.RemoveAll(tmpDir) + tmpDir := t.TempDir() totalDiskSizeInKB := int64(32) @@ -22,7 +19,7 @@ func TestOnDisk(t *testing.T) { type test_data struct { data []byte fileId string - size uint64 + size int } testData := make([]*test_data, writeCount) for i := 0; i < writeCount; i++ { @@ -31,29 +28,35 @@ func TestOnDisk(t *testing.T) { testData[i] = &test_data{ data: buff, fileId: fmt.Sprintf("1,%daabbccdd", i+1), - size: uint64(len(buff)), + 
size: len(buff), } cache.SetChunk(testData[i].fileId, testData[i].data) // read back right after write - data := cache.GetChunk(testData[i].fileId, testData[i].size) + data := mem.Allocate(testData[i].size) + cache.ReadChunkAt(data, testData[i].fileId, 0) if bytes.Compare(data, testData[i].data) != 0 { t.Errorf("failed to write to and read from cache: %d", i) } + mem.Free(data) } for i := 0; i < 2; i++ { - data := cache.GetChunk(testData[i].fileId, testData[i].size) + data := mem.Allocate(testData[i].size) + cache.ReadChunkAt(data, testData[i].fileId, 0) if bytes.Compare(data, testData[i].data) == 0 { t.Errorf("old cache should have been purged: %d", i) } + mem.Free(data) } for i := 2; i < writeCount; i++ { - data := cache.GetChunk(testData[i].fileId, testData[i].size) + data := mem.Allocate(testData[i].size) + cache.ReadChunkAt(data, testData[i].fileId, 0) if bytes.Compare(data, testData[i].data) != 0 { t.Errorf("failed to write to and read from cache: %d", i) } + mem.Free(data) } cache.Shutdown() @@ -61,10 +64,12 @@ func TestOnDisk(t *testing.T) { cache = NewTieredChunkCache(2, tmpDir, totalDiskSizeInKB, 1024) for i := 0; i < 2; i++ { - data := cache.GetChunk(testData[i].fileId, testData[i].size) + data := mem.Allocate(testData[i].size) + cache.ReadChunkAt(data, testData[i].fileId, 0) if bytes.Compare(data, testData[i].data) == 0 { t.Errorf("old cache should have been purged: %d", i) } + mem.Free(data) } for i := 2; i < writeCount; i++ { @@ -87,10 +92,12 @@ func TestOnDisk(t *testing.T) { */ continue } - data := cache.GetChunk(testData[i].fileId, testData[i].size) + data := mem.Allocate(testData[i].size) + cache.ReadChunkAt(data, testData[i].fileId, 0) if bytes.Compare(data, testData[i].data) != 0 { t.Errorf("failed to write to and read from cache: %d", i) } + mem.Free(data) } cache.Shutdown() diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go index b004913ef..de32fb445 100644 --- a/weed/util/chunk_cache/on_disk_cache_layer.go +++ b/weed/util/chunk_cache/on_disk_cache_layer.go @@ -2,12 +2,11 @@ package chunk_cache import ( "fmt" - "path" - "sort" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/types" + "golang.org/x/exp/slices" + "path" ) type OnDiskCacheLayer struct { @@ -33,10 +32,9 @@ func NewOnDiskCacheLayer(dir, namePrefix string, diskSize int64, segmentCount in } // keep newest cache to the front - sort.Slice(c.diskCaches, func(i, j int) bool { - return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime) + slices.SortFunc(c.diskCaches, func(a, b *ChunkCacheVolume) bool { + return a.lastModTime.After(b.lastModTime) }) - return c } @@ -96,7 +94,7 @@ func (c *OnDiskCacheLayer) getChunkSlice(needleId types.NeedleId, offset, length continue } if err != nil { - glog.Errorf("failed to read cache file %s id %d", diskCache.fileName, needleId) + glog.Warningf("failed to read cache file %s id %d: %v", diskCache.fileName, needleId, err) continue } if len(data) != 0 { @@ -108,6 +106,26 @@ func (c *OnDiskCacheLayer) getChunkSlice(needleId types.NeedleId, offset, length } +func (c *OnDiskCacheLayer) readChunkAt(buffer []byte, needleId types.NeedleId, offset uint64) (n int, err error) { + + for _, diskCache := range c.diskCaches { + n, err = diskCache.readNeedleSliceAt(buffer, needleId, offset) + if err == storage.ErrorNotFound { + continue + } + if err != nil { + glog.Warningf("failed to read cache file %s id %d: %v", diskCache.fileName, 
needleId, err) + continue + } + if n > 0 { + return + } + } + + return + +} + func (c *OnDiskCacheLayer) shutdown() { for _, diskCache := range c.diskCaches { diff --git a/weed/util/compression_test.go b/weed/util/compression_test.go deleted file mode 100644 index b515e8988..000000000 --- a/weed/util/compression_test.go +++ /dev/null @@ -1,21 +0,0 @@ -package util - -import ( - "testing" - - "golang.org/x/tools/godoc/util" -) - -func TestIsGzippable(t *testing.T) { - buf := make([]byte, 1024) - - isText := util.IsText(buf) - - if isText { - t.Error("buf with zeros are not text") - } - - compressed, _ := GzipData(buf) - - t.Logf("compressed size %d\n", len(compressed)) -} diff --git a/weed/util/config.go b/weed/util/config.go index ae9397340..f09ac7e5e 100644 --- a/weed/util/config.go +++ b/weed/util/config.go @@ -9,6 +9,20 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" ) +var ( + ConfigurationFileDirectory DirectoryValueType +) + +type DirectoryValueType string + +func (s *DirectoryValueType) Set(value string) error { + *s = DirectoryValueType(value) + return nil +} +func (s *DirectoryValueType) String() string { + return string(*s) +} + type Configuration interface { GetString(key string) string GetBool(key string) bool @@ -20,11 +34,12 @@ type Configuration interface { func LoadConfiguration(configFileName string, required bool) (loaded bool) { // find a filer store - viper.SetConfigName(configFileName) // name of config file (without extension) - viper.AddConfigPath(".") // optionally look for config in the working directory - viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths - viper.AddConfigPath("/usr/local/etc/seaweedfs/") // search path for bsd-style config directory in - viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in + viper.SetConfigName(configFileName) // name of config file (without extension) + viper.AddConfigPath(ResolvePath(ConfigurationFileDirectory.String())) // path to look for the config file in + viper.AddConfigPath(".") // optionally look for config in the working directory + viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths + viper.AddConfigPath("/usr/local/etc/seaweedfs/") // search path for bsd-style config directory in + viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file if strings.Contains(err.Error(), "Not Found") { diff --git a/weed/util/constants.go b/weed/util/constants.go index d2a90f874..8886e8fd2 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,8 +5,9 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %.02f", sizeLimit, 2.65) - COMMIT = "" + VERSION_NUMBER = fmt.Sprintf("%.02f", 3.12) + VERSION = sizeLimit + " " + VERSION_NUMBER + COMMIT = "" ) func Version() string { diff --git a/weed/util/constants_4bytes.go b/weed/util/constants_4bytes.go index a29d9d3b0..187e33909 100644 --- a/weed/util/constants_4bytes.go +++ b/weed/util/constants_4bytes.go @@ -1,3 +1,4 @@ +//go:build !5BytesOffset // +build !5BytesOffset package util diff --git a/weed/util/constants_5bytes.go b/weed/util/constants_5bytes.go index 91ce4066f..7c6a158cf 100644 --- a/weed/util/constants_5bytes.go +++ b/weed/util/constants_5bytes.go @@ -1,3 +1,4 @@ +//go:build 5BytesOffset // +build 5BytesOffset package util diff --git a/weed/util/file_util.go b/weed/util/file_util.go index f83f80265..6155d18e1 100644 --- a/weed/util/file_util.go +++ 
b/weed/util/file_util.go @@ -87,3 +87,28 @@ func ResolvePath(path string) string { return path } + +func FileNameBase(filename string) string { + lastDotIndex := strings.LastIndex(filename, ".") + if lastDotIndex < 0 { + return filename + } + return filename[:lastDotIndex] +} + +// Copied from os.WriteFile(), adding file sync. +// see https://github.com/golang/go/issues/20599 +func WriteFile(name string, data []byte, perm os.FileMode) error { + f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return err + } + _, err = f.Write(data) + if err1 := f.Sync(); err1 != nil && err == nil { + err = err1 + } + if err1 := f.Close(); err1 != nil && err == nil { + err = err1 + } + return err +} diff --git a/weed/util/file_util_non_posix.go b/weed/util/file_util_non_posix.go index ffcfef6d5..29aecffa6 100644 --- a/weed/util/file_util_non_posix.go +++ b/weed/util/file_util_non_posix.go @@ -1,3 +1,4 @@ +//go:build linux || darwin || freebsd || netbsd || openbsd || plan9 || solaris || zos // +build linux darwin freebsd netbsd openbsd plan9 solaris zos package util diff --git a/weed/util/file_util_posix.go b/weed/util/file_util_posix.go index 22ca60b3b..0bec8abad 100644 --- a/weed/util/file_util_posix.go +++ b/weed/util/file_util_posix.go @@ -1,3 +1,4 @@ +//go:build windows // +build windows package util diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go index 85028b052..f52d4d1d0 100644 --- a/weed/util/fullpath.go +++ b/weed/util/fullpath.go @@ -41,8 +41,11 @@ func (fp FullPath) Child(name string) FullPath { return FullPath(dir + "/" + noPrefix) } -func (fp FullPath) AsInode() uint64 { - return uint64(HashStringToLong(string(fp))) +// AsInode an in-memory only inode representation +func (fp FullPath) AsInode(unixTime int64) uint64 { + inode := uint64(HashStringToLong(string(fp))) + inode = inode + uint64(unixTime)*37 + return inode } // split, but skipping the root diff --git a/weed/util/grace/signal_handling.go b/weed/util/grace/signal_handling.go index 7cca46764..fc7afcad9 100644 --- a/weed/util/grace/signal_handling.go +++ b/weed/util/grace/signal_handling.go @@ -1,3 +1,4 @@ +//go:build !plan9 // +build !plan9 package grace diff --git a/weed/util/grace/signal_handling_notsupported.go b/weed/util/grace/signal_handling_notsupported.go index 5335915a1..de898159a 100644 --- a/weed/util/grace/signal_handling_notsupported.go +++ b/weed/util/grace/signal_handling_notsupported.go @@ -1,3 +1,4 @@ +//go:build plan9 // +build plan9 package grace diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 2efd6b5aa..2f42d3768 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -5,8 +5,8 @@ import ( "encoding/json" "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/util/mem" "io" - "io/ioutil" "net/http" "net/url" "strings" @@ -35,7 +35,7 @@ func Post(url string, values url.Values) ([]byte, error) { return nil, err } defer r.Body.Close() - b, err := ioutil.ReadAll(r.Body) + b, err := io.ReadAll(r.Body) if r.StatusCode >= 400 { if err != nil { return nil, fmt.Errorf("%s: %d - %s", url, r.StatusCode, string(b)) @@ -71,7 +71,7 @@ func Get(url string) ([]byte, bool, error) { reader = response.Body } - b, err := ioutil.ReadAll(reader) + b, err := io.ReadAll(reader) if response.StatusCode >= 400 { retryable := response.StatusCode >= 500 return nil, retryable, fmt.Errorf("%s: %s", url, response.Status) @@ -107,7 +107,7 @@ func Delete(url string, jwt string) error { return e } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, 
err := io.ReadAll(resp.Body) if err != nil { return err } @@ -137,7 +137,7 @@ func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err err return } defer resp.Body.Close() - body, err = ioutil.ReadAll(resp.Body) + body, err = io.ReadAll(resp.Body) if err != nil { return } @@ -181,7 +181,16 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e } func DownloadFile(fileUrl string, jwt string) (filename string, header http.Header, resp *http.Response, e error) { - response, err := client.Get(fileUrl) + req, err := http.NewRequest("GET", fileUrl, nil) + if err != nil { + return "", nil, nil, err + } + + if len(jwt) > 0 { + req.Header.Set("Authorization", "BEARER "+jwt) + } + + response, err := client.Do(req) if err != nil { return "", nil, nil, err } @@ -271,7 +280,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC } } // drains the response body to avoid memory leak - data, _ := ioutil.ReadAll(reader) + data, _ := io.ReadAll(reader) if len(data) != 0 { glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data)) } @@ -318,7 +327,8 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is var ( m int ) - buf := make([]byte, 64*1024) + buf := mem.Allocate(64 * 1024) + defer mem.Free(buf) for { m, err = reader.Read(buf) @@ -327,7 +337,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is return false, nil } if err != nil { - return false, err + return true, err } } @@ -359,7 +369,7 @@ func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool return false, nil } -func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) { +func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (io.ReadCloser, error) { req, err := http.NewRequest("GET", fileUrl, nil) if err != nil { @@ -371,6 +381,10 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e req.Header.Add("Accept-Encoding", "gzip") } + if len(jwt) > 0 { + req.Header.Set("Authorization", "BEARER "+jwt) + } + r, err := client.Do(req) if err != nil { return nil, err @@ -393,11 +407,30 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e } func CloseResponse(resp *http.Response) { - io.Copy(ioutil.Discard, resp.Body) + reader := &CountingReader{reader: resp.Body} + io.Copy(io.Discard, reader) resp.Body.Close() + if reader.BytesRead > 0 { + glog.V(1).Infof("response leftover %d bytes", reader.BytesRead) + } } func CloseRequest(req *http.Request) { - io.Copy(ioutil.Discard, req.Body) + reader := &CountingReader{reader: req.Body} + io.Copy(io.Discard, reader) req.Body.Close() + if reader.BytesRead > 0 { + glog.V(1).Infof("request leftover %d bytes", reader.BytesRead) + } +} + +type CountingReader struct { + reader io.Reader + BytesRead int +} + +func (r *CountingReader) Read(p []byte) (n int, err error) { + n, err = r.reader.Read(p) + r.BytesRead += n + return n, err } diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index c2158e7eb..422575193 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -56,11 +56,15 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi return lb } -func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { +func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) { + var toFlush *dataToFlush 
m.Lock() defer func() { m.Unlock() + if toFlush != nil { + m.flushChan <- toFlush + } if m.notifyFn != nil { m.notifyFn() } @@ -68,20 +72,20 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { // need to put the timestamp inside the lock var ts time.Time - if eventTsNs == 0 { + if processingTsNs == 0 { ts = time.Now() - eventTsNs = ts.UnixNano() + processingTsNs = ts.UnixNano() } else { - ts = time.Unix(0, eventTsNs) + ts = time.Unix(0, processingTsNs) } - if m.lastTsNs >= eventTsNs { + if m.lastTsNs >= processingTsNs { // this is unlikely to happen, but just in case - eventTsNs = m.lastTsNs + 1 - ts = time.Unix(0, eventTsNs) + processingTsNs = m.lastTsNs + 1 + ts = time.Unix(0, processingTsNs) } - m.lastTsNs = eventTsNs + m.lastTsNs = processingTsNs logEntry := &filer_pb.LogEntry{ - TsNs: eventTsNs, + TsNs: processingTsNs, PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, } @@ -96,7 +100,7 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { // glog.V(4).Infof("%s copyToFlush1 start time %v, ts %v, remaining %d bytes", m.name, m.startTime, ts, len(m.buf)-m.pos) - m.flushChan <- m.copyToFlush() + toFlush = m.copyToFlush() m.startTime = ts if len(m.buf) < size+4 { m.buf = make([]byte, 2*size+4) @@ -148,8 +152,10 @@ func (m *LogBuffer) loopInterval() { return } toFlush := m.copyToFlush() - m.flushChan <- toFlush m.Unlock() + if toFlush != nil { + m.flushChan <- toFlush + } } } @@ -188,16 +194,34 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu m.RLock() defer m.RUnlock() - if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) { - return nil, ResumeFromDiskError + // Read from disk and memory + // 1. read from disk, last time is = td + // 2. 
in memory, the earliest time = tm + // if tm <= td, case 2.1 + // read from memory + // if tm is empty, case 2.2 + // read from memory + // if td < tm, case 2.3 + // read from disk again + var tsMemory time.Time + if !m.startTime.IsZero() { + tsMemory = m.startTime } - - /* - fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers)) - for i, prevBuf := range m.prevBuffers.buffers { - fmt.Printf(" prev %d : %s\n", i, prevBuf.String()) + for _, prevBuf := range m.prevBuffers.buffers { + if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) { + tsMemory = prevBuf.startTime } - */ + } + if tsMemory.IsZero() { // case 2.2 + return nil, nil + } else if lastReadTime.Before(tsMemory) { // case 2.3 + if !m.lastFlushTime.IsZero() { + glog.V(0).Infof("resume with last flush time: %v", m.lastFlushTime) + return nil, ResumeFromDiskError + } + } + + // the following is case 2.1 if lastReadTime.Equal(m.stopTime) { return nil, nil diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index 7dcfe5f52..915b93bf4 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -27,7 +27,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { } receivedmessageCount := 0 - lb.LoopProcessLogData("test", startTime, func() bool { + lb.LoopProcessLogData("test", startTime, 0, func() bool { // stop if no more messages return false }, func(logEntry *filer_pb.LogEntry) error { diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 02f5af274..99532b47b 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -17,10 +17,11 @@ var ( ResumeFromDiskError = fmt.Errorf("resumeFromDisk") ) -func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime time.Time, waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) { +func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime time.Time, stopTsNs int64, + waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, isDone bool, err error) { // loop through all messages var bytesBuf *bytes.Buffer - lastReadTime = startTreadTime + lastReadTime = startReadTime defer func() { if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) @@ -34,10 +35,15 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime } bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime) if err == ResumeFromDiskError { - return lastReadTime, ResumeFromDiskError + time.Sleep(1127 * time.Millisecond) + return lastReadTime, isDone, ResumeFromDiskError } // glog.V(4).Infof("%s ReadFromBuffer by %v", readerName, lastReadTime) if bytesBuf == nil { + if stopTsNs != 0 { + isDone = true + return + } if waitForDataFn() { continue } else { @@ -49,7 +55,6 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadTime, len(buf)) batchSize := 0 - var startReadTime time.Time for pos := 0; pos+4 < len(buf); { @@ -67,10 +72,11 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime pos += 4 + int(size) continue } - lastReadTime = time.Unix(0, logEntry.TsNs) - if startReadTime.IsZero() { - startReadTime = lastReadTime + if stopTsNs != 0 && logEntry.TsNs > 
stopTsNs { + isDone = true + return } + lastReadTime = time.Unix(0, logEntry.TsNs) if err = eachLogDataFn(logEntry); err != nil { return diff --git a/weed/util/mem/slot_pool.go b/weed/util/mem/slot_pool.go new file mode 100644 index 000000000..b3493febf --- /dev/null +++ b/weed/util/mem/slot_pool.go @@ -0,0 +1,63 @@ +package mem + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "sync" + "sync/atomic" +) + +var pools []*sync.Pool + +const ( + min_size = 1024 +) + +func bitCount(size int) (count int) { + for ; size > min_size; count++ { + size = (size + 1) >> 1 + } + return +} + +func init() { + // 1KB ~ 256MB + pools = make([]*sync.Pool, bitCount(1024*1024*256)) + for i := 0; i < len(pools); i++ { + slotSize := 1024 << i + pools[i] = &sync.Pool{ + New: func() interface{} { + buffer := make([]byte, slotSize) + return &buffer + }, + } + } +} + +func getSlotPool(size int) (*sync.Pool, bool) { + index := bitCount(size) + if index >= len(pools) { + return nil, false + } + return pools[index], true +} + +var total int64 + +func Allocate(size int) []byte { + if pool, found := getSlotPool(size); found { + newVal := atomic.AddInt64(&total, 1) + glog.V(4).Infof("++> %d", newVal) + + slab := *pool.Get().(*[]byte) + return slab[:size] + } + return make([]byte, size) +} + +func Free(buf []byte) { + if pool, found := getSlotPool(cap(buf)); found { + newVal := atomic.AddInt64(&total, -1) + glog.V(4).Infof("--> %d", newVal) + pool.Put(&buf) + } +} diff --git a/weed/util/mem/slot_pool_test.go b/weed/util/mem/slot_pool_test.go new file mode 100644 index 000000000..44f9ec004 --- /dev/null +++ b/weed/util/mem/slot_pool_test.go @@ -0,0 +1,48 @@ +package mem + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestAllocateFree(t *testing.T) { + buf := Allocate(12) + Free(buf) + if cap(buf) != min_size { + t.Errorf("min size error allocated capacity=%d", cap(buf)) + } + if len(buf) != 12 { + t.Errorf("size error") + } + + buf = Allocate(4883) + Free(buf) + if cap(buf) != 1024<<bitCount(4883) { + t.Errorf("min size error allocated capacity=%d", cap(buf)) + } + if len(buf) != 4883 { + t.Errorf("size error") + } + +} + +func TestAllocateFreeEdgeCases(t *testing.T) { + assert.Equal(t, 1, bitCount(2048)) + assert.Equal(t, 2, bitCount(2049)) + + buf := Allocate(2048) + Free(buf) + buf = Allocate(2049) + Free(buf) +} + +func TestBitCount(t *testing.T) { + count := bitCount(12) + if count != 0 { + t.Errorf("bitCount error count=%d", count) + } + if count != bitCount(min_size) { + t.Errorf("bitCount error count=%d", count) + } + +} diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go index e8075c297..536359eec 100644 --- a/weed/util/net_timeout.go +++ b/weed/util/net_timeout.go @@ -1,6 +1,7 @@ package util import ( + "github.com/chrislusf/seaweedfs/weed/glog" "net" "time" @@ -36,11 +37,13 @@ type Conn struct { ReadTimeout time.Duration WriteTimeout time.Duration isClosed bool + bytesRead int64 + bytesWritten int64 } func (c *Conn) Read(b []byte) (count int, e error) { if c.ReadTimeout != 0 { - err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout)) + err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * time.Duration(c.bytesRead/40000+1))) if err != nil { return 0, err } @@ -48,6 +51,7 @@ func (c *Conn) Read(b []byte) (count int, e error) { count, e = c.Conn.Read(b) if e == nil { stats.BytesIn(int64(count)) + c.bytesRead += int64(count) } return } @@ -55,7 +59,7 @@ func (c *Conn) Read(b []byte) (count int, e error) { func (c *Conn) Write(b []byte) (count int, e error) 
{ if c.WriteTimeout != 0 { // minimum 4KB/s - err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout * time.Duration(len(b)/40000+1))) + err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout * time.Duration(c.bytesWritten/40000+1))) if err != nil { return 0, err } @@ -63,6 +67,7 @@ func (c *Conn) Write(b []byte) (count int, e error) { count, e = c.Conn.Write(b) if e == nil { stats.BytesOut(int64(count)) + c.bytesWritten += int64(count) } return } @@ -78,16 +83,46 @@ func (c *Conn) Close() error { return err } -func NewListener(addr string, timeout time.Duration) (net.Listener, error) { - l, err := net.Listen("tcp", addr) +func NewListener(addr string, timeout time.Duration) (ipListner net.Listener, err error) { + listner, err := net.Listen("tcp", addr) if err != nil { - return nil, err + return } - tl := &Listener{ - Listener: l, + ipListner = &Listener{ + Listener: listner, ReadTimeout: timeout, WriteTimeout: timeout, } - return tl, nil + + return +} + +func NewIpAndLocalListeners(host string, port int, timeout time.Duration) (ipListner net.Listener, localListener net.Listener, err error) { + listner, err := net.Listen("tcp", JoinHostPort(host, port)) + if err != nil { + return + } + + ipListner = &Listener{ + Listener: listner, + ReadTimeout: timeout, + WriteTimeout: timeout, + } + + if host != "localhost" && host != "" && host != "0.0.0.0" && host != "127.0.0.1" && host != "[::]" && host != "[::1]" { + listner, err = net.Listen("tcp", JoinHostPort("localhost", port)) + if err != nil { + glog.V(0).Infof("skip starting on %s:%d: %v", host, port, err) + return ipListner, nil, nil + } + + localListener = &Listener{ + Listener: listner, + ReadTimeout: timeout, + WriteTimeout: timeout, + } + } + + return } diff --git a/weed/util/network.go b/weed/util/network.go index 55a123667..687b6ec22 100644 --- a/weed/util/network.go +++ b/weed/util/network.go @@ -2,6 +2,8 @@ package util import ( "net" + "strconv" + "strings" "github.com/chrislusf/seaweedfs/weed/glog" ) @@ -13,6 +15,18 @@ func DetectedHostAddress() string { return "" } + if v4Address := selectIpV4(netInterfaces, true); v4Address != "" { + return v4Address + } + + if v6Address := selectIpV4(netInterfaces, false); v6Address != "" { + return v6Address + } + + return "localhost" +} + +func selectIpV4(netInterfaces []net.Interface, isIpV4 bool) string { for _, netInterface := range netInterfaces { if (netInterface.Flags & net.FlagUp) == 0 { continue @@ -24,12 +38,25 @@ func DetectedHostAddress() string { for _, a := range addrs { if ipNet, ok := a.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { - if ipNet.IP.To4() != nil { - return ipNet.IP.String() + if isIpV4 { + if ipNet.IP.To4() != nil { + return ipNet.IP.String() + } + } else { + if ipNet.IP.To16() != nil { + return ipNet.IP.String() + } } } } } + return "" +} - return "localhost" +func JoinHostPort(host string, port int) string { + portStr := strconv.Itoa(port) + if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") { + return host + ":" + portStr + } + return net.JoinHostPort(host, portStr) } diff --git a/weed/util/parse.go b/weed/util/parse.go index 0955db682..502f3a80f 100644 --- a/weed/util/parse.go +++ b/weed/util/parse.go @@ -61,3 +61,8 @@ func ParseHostPort(hostPort string) (filerServer string, filerPort int64, err er return } + +func CanonicalizeETag(etag string) string { + canonicalETag := strings.TrimPrefix(etag, "\"") + return strings.TrimSuffix(canonicalETag, "\"") +} diff --git a/weed/util/retry.go b/weed/util/retry.go index e1ad99d54..b7cd278b3 100644 
--- a/weed/util/retry.go +++ b/weed/util/retry.go @@ -37,6 +37,7 @@ func RetryForever(name string, job func() error, onErrFn func(err error) bool) { for { err := job() if err == nil { + waitTime = time.Second break } if onErrFn(err) { diff --git a/weed/util/skiplist/Makefile b/weed/util/skiplist/Makefile new file mode 100644 index 000000000..af4afe639 --- /dev/null +++ b/weed/util/skiplist/Makefile @@ -0,0 +1,6 @@ +all: gen + +.PHONY : gen + +gen: + protoc skiplist.proto --go_out=plugins=grpc:. --go_opt=paths=source_relative diff --git a/weed/util/skiplist/list_store.go b/weed/util/skiplist/list_store.go new file mode 100644 index 000000000..0eb1106bc --- /dev/null +++ b/weed/util/skiplist/list_store.go @@ -0,0 +1,32 @@ +package skiplist + +type ListStore interface { + SaveElement(id int64, element *SkipListElement) error + DeleteElement(id int64) error + LoadElement(id int64) (*SkipListElement, error) +} + +type MemStore struct { + m map[int64]*SkipListElement +} + +func newMemStore() *MemStore { + return &MemStore{ + m: make(map[int64]*SkipListElement), + } +} + +func (m *MemStore) SaveElement(id int64, element *SkipListElement) error { + m.m[id] = element + return nil +} + +func (m *MemStore) DeleteElement(id int64) error { + delete(m.m, id) + return nil +} + +func (m *MemStore) LoadElement(id int64) (*SkipListElement, error) { + element := m.m[id] + return element, nil +} diff --git a/weed/util/skiplist/name_batch.go b/weed/util/skiplist/name_batch.go new file mode 100644 index 000000000..53db5918f --- /dev/null +++ b/weed/util/skiplist/name_batch.go @@ -0,0 +1,102 @@ +package skiplist + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/golang/protobuf/proto" + "golang.org/x/exp/slices" + "strings" +) + +type NameBatch struct { + key string + names map[string]struct{} +} + +func (nb *NameBatch) ContainsName(name string) (found bool) { + _, found = nb.names[name] + return +} +func (nb *NameBatch) WriteName(name string) { + if nb.key == "" || strings.Compare(nb.key, name) > 0 { + nb.key = name + } + nb.names[name] = struct{}{} +} +func (nb *NameBatch) DeleteName(name string) { + delete(nb.names, name) + if nb.key == name { + nb.key = "" + for n := range nb.names { + if nb.key == "" || strings.Compare(nb.key, n) > 0 { + nb.key = n + } + } + } +} +func (nb *NameBatch) ListNames(startFrom string, visitNamesFn func(name string) bool) bool { + var names []string + needFilter := startFrom != "" + for n := range nb.names { + if !needFilter || strings.Compare(n, startFrom) >= 0 { + names = append(names, n) + } + } + slices.SortFunc(names, func(a, b string) bool { + return strings.Compare(a, b) < 0 + }) + for _, n := range names { + if !visitNamesFn(n) { + return false + } + } + return true +} + +func NewNameBatch() *NameBatch { + return &NameBatch{ + names: make(map[string]struct{}), + } +} + +func LoadNameBatch(data []byte) *NameBatch { + t := &NameBatchData{} + if len(data) > 0 { + err := proto.Unmarshal(data, t) + if err != nil { + glog.Errorf("unmarshal into NameBatchData{} : %v", err) + return nil + } + } + nb := NewNameBatch() + for _, n := range t.Names { + name := string(n) + if nb.key == "" || strings.Compare(nb.key, name) > 0 { + nb.key = name + } + nb.names[name] = struct{}{} + } + return nb +} + +func (nb *NameBatch) ToBytes() []byte { + t := &NameBatchData{} + for n := range nb.names { + t.Names = append(t.Names, []byte(n)) + } + data, _ := proto.Marshal(t) + return data +} + +func (nb *NameBatch) SplitBy(name string) (x, y *NameBatch) { + x, y = NewNameBatch(), 
NewNameBatch() + + for n := range nb.names { + // there should be no equal case though + if strings.Compare(n, name) <= 0 { + x.WriteName(n) + } else { + y.WriteName(n) + } + } + return +} diff --git a/weed/util/skiplist/name_list.go b/weed/util/skiplist/name_list.go new file mode 100644 index 000000000..1d4d2ebae --- /dev/null +++ b/weed/util/skiplist/name_list.go @@ -0,0 +1,326 @@ +package skiplist + +import ( + "bytes" +) + +type NameList struct { + skipList *SkipList + batchSize int +} + +func newNameList(store ListStore, batchSize int) *NameList { + return &NameList{ + skipList: New(store), + batchSize: batchSize, + } +} + +/* +Be reluctant to create new nodes. Try to fit into either previous node or next node. +Prefer to add to previous node. + +There are multiple cases after finding the name for greater or equal node + 1. found and node.Key == name + The node contains a batch with leading key the same as the name + nothing to do + 2. no such node found or node.Key > name + + if no such node found + prevNode = list.LargestNode + + // case 2.1 + if previousNode contains name + nothing to do + + // prefer to add to previous node + if prevNode != nil { + // case 2.2 + if prevNode has capacity + prevNode.add name, and save + return + // case 2.3 + split prevNode by name + } + + // case 2.4 + // merge into next node. Avoid too many nodes if adding data in reverse order. + if nextNode is not nil and nextNode has capacity + delete nextNode.Key + nextNode.Key = name + nextNode.batch.add name + insert nodeNode.Key + return + + // case 2.5 + if prevNode is nil + insert new node with key = name, value = batch{name} + return + +*/ +func (nl *NameList) WriteName(name string) error { + + lookupKey := []byte(name) + prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey) + if err != nil { + return err + } + // case 1: the name already exists as one leading key in the batch + if found && bytes.Compare(nextNode.Key, lookupKey) == 0 { + return nil + } + + if !found { + prevNode, err = nl.skipList.GetLargestNode() + if err != nil { + return err + } + } + + if nextNode != nil && prevNode == nil { + prevNode, err = nl.skipList.LoadElement(nextNode.Prev) + if err != nil { + return err + } + } + + if prevNode != nil { + prevNameBatch := LoadNameBatch(prevNode.Value) + // case 2.1 + if prevNameBatch.ContainsName(name) { + return nil + } + + // case 2.2 + if len(prevNameBatch.names) < nl.batchSize { + prevNameBatch.WriteName(name) + return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes()) + } + + // case 2.3 + x, y := prevNameBatch.SplitBy(name) + addToX := len(x.names) <= len(y.names) + if len(x.names) != len(prevNameBatch.names) { + if addToX { + x.WriteName(name) + } + if x.key == prevNameBatch.key { + if err := nl.skipList.ChangeValue(prevNode, x.ToBytes()); err != nil { + return err + } + } else { + if _, err := nl.skipList.InsertByKey([]byte(x.key), 0, x.ToBytes()); err != nil { + return err + } + } + } + if len(y.names) != len(prevNameBatch.names) { + if !addToX { + y.WriteName(name) + } + if y.key == prevNameBatch.key { + if err := nl.skipList.ChangeValue(prevNode, y.ToBytes()); err != nil { + return err + } + } else { + if _, err := nl.skipList.InsertByKey([]byte(y.key), 0, y.ToBytes()); err != nil { + return err + } + } + } + return nil + + } + + // case 2.4 + if nextNode != nil { + nextNameBatch := LoadNameBatch(nextNode.Value) + if len(nextNameBatch.names) < nl.batchSize { + if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil { + return err + } + 
nextNameBatch.WriteName(name) + if _, err := nl.skipList.InsertByKey([]byte(nextNameBatch.key), 0, nextNameBatch.ToBytes()); err != nil { + return err + } + return nil + } + } + + // case 2.5 + // now prevNode is nil + newNameBatch := NewNameBatch() + newNameBatch.WriteName(name) + if _, err := nl.skipList.InsertByKey([]byte(newNameBatch.key), 0, newNameBatch.ToBytes()); err != nil { + return err + } + + return nil +} + +/* +// case 1: exists in nextNode +if nextNode != nil && nextNode.Key == name { + remove from nextNode, update nextNode + // TODO: merge with prevNode if possible? + return +} +if nextNode is nil + prevNode = list.Largestnode +if prevNode == nil and nextNode.Prev != nil + prevNode = load(nextNode.Prev) + +// case 2: does not exist +// case 2.1 +if prevNode == nil { + return +} +// case 2.2 +if prevNameBatch does not contain name { + return +} + +// case 3 +delete from prevNameBatch +if prevNameBatch + nextNode < capacityList + // case 3.1 + merge +else + // case 3.2 + update prevNode + + +*/ +func (nl *NameList) DeleteName(name string) error { + lookupKey := []byte(name) + prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey) + if err != nil { + return err + } + + // case 1 + var nextNameBatch *NameBatch + if nextNode != nil { + nextNameBatch = LoadNameBatch(nextNode.Value) + } + if found && bytes.Compare(nextNode.Key, lookupKey) == 0 { + if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil { + return err + } + nextNameBatch.DeleteName(name) + if len(nextNameBatch.names) > 0 { + if _, err := nl.skipList.InsertByKey([]byte(nextNameBatch.key), 0, nextNameBatch.ToBytes()); err != nil { + return err + } + } + return nil + } + + if !found { + prevNode, err = nl.skipList.GetLargestNode() + if err != nil { + return err + } + } + + if nextNode != nil && prevNode == nil { + prevNode, err = nl.skipList.LoadElement(nextNode.Prev) + if err != nil { + return err + } + } + + // case 2 + if prevNode == nil { + // case 2.1 + return nil + } + prevNameBatch := LoadNameBatch(prevNode.Value) + if !prevNameBatch.ContainsName(name) { + // case 2.2 + return nil + } + + // case 3 + prevNameBatch.DeleteName(name) + if len(prevNameBatch.names) == 0 { + if _, err := nl.skipList.DeleteByKey(prevNode.Key); err != nil { + return err + } + return nil + } + if nextNameBatch != nil && len(nextNameBatch.names)+len(prevNameBatch.names) < nl.batchSize { + // case 3.1 merge nextNode and prevNode + if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil { + return err + } + for nextName := range nextNameBatch.names { + prevNameBatch.WriteName(nextName) + } + return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes()) + } else { + // case 3.2 update prevNode + return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes()) + } + + return nil +} + +func (nl *NameList) ListNames(startFrom string, visitNamesFn func(name string) bool) error { + lookupKey := []byte(startFrom) + prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey) + if err != nil { + return err + } + if found && bytes.Compare(nextNode.Key, lookupKey) == 0 { + prevNode = nil + } + if !found { + prevNode, err = nl.skipList.GetLargestNode() + if err != nil { + return err + } + } + + if prevNode != nil { + prevNameBatch := LoadNameBatch(prevNode.Value) + if !prevNameBatch.ListNames(startFrom, visitNamesFn) { + return nil + } + } + + for nextNode != nil { + nextNameBatch := LoadNameBatch(nextNode.Value) + if !nextNameBatch.ListNames(startFrom, visitNamesFn) { + return nil + } + 
nextNode, err = nl.skipList.LoadElement(nextNode.Next[0]) + if err != nil { + return err + } + } + + return nil +} + +func (nl *NameList) RemoteAllListElement() error { + + t := nl.skipList + + nodeRef := t.StartLevels[0] + for nodeRef != nil { + node, err := t.LoadElement(nodeRef) + if err != nil { + return err + } + if node == nil { + return nil + } + if err := t.DeleteElement(node); err != nil { + return err + } + nodeRef = node.Next[0] + } + return nil + +} diff --git a/weed/util/skiplist/name_list_serde.go b/weed/util/skiplist/name_list_serde.go new file mode 100644 index 000000000..0a2052e7b --- /dev/null +++ b/weed/util/skiplist/name_list_serde.go @@ -0,0 +1,71 @@ +package skiplist + +import ( + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/golang/protobuf/proto" +) + +func LoadNameList(data []byte, store ListStore, batchSize int) *NameList { + + nl := &NameList{ + skipList: New(store), + batchSize: batchSize, + } + + if len(data) == 0 { + return nl + } + + message := &SkipListProto{} + if err := proto.Unmarshal(data, message); err != nil { + glog.Errorf("loading skiplist: %v", err) + } + nl.skipList.MaxNewLevel = int(message.MaxNewLevel) + nl.skipList.MaxLevel = int(message.MaxLevel) + for i, ref := range message.StartLevels { + nl.skipList.StartLevels[i] = &SkipListElementReference{ + ElementPointer: ref.ElementPointer, + Key: ref.Key, + } + } + for i, ref := range message.EndLevels { + nl.skipList.EndLevels[i] = &SkipListElementReference{ + ElementPointer: ref.ElementPointer, + Key: ref.Key, + } + } + return nl +} + +func (nl *NameList) HasChanges() bool { + return nl.skipList.HasChanges +} + +func (nl *NameList) ToBytes() []byte { + message := &SkipListProto{} + message.MaxNewLevel = int32(nl.skipList.MaxNewLevel) + message.MaxLevel = int32(nl.skipList.MaxLevel) + for _, ref := range nl.skipList.StartLevels { + if ref == nil { + break + } + message.StartLevels = append(message.StartLevels, &SkipListElementReference{ + ElementPointer: ref.ElementPointer, + Key: ref.Key, + }) + } + for _, ref := range nl.skipList.EndLevels { + if ref == nil { + break + } + message.EndLevels = append(message.EndLevels, &SkipListElementReference{ + ElementPointer: ref.ElementPointer, + Key: ref.Key, + }) + } + data, err := proto.Marshal(message) + if err != nil { + glog.Errorf("marshal skiplist: %v", err) + } + return data +} diff --git a/weed/util/skiplist/name_list_test.go b/weed/util/skiplist/name_list_test.go new file mode 100644 index 000000000..9d539c9ba --- /dev/null +++ b/weed/util/skiplist/name_list_test.go @@ -0,0 +1,73 @@ +package skiplist + +import ( + "math/rand" + "strconv" + "testing" +) + +const ( + maxNameCount = 100 +) + +func String(x int) string { + return strconv.Itoa(x) +} + +func TestNameList(t *testing.T) { + list := newNameList(memStore, 7) + + for i := 0; i < maxNameCount; i++ { + list.WriteName(String(i)) + } + + counter := 0 + list.ListNames("", func(name string) bool { + counter++ + print(name, " ") + return true + }) + if counter != maxNameCount { + t.Fail() + } + + // list.skipList.println() + + deleteBase := 5 + deleteCount := maxNameCount - 3*deleteBase + + for i := deleteBase; i < deleteBase+deleteCount; i++ { + list.DeleteName(String(i)) + } + + counter = 0 + list.ListNames("", func(name string) bool { + counter++ + return true + }) + // list.skipList.println() + if counter != maxNameCount-deleteCount { + t.Fail() + } + + // randomized deletion + list = newNameList(memStore, 7) + // Delete elements at random positions in the list. 
+ rList := rand.Perm(maxN) + for _, i := range rList { + list.WriteName(String(i)) + } + for _, i := range rList { + list.DeleteName(String(i)) + } + counter = 0 + list.ListNames("", func(name string) bool { + counter++ + print(name, " ") + return true + }) + if counter != 0 { + t.Fail() + } + +} diff --git a/weed/util/skiplist/skiplist.go b/weed/util/skiplist/skiplist.go new file mode 100644 index 000000000..21eed4b43 --- /dev/null +++ b/weed/util/skiplist/skiplist.go @@ -0,0 +1,571 @@ +package skiplist + +// adapted from https://github.com/MauriceGit/skiplist/blob/master/skiplist.go + +import ( + "bytes" + "fmt" + "math/bits" + "math/rand" + "time" +) + +const ( + // maxLevel denotes the maximum height of the skiplist. This height will keep the skiplist + // efficient for up to 34m entries. If there is a need for much more, please adjust this constant accordingly. + maxLevel = 25 +) + +type SkipList struct { + StartLevels [maxLevel]*SkipListElementReference + EndLevels [maxLevel]*SkipListElementReference + MaxNewLevel int + MaxLevel int + ListStore ListStore + HasChanges bool + // elementCount int +} + +// NewSeedEps returns a new empty, initialized Skiplist. +// Given a seed, a deterministic height/list behaviour can be achieved. +// Eps is used to compare keys given by the ExtractKey() function on equality. +func NewSeed(seed int64, listStore ListStore) *SkipList { + + // Initialize random number generator. + rand.Seed(seed) + //fmt.Printf("SkipList seed: %v\n", seed) + + list := &SkipList{ + MaxNewLevel: maxLevel, + MaxLevel: 0, + ListStore: listStore, + // elementCount: 0, + } + + return list +} + +// New returns a new empty, initialized Skiplist. +func New(listStore ListStore) *SkipList { + return NewSeed(time.Now().UTC().UnixNano(), listStore) +} + +// IsEmpty checks, if the skiplist is empty. +func (t *SkipList) IsEmpty() bool { + return t.StartLevels[0] == nil +} + +func (t *SkipList) generateLevel(maxLevel int) int { + level := maxLevel - 1 + // First we apply some mask which makes sure that we don't get a level + // above our desired level. Then we find the first set bit. + var x = rand.Uint64() & ((1 << uint(maxLevel-1)) - 1) + zeroes := bits.TrailingZeros64(x) + if zeroes <= maxLevel { + level = zeroes + } + + return level +} + +func (t *SkipList) findEntryIndex(key []byte, minLevel int) int { + // Find good entry point so we don't accidentally skip half the list. + for i := t.MaxLevel; i >= 0; i-- { + if t.StartLevels[i] != nil && bytes.Compare(t.StartLevels[i].Key, key) < 0 || i <= minLevel { + return i + } + } + return 0 +} + +func (t *SkipList) findExtended(key []byte, findGreaterOrEqual bool) (prevElementIfVisited *SkipListElement, foundElem *SkipListElement, ok bool, err error) { + + foundElem = nil + ok = false + + if t.IsEmpty() { + return + } + + index := t.findEntryIndex(key, 0) + var currentNode *SkipListElement + + currentNode, err = t.LoadElement(t.StartLevels[index]) + if err != nil { + return + } + if currentNode == nil { + return + } + + // In case, that our first element is already greater-or-equal! + if findGreaterOrEqual && compareElement(currentNode, key) > 0 { + foundElem = currentNode + ok = true + return + } + + for { + if compareElement(currentNode, key) == 0 { + foundElem = currentNode + ok = true + return + } + + // Which direction are we continuing next time? 
+ if currentNode.Next[index] != nil && bytes.Compare(currentNode.Next[index].Key, key) <= 0 { + // Go right + currentNode, err = t.LoadElement(currentNode.Next[index]) + if err != nil { + return + } + if currentNode == nil { + return + } + } else { + if index > 0 { + + // Early exit + if currentNode.Next[0] != nil && bytes.Compare(currentNode.Next[0].Key, key) == 0 { + prevElementIfVisited = currentNode + var currentNodeNext *SkipListElement + currentNodeNext, err = t.LoadElement(currentNode.Next[0]) + if err != nil { + return + } + if currentNodeNext == nil { + return + } + foundElem = currentNodeNext + ok = true + return + } + // Go down + index-- + } else { + // Element is not found and we reached the bottom. + if findGreaterOrEqual { + foundElem, err = t.LoadElement(currentNode.Next[index]) + if err != nil { + return + } + ok = foundElem != nil + } + + return + } + } + } +} + +// Find tries to find an element in the skiplist based on the key from the given ListElement. +// elem can be used, if ok is true. +// Find runs in approx. O(log(n)) +func (t *SkipList) Find(key []byte) (prevIfVisited *SkipListElement, elem *SkipListElement, ok bool, err error) { + + if t == nil || key == nil { + return + } + + prevIfVisited, elem, ok, err = t.findExtended(key, false) + return +} + +// FindGreaterOrEqual finds the first element, that is greater or equal to the given ListElement e. +// The comparison is done on the keys (So on ExtractKey()). +// FindGreaterOrEqual runs in approx. O(log(n)) +func (t *SkipList) FindGreaterOrEqual(key []byte) (prevIfVisited *SkipListElement, elem *SkipListElement, ok bool, err error) { + + if t == nil || key == nil { + return + } + + prevIfVisited, elem, ok, err = t.findExtended(key, true) + return +} + +// Delete removes an element equal to e from the skiplist, if there is one. +// If there are multiple entries with the same value, Delete will remove one of them +// (Which one will change based on the actual skiplist layout) +// Delete runs in approx. O(log(n)) +func (t *SkipList) DeleteByKey(key []byte) (id int64, err error) { + + if t == nil || t.IsEmpty() || key == nil { + return + } + + index := t.findEntryIndex(key, t.MaxLevel) + + var currentNode *SkipListElement + var nextNode *SkipListElement + + for { + + if currentNode == nil { + nextNode, err = t.LoadElement(t.StartLevels[index]) + } else { + nextNode, err = t.LoadElement(currentNode.Next[index]) + } + if err != nil { + return id, err + } + + // Found and remove! + if nextNode != nil && compareElement(nextNode, key) == 0 { + + if currentNode != nil { + currentNode.Next[index] = nextNode.Next[index] + if err = t.SaveElement(currentNode); err != nil { + return id, err + } + } + + if index == 0 { + if nextNode.Next[index] != nil { + nextNextNode, err := t.LoadElement(nextNode.Next[index]) + if err != nil { + return id, err + } + if nextNextNode != nil { + nextNextNode.Prev = currentNode.Reference() + if err = t.SaveElement(nextNextNode); err != nil { + return id, err + } + } + } + // t.elementCount-- + id = nextNode.Id + if err = t.DeleteElement(nextNode); err != nil { + return id, err + } + } + + // Link from start needs readjustments. + startNextKey := t.StartLevels[index].Key + if compareElement(nextNode, startNextKey) == 0 { + t.HasChanges = true + t.StartLevels[index] = nextNode.Next[index] + // This was our currently highest node! + if t.StartLevels[index] == nil { + t.MaxLevel = index - 1 + } + } + + // Link from end needs readjustments. 
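+ // If the removed node was the last one on this level, the end reference falls back to its predecessor (nil when the level becomes empty, since a nil currentNode yields a nil Reference()).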
+ if nextNode.Next[index] == nil { + t.EndLevels[index] = currentNode.Reference() + t.HasChanges = true + } + nextNode.Next[index] = nil + } + + if nextNode != nil && compareElement(nextNode, key) < 0 { + // Go right + currentNode = nextNode + } else { + // Go down + index-- + if index < 0 { + break + } + } + } + return +} + +// Insert inserts the given ListElement into the skiplist. +// Insert runs in approx. O(log(n)) +func (t *SkipList) InsertByKey(key []byte, idIfKnown int64, value []byte) (id int64, err error) { + + if t == nil || key == nil { + return + } + + level := t.generateLevel(t.MaxNewLevel) + + // Only grow the height of the skiplist by one at a time! + if level > t.MaxLevel { + level = t.MaxLevel + 1 + t.MaxLevel = level + t.HasChanges = true + } + + id = idIfKnown + if id == 0 { + id = rand.Int63() + } + elem := &SkipListElement{ + Id: id, + Next: make([]*SkipListElementReference, t.MaxNewLevel, t.MaxNewLevel), + Level: int32(level), + Key: key, + Value: value, + } + + // t.elementCount++ + + newFirst := true + newLast := true + if !t.IsEmpty() { + newFirst = compareElement(elem, t.StartLevels[0].Key) < 0 + newLast = compareElement(elem, t.EndLevels[0].Key) > 0 + } + + normallyInserted := false + if !newFirst && !newLast { + + normallyInserted = true + + index := t.findEntryIndex(key, level) + + var currentNode *SkipListElement + var nextNodeRef *SkipListElementReference + + for { + + if currentNode == nil { + nextNodeRef = t.StartLevels[index] + } else { + nextNodeRef = currentNode.Next[index] + } + + var nextNode *SkipListElement + + // Connect node to next + if index <= level && (nextNodeRef == nil || bytes.Compare(nextNodeRef.Key, key) > 0) { + elem.Next[index] = nextNodeRef + if currentNode != nil { + currentNode.Next[index] = elem.Reference() + if err = t.SaveElement(currentNode); err != nil { + return + } + } + if index == 0 { + elem.Prev = currentNode.Reference() + if nextNodeRef != nil { + if nextNode, err = t.LoadElement(nextNodeRef); err != nil { + return + } + if nextNode != nil { + nextNode.Prev = elem.Reference() + if err = t.SaveElement(nextNode); err != nil { + return + } + } + } + } + } + + if nextNodeRef != nil && bytes.Compare(nextNodeRef.Key, key) <= 0 { + // Go right + if nextNode == nil { + // reuse nextNode when index == 0 + if nextNode, err = t.LoadElement(nextNodeRef); err != nil { + return + } + } + currentNode = nextNode + if currentNode == nil { + return + } + } else { + // Go down + index-- + if index < 0 { + break + } + } + } + } + + // Where we have a left-most position that needs to be referenced! + for i := level; i >= 0; i-- { + + didSomething := false + + if newFirst || normallyInserted { + + if t.StartLevels[i] == nil || bytes.Compare(t.StartLevels[i].Key, key) > 0 { + if i == 0 && t.StartLevels[i] != nil { + startLevelElement, err := t.LoadElement(t.StartLevels[i]) + if err != nil { + return id, err + } + if startLevelElement != nil { + startLevelElement.Prev = elem.Reference() + if err = t.SaveElement(startLevelElement); err != nil { + return id, err + } + } + } + elem.Next[i] = t.StartLevels[i] + t.StartLevels[i] = elem.Reference() + t.HasChanges = true + } + + // link the EndLevels to this element! + if elem.Next[i] == nil { + t.EndLevels[i] = elem.Reference() + t.HasChanges = true + } + + didSomething = true + } + + if newLast { + // Places the element after the very last element on this level! + // This is very important, so we are not linking the very first element (newFirst AND newLast) to itself! 
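+ // Only relink the previous tail when the new element is not also becoming the new head.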
+ if !newFirst { + if t.EndLevels[i] != nil { + endLevelElement, err := t.LoadElement(t.EndLevels[i]) + if err != nil { + return id, err + } + if endLevelElement != nil { + endLevelElement.Next[i] = elem.Reference() + if err = t.SaveElement(endLevelElement); err != nil { + return id, err + } + } + } + if i == 0 { + elem.Prev = t.EndLevels[i] + } + t.EndLevels[i] = elem.Reference() + t.HasChanges = true + } + + // Link the startLevels to this element! + if t.StartLevels[i] == nil || bytes.Compare(t.StartLevels[i].Key, key) > 0 { + t.StartLevels[i] = elem.Reference() + t.HasChanges = true + } + + didSomething = true + } + + if !didSomething { + break + } + } + + if err = t.SaveElement(elem); err != nil { + return id, err + } + return id, nil + +} + +// GetSmallestNode returns the very first/smallest node in the skiplist. +// GetSmallestNode runs in O(1) +func (t *SkipList) GetSmallestNode() (*SkipListElement, error) { + return t.LoadElement(t.StartLevels[0]) +} + +// GetLargestNode returns the very last/largest node in the skiplist. +// GetLargestNode runs in O(1) +func (t *SkipList) GetLargestNode() (*SkipListElement, error) { + return t.LoadElement(t.EndLevels[0]) +} +func (t *SkipList) GetLargestNodeReference() *SkipListElementReference { + return t.EndLevels[0] +} + +// Next returns the next element based on the given node. +// Next will loop around to the first node, if you call it on the last! +func (t *SkipList) Next(e *SkipListElement) (*SkipListElement, error) { + if e.Next[0] == nil { + return t.LoadElement(t.StartLevels[0]) + } + return t.LoadElement(e.Next[0]) +} + +// Prev returns the previous element based on the given node. +// Prev will loop around to the last node, if you call it on the first! +func (t *SkipList) Prev(e *SkipListElement) (*SkipListElement, error) { + if e.Prev == nil { + return t.LoadElement(t.EndLevels[0]) + } + return t.LoadElement(e.Prev) +} + +// ChangeValue can be used to change the actual value of a node in the skiplist +// without the need of Deleting and reinserting the node again. +// Be advised, that ChangeValue only works, if the actual key from ExtractKey() will stay the same! +// ok is an indicator, wether the value is actually changed. +func (t *SkipList) ChangeValue(e *SkipListElement, newValue []byte) (err error) { + // The key needs to stay correct, so this is very important! + e.Value = newValue + return t.SaveElement(e) +} + +// String returns a string format of the skiplist. Useful to get a graphical overview and/or debugging. 
+func (t *SkipList) println() { + + print("start --> ") + for i, l := range t.StartLevels { + if l == nil { + break + } + if i > 0 { + print(" -> ") + } + next := "---" + if l != nil { + next = string(l.Key) + } + print(fmt.Sprintf("[%v]", next)) + } + println() + + nodeRef := t.StartLevels[0] + for nodeRef != nil { + print(fmt.Sprintf("%v: ", string(nodeRef.Key))) + node, _ := t.LoadElement(nodeRef) + if node == nil { + break + } + for i := 0; i <= int(node.Level); i++ { + + l := node.Next[i] + + next := "---" + if l != nil { + next = string(l.Key) + } + + if i == 0 { + prev := "---" + + if node.Prev != nil { + prev = string(node.Prev.Key) + } + print(fmt.Sprintf("[%v|%v]", prev, next)) + } else { + print(fmt.Sprintf("[%v]", next)) + } + if i < int(node.Level) { + print(" -> ") + } + + } + nodeRef = node.Next[0] + println() + } + + print("end --> ") + for i, l := range t.EndLevels { + if l == nil { + break + } + if i > 0 { + print(" -> ") + } + next := "---" + if l != nil { + next = string(l.Key) + } + print(fmt.Sprintf("[%v]", next)) + } + println() +} diff --git a/weed/util/skiplist/skiplist.pb.go b/weed/util/skiplist/skiplist.pb.go new file mode 100644 index 000000000..adb121bfc --- /dev/null +++ b/weed/util/skiplist/skiplist.pb.go @@ -0,0 +1,438 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.12.3 +// source: skiplist.proto + +package skiplist + +import ( + proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +type SkipListProto struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StartLevels []*SkipListElementReference `protobuf:"bytes,1,rep,name=start_levels,json=startLevels,proto3" json:"start_levels,omitempty"` + EndLevels []*SkipListElementReference `protobuf:"bytes,2,rep,name=end_levels,json=endLevels,proto3" json:"end_levels,omitempty"` + MaxNewLevel int32 `protobuf:"varint,3,opt,name=max_new_level,json=maxNewLevel,proto3" json:"max_new_level,omitempty"` + MaxLevel int32 `protobuf:"varint,4,opt,name=max_level,json=maxLevel,proto3" json:"max_level,omitempty"` +} + +func (x *SkipListProto) Reset() { + *x = SkipListProto{} + if protoimpl.UnsafeEnabled { + mi := &file_skiplist_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SkipListProto) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SkipListProto) ProtoMessage() {} + +func (x *SkipListProto) ProtoReflect() protoreflect.Message { + mi := &file_skiplist_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SkipListProto.ProtoReflect.Descriptor instead. 
+func (*SkipListProto) Descriptor() ([]byte, []int) { + return file_skiplist_proto_rawDescGZIP(), []int{0} +} + +func (x *SkipListProto) GetStartLevels() []*SkipListElementReference { + if x != nil { + return x.StartLevels + } + return nil +} + +func (x *SkipListProto) GetEndLevels() []*SkipListElementReference { + if x != nil { + return x.EndLevels + } + return nil +} + +func (x *SkipListProto) GetMaxNewLevel() int32 { + if x != nil { + return x.MaxNewLevel + } + return 0 +} + +func (x *SkipListProto) GetMaxLevel() int32 { + if x != nil { + return x.MaxLevel + } + return 0 +} + +type SkipListElementReference struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ElementPointer int64 `protobuf:"varint,1,opt,name=element_pointer,json=elementPointer,proto3" json:"element_pointer,omitempty"` + Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` +} + +func (x *SkipListElementReference) Reset() { + *x = SkipListElementReference{} + if protoimpl.UnsafeEnabled { + mi := &file_skiplist_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SkipListElementReference) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SkipListElementReference) ProtoMessage() {} + +func (x *SkipListElementReference) ProtoReflect() protoreflect.Message { + mi := &file_skiplist_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SkipListElementReference.ProtoReflect.Descriptor instead. +func (*SkipListElementReference) Descriptor() ([]byte, []int) { + return file_skiplist_proto_rawDescGZIP(), []int{1} +} + +func (x *SkipListElementReference) GetElementPointer() int64 { + if x != nil { + return x.ElementPointer + } + return 0 +} + +func (x *SkipListElementReference) GetKey() []byte { + if x != nil { + return x.Key + } + return nil +} + +type SkipListElement struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + Next []*SkipListElementReference `protobuf:"bytes,2,rep,name=next,proto3" json:"next,omitempty"` + Level int32 `protobuf:"varint,3,opt,name=level,proto3" json:"level,omitempty"` + Key []byte `protobuf:"bytes,4,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"` + Prev *SkipListElementReference `protobuf:"bytes,6,opt,name=prev,proto3" json:"prev,omitempty"` +} + +func (x *SkipListElement) Reset() { + *x = SkipListElement{} + if protoimpl.UnsafeEnabled { + mi := &file_skiplist_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SkipListElement) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SkipListElement) ProtoMessage() {} + +func (x *SkipListElement) ProtoReflect() protoreflect.Message { + mi := &file_skiplist_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SkipListElement.ProtoReflect.Descriptor instead. 
+func (*SkipListElement) Descriptor() ([]byte, []int) { + return file_skiplist_proto_rawDescGZIP(), []int{2} +} + +func (x *SkipListElement) GetId() int64 { + if x != nil { + return x.Id + } + return 0 +} + +func (x *SkipListElement) GetNext() []*SkipListElementReference { + if x != nil { + return x.Next + } + return nil +} + +func (x *SkipListElement) GetLevel() int32 { + if x != nil { + return x.Level + } + return 0 +} + +func (x *SkipListElement) GetKey() []byte { + if x != nil { + return x.Key + } + return nil +} + +func (x *SkipListElement) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +func (x *SkipListElement) GetPrev() *SkipListElementReference { + if x != nil { + return x.Prev + } + return nil +} + +type NameBatchData struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Names [][]byte `protobuf:"bytes,1,rep,name=names,proto3" json:"names,omitempty"` +} + +func (x *NameBatchData) Reset() { + *x = NameBatchData{} + if protoimpl.UnsafeEnabled { + mi := &file_skiplist_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *NameBatchData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*NameBatchData) ProtoMessage() {} + +func (x *NameBatchData) ProtoReflect() protoreflect.Message { + mi := &file_skiplist_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use NameBatchData.ProtoReflect.Descriptor instead. +func (*NameBatchData) Descriptor() ([]byte, []int) { + return file_skiplist_proto_rawDescGZIP(), []int{3} +} + +func (x *NameBatchData) GetNames() [][]byte { + if x != nil { + return x.Names + } + return nil +} + +var File_skiplist_proto protoreflect.FileDescriptor + +var file_skiplist_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x08, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x22, 0xda, 0x01, 0x0a, 0x0d, 0x53, + 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x45, 0x0a, 0x0c, + 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x2e, 0x53, 0x6b, + 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66, + 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x0b, 0x73, 0x74, 0x61, 0x72, 0x74, 0x4c, 0x65, 0x76, + 0x65, 0x6c, 0x73, 0x12, 0x41, 0x0a, 0x0a, 0x65, 0x6e, 0x64, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, + 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, + 0x73, 0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, + 0x6e, 0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x09, 0x65, 0x6e, 0x64, + 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x73, 0x12, 0x22, 0x0a, 0x0d, 0x6d, 0x61, 0x78, 0x5f, 0x6e, 0x65, + 0x77, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x6d, + 0x61, 0x78, 0x4e, 0x65, 0x77, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x6d, 0x61, + 0x78, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x6d, + 0x61, 0x78, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x22, 0x55, 0x0a, 0x18, 
0x53, 0x6b, 0x69, 0x70, 0x4c, + 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, + 0x6e, 0x63, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x70, + 0x6f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x65, 0x6c, + 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x12, 0x10, 0x0a, 0x03, + 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0xcf, + 0x01, 0x0a, 0x0f, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, + 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02, + 0x69, 0x64, 0x12, 0x36, 0x0a, 0x04, 0x6e, 0x65, 0x78, 0x74, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70, + 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66, 0x65, 0x72, + 0x65, 0x6e, 0x63, 0x65, 0x52, 0x04, 0x6e, 0x65, 0x78, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x65, + 0x76, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c, + 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, + 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x36, 0x0a, 0x04, 0x70, 0x72, 0x65, 0x76, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, + 0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, + 0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x04, 0x70, 0x72, 0x65, 0x76, + 0x22, 0x25, 0x0a, 0x0d, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x61, 0x74, 0x63, 0x68, 0x44, 0x61, 0x74, + 0x61, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c, + 0x52, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, + 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x75, + 0x74, 0x69, 0x6c, 0x2f, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_skiplist_proto_rawDescOnce sync.Once + file_skiplist_proto_rawDescData = file_skiplist_proto_rawDesc +) + +func file_skiplist_proto_rawDescGZIP() []byte { + file_skiplist_proto_rawDescOnce.Do(func() { + file_skiplist_proto_rawDescData = protoimpl.X.CompressGZIP(file_skiplist_proto_rawDescData) + }) + return file_skiplist_proto_rawDescData +} + +var file_skiplist_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_skiplist_proto_goTypes = []interface{}{ + (*SkipListProto)(nil), // 0: skiplist.SkipListProto + (*SkipListElementReference)(nil), // 1: skiplist.SkipListElementReference + (*SkipListElement)(nil), // 2: skiplist.SkipListElement + (*NameBatchData)(nil), // 3: skiplist.NameBatchData +} +var file_skiplist_proto_depIdxs = []int32{ + 1, // 0: skiplist.SkipListProto.start_levels:type_name -> skiplist.SkipListElementReference + 1, // 1: skiplist.SkipListProto.end_levels:type_name -> skiplist.SkipListElementReference + 1, // 2: skiplist.SkipListElement.next:type_name -> skiplist.SkipListElementReference + 1, // 3: skiplist.SkipListElement.prev:type_name -> skiplist.SkipListElementReference + 4, // 
[4:4] is the sub-list for method output_type + 4, // [4:4] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_skiplist_proto_init() } +func file_skiplist_proto_init() { + if File_skiplist_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_skiplist_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SkipListProto); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_skiplist_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SkipListElementReference); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_skiplist_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SkipListElement); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_skiplist_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*NameBatchData); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_skiplist_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_skiplist_proto_goTypes, + DependencyIndexes: file_skiplist_proto_depIdxs, + MessageInfos: file_skiplist_proto_msgTypes, + }.Build() + File_skiplist_proto = out.File + file_skiplist_proto_rawDesc = nil + file_skiplist_proto_goTypes = nil + file_skiplist_proto_depIdxs = nil +} diff --git a/weed/util/skiplist/skiplist.proto b/weed/util/skiplist/skiplist.proto new file mode 100644 index 000000000..2991ad830 --- /dev/null +++ b/weed/util/skiplist/skiplist.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; + +package skiplist; + +option go_package = "github.com/chrislusf/seaweedfs/weed/util/skiplist"; + +message SkipListProto { + repeated SkipListElementReference start_levels = 1; + repeated SkipListElementReference end_levels = 2; + int32 max_new_level = 3; + int32 max_level = 4; +} + +message SkipListElementReference { + int64 element_pointer = 1; + bytes key = 2; +} + +message SkipListElement { + int64 id = 1; + repeated SkipListElementReference next = 2; + int32 level = 3; + bytes key = 4; + bytes value = 5; + SkipListElementReference prev = 6; +} + +message NameBatchData { + repeated bytes names = 1; +}
\ No newline at end of file diff --git a/weed/util/skiplist/skiplist_serde.go b/weed/util/skiplist/skiplist_serde.go new file mode 100644 index 000000000..a735f66fa --- /dev/null +++ b/weed/util/skiplist/skiplist_serde.go @@ -0,0 +1,51 @@ +package skiplist + +import "bytes" + +func compareElement(a *SkipListElement, key []byte) int { + if len(a.Key) == 0 { + return -1 + } + return bytes.Compare(a.Key, key) +} + +func (node *SkipListElement) Reference() *SkipListElementReference { + if node == nil { + return nil + } + return &SkipListElementReference{ + ElementPointer: node.Id, + Key: node.Key, + } +} + +func (t *SkipList) SaveElement(element *SkipListElement) error { + if element == nil { + return nil + } + return t.ListStore.SaveElement(element.Id, element) +} + +func (t *SkipList) DeleteElement(element *SkipListElement) error { + if element == nil { + return nil + } + return t.ListStore.DeleteElement(element.Id) +} + +func (t *SkipList) LoadElement(ref *SkipListElementReference) (*SkipListElement, error) { + if ref.IsNil() { + return nil, nil + } + return t.ListStore.LoadElement(ref.ElementPointer) +} + +func (ref *SkipListElementReference) IsNil() bool { + if ref == nil { + return true + } + if len(ref.Key) == 0 { + return true + } + return false +} diff --git a/weed/util/skiplist/skiplist_test.go b/weed/util/skiplist/skiplist_test.go new file mode 100644 index 000000000..5b36cacbd --- /dev/null +++ b/weed/util/skiplist/skiplist_test.go @@ -0,0 +1,290 @@ +package skiplist + +import ( + "bytes" + "math/rand" + "strconv" + "testing" +) + +const ( + maxN = 10000 +) + +var ( + memStore = newMemStore() +) + +func TestReverseInsert(t *testing.T) { + list := NewSeed(100, memStore) + + list.InsertByKey([]byte("zzz"), 0, []byte("zzz")) + list.DeleteByKey([]byte("zzz")) + + list.InsertByKey([]byte("aaa"), 0, []byte("aaa")) + + if list.IsEmpty() { + t.Fail() + } + +} + +func TestInsertAndFind(t *testing.T) { + + k0 := []byte("0") + var list *SkipList + + var listPointer *SkipList + listPointer.InsertByKey(k0, 0, k0) + if _, _, ok, _ := listPointer.Find(k0); ok { + t.Fail() + } + + list = New(memStore) + if _, _, ok, _ := list.Find(k0); ok { + t.Fail() + } + if !list.IsEmpty() { + t.Fail() + } + + // Test at the beginning of the list. + for i := 0; i < maxN; i++ { + key := []byte(strconv.Itoa(maxN - i)) + list.InsertByKey(key, 0, key) + } + for i := 0; i < maxN; i++ { + key := []byte(strconv.Itoa(maxN - i)) + if _, _, ok, _ := list.Find(key); !ok { + t.Fail() + } + } + + list = New(memStore) + // Test at the end of the list. + for i := 0; i < maxN; i++ { + key := []byte(strconv.Itoa(i)) + list.InsertByKey(key, 0, key) + } + for i := 0; i < maxN; i++ { + key := []byte(strconv.Itoa(i)) + if _, _, ok, _ := list.Find(key); !ok { + t.Fail() + } + } + + list = New(memStore) + // Test at random positions in the list. 
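+ // rand.Perm(maxN) returns each value in [0, maxN) exactly once in shuffled order, so insertion and lookup cover every key in random order.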
+ rList := rand.Perm(maxN) + for _, e := range rList { + key := []byte(strconv.Itoa(e)) + // println("insert", e) + list.InsertByKey(key, 0, key) + } + for _, e := range rList { + key := []byte(strconv.Itoa(e)) + // println("find", e) + if _, _, ok, _ := list.Find(key); !ok { + t.Fail() + } + } + // println("print list") + list.println() + +} + +func Element(x int) []byte { + return []byte(strconv.Itoa(x)) +} + +func TestDelete(t *testing.T) { + + k0 := []byte("0") + + var list *SkipList + + // Delete on empty list + list.DeleteByKey(k0) + + list = New(memStore) + + list.DeleteByKey(k0) + if !list.IsEmpty() { + t.Fail() + } + + list.InsertByKey(k0, 0, k0) + list.DeleteByKey(k0) + if !list.IsEmpty() { + t.Fail() + } + + // Delete elements at the beginning of the list. + for i := 0; i < maxN; i++ { + list.InsertByKey(Element(i), 0, Element(i)) + } + for i := 0; i < maxN; i++ { + list.DeleteByKey(Element(i)) + } + if !list.IsEmpty() { + t.Fail() + } + + list = New(memStore) + // Delete elements at the end of the list. + for i := 0; i < maxN; i++ { + list.InsertByKey(Element(i), 0, Element(i)) + } + for i := 0; i < maxN; i++ { + list.DeleteByKey(Element(maxN - i - 1)) + } + if !list.IsEmpty() { + t.Fail() + } + + list = New(memStore) + // Delete elements at random positions in the list. + rList := rand.Perm(maxN) + for _, e := range rList { + list.InsertByKey(Element(e), 0, Element(e)) + } + for _, e := range rList { + list.DeleteByKey(Element(e)) + } + if !list.IsEmpty() { + t.Fail() + } +} + +func TestNext(t *testing.T) { + list := New(memStore) + + for i := 0; i < maxN; i++ { + list.InsertByKey(Element(i), 0, Element(i)) + } + + smallest, _ := list.GetSmallestNode() + largest, _ := list.GetLargestNode() + + lastNode := smallest + node := lastNode + for node != largest { + node, _ = list.Next(node) + // Must always be incrementing here! + if bytes.Compare(node.Key, lastNode.Key) <= 0 { + t.Fail() + } + // Next.Prev must always point to itself! + prevNode, _ := list.Prev(node) + nextNode, _ := list.Next(prevNode) + if nextNode != node { + t.Fail() + } + lastNode = node + } + + if nextNode, _ := list.Next(largest); nextNode != smallest { + t.Fail() + } +} + +func TestPrev(t *testing.T) { + list := New(memStore) + + for i := 0; i < maxN; i++ { + list.InsertByKey(Element(i), 0, Element(i)) + } + + smallest, _ := list.GetSmallestNode() + largest, _ := list.GetLargestNode() + + lastNode := largest + node := lastNode + for node != smallest { + node, _ = list.Prev(node) + // Must always be incrementing here! + if bytes.Compare(node.Key, lastNode.Key) >= 0 { + t.Fail() + } + // Next.Prev must always point to itself! + nextNode, _ := list.Next(node) + prevNode, _ := list.Prev(nextNode) + if prevNode != node { + t.Fail() + } + lastNode = node + } + + if prevNode, _ := list.Prev(smallest); prevNode != largest { + t.Fail() + } +} + +func TestFindGreaterOrEqual(t *testing.T) { + + maxNumber := maxN * 100 + + var list *SkipList + var listPointer *SkipList + + // Test on empty list. 
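+ // listPointer is a nil *SkipList here; FindGreaterOrEqual must tolerate the nil receiver and simply report the key as not found.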
+	if _, _, ok, _ := listPointer.FindGreaterOrEqual(Element(0)); ok {
+		t.Errorf("found element 0 in an empty list")
+	}
+
+	list = New(memStore)
+
+	for i := 0; i < maxN; i++ {
+		list.InsertByKey(Element(rand.Intn(maxNumber)), 0, Element(i))
+	}
+
+	for i := 0; i < maxN; i++ {
+		key := Element(rand.Intn(maxNumber))
+		if _, v, ok, _ := list.FindGreaterOrEqual(key); ok {
+			// The predecessor of the found element must not be greater than the search key.
+			if v.Prev != nil && bytes.Compare(key, v.Prev.Key) < 0 {
+				t.Errorf("PrevV: %s\n key: %s\n\n", string(v.Prev.Key), string(key))
+			}
+			// The found element's key must be greater than or equal to the search key.
+			if bytes.Compare(v.Key, key) < 0 {
+				t.Errorf("v: %s\n key: %s\n\n", string(v.Key), string(key))
+			}
+		} else {
+			lastNode, _ := list.GetLargestNode()
+			lastKey := lastNode.Key
+			// Not finding anything is only acceptable when the search key is greater than the largest key in the list.
+			if bytes.Compare(key, lastKey) <= 0 {
+				t.Errorf("lastKey: %s\n key: %s\n\n", string(lastKey), string(key))
+			}
+		}
+	}
+
+}
+
+func TestChangeValue(t *testing.T) {
+	list := New(memStore)
+
+	for i := 0; i < maxN; i++ {
+		list.InsertByKey(Element(i), 0, []byte("value"))
+	}
+
+	for i := 0; i < maxN; i++ {
+		// Only the stored value changes; the key must stay the same for ChangeValue to be valid.
+		_, f1, ok, _ := list.Find(Element(i))
+		if !ok {
+			t.Fail()
+		}
+		err := list.ChangeValue(f1, []byte("different value"))
+		if err != nil {
+			t.Fail()
+		}
+		_, f2, ok, _ := list.Find(Element(i))
+		if !ok {
+			t.Fail()
+		}
+		if bytes.Compare(f2.GetValue(), []byte("different value")) != 0 {
+			t.Fail()
+		}
+	}
+}
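The ListStore interface and the memStore helper used by these tests are defined elsewhere in this change, so only their apparent shape can be read off the calls in skiplist_serde.go (SaveElement(id, element), DeleteElement(id), LoadElement(id)). Below is a minimal in-memory sketch under that assumption; mapStore and newMapStore are illustrative names, not part of the commit.

package skiplist

// mapStore is a hypothetical in-memory element store, keyed by element id.
type mapStore struct {
	elements map[int64]*SkipListElement
}

func newMapStore() *mapStore {
	return &mapStore{elements: make(map[int64]*SkipListElement)}
}

func (m *mapStore) SaveElement(id int64, element *SkipListElement) error {
	m.elements[id] = element
	return nil
}

func (m *mapStore) DeleteElement(id int64) error {
	delete(m.elements, id)
	return nil
}

func (m *mapStore) LoadElement(id int64) (*SkipListElement, error) {
	// A missing id loads as nil, which the skiplist code treats as "not present".
	return m.elements[id], nil
}

// Usage sketch, assuming mapStore satisfies the package's ListStore interface:
//
//	list := New(newMapStore())
//	id, _ := list.InsertByKey([]byte("k1"), 0, []byte("v1"))
//	_, elem, found, _ := list.Find([]byte("k1"))
//
// And a NameList round trip with the serde added in name_list_serde.go
// (ToBytes only carries the level references; the elements stay in the store):
//
//	nl := LoadNameList(nil, newMapStore(), 7)
//	nl.WriteName("a")
//	skeleton := nl.ToBytes()
//	nl2 := LoadNameList(skeleton, sameStoreAsAbove, 7) // sameStoreAsAbove: the store nl was built on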
