aboutsummaryrefslogtreecommitdiff
path: root/weed/util
diff options
context:
space:
mode:
authoryulai.li <blacktear23@gmail.com>2022-06-26 22:43:37 +0800
committeryulai.li <blacktear23@gmail.com>2022-06-26 22:43:37 +0800
commit46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch)
tree734125b48b6d96f8796a2b89b924312cd169ef0e /weed/util
parenta5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff)
parentdc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff)
downloadseaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz
seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip
Update tikv client version and add one PC support
Diffstat (limited to 'weed/util')
-rw-r--r--weed/util/bounded_tree/bounded_tree.go179
-rw-r--r--weed/util/bounded_tree/bounded_tree_test.go126
-rw-r--r--weed/util/chunk_cache/chunk_cache.go92
-rw-r--r--weed/util/chunk_cache/chunk_cache_in_memory.go18
-rw-r--r--weed/util/chunk_cache/chunk_cache_on_disk.go22
-rw-r--r--weed/util/chunk_cache/chunk_cache_on_disk_test.go31
-rw-r--r--weed/util/chunk_cache/on_disk_cache_layer.go32
-rw-r--r--weed/util/compression_test.go21
-rw-r--r--weed/util/config.go25
-rw-r--r--weed/util/constants.go5
-rw-r--r--weed/util/constants_4bytes.go1
-rw-r--r--weed/util/constants_5bytes.go1
-rw-r--r--weed/util/file_util.go25
-rw-r--r--weed/util/file_util_non_posix.go1
-rw-r--r--weed/util/file_util_posix.go1
-rw-r--r--weed/util/fullpath.go7
-rw-r--r--weed/util/grace/signal_handling.go1
-rw-r--r--weed/util/grace/signal_handling_notsupported.go1
-rw-r--r--weed/util/http_util.go57
-rw-r--r--weed/util/log_buffer/log_buffer.go62
-rw-r--r--weed/util/log_buffer/log_buffer_test.go2
-rw-r--r--weed/util/log_buffer/log_read.go20
-rw-r--r--weed/util/mem/slot_pool.go63
-rw-r--r--weed/util/mem/slot_pool_test.go48
-rw-r--r--weed/util/net_timeout.go51
-rw-r--r--weed/util/network.go33
-rw-r--r--weed/util/parse.go5
-rw-r--r--weed/util/retry.go1
-rw-r--r--weed/util/skiplist/Makefile6
-rw-r--r--weed/util/skiplist/list_store.go32
-rw-r--r--weed/util/skiplist/name_batch.go102
-rw-r--r--weed/util/skiplist/name_list.go326
-rw-r--r--weed/util/skiplist/name_list_serde.go71
-rw-r--r--weed/util/skiplist/name_list_test.go73
-rw-r--r--weed/util/skiplist/skiplist.go571
-rw-r--r--weed/util/skiplist/skiplist.pb.go438
-rw-r--r--weed/util/skiplist/skiplist.proto30
-rw-r--r--weed/util/skiplist/skiplist_serde.go51
-rw-r--r--weed/util/skiplist/skiplist_test.go290
39 files changed, 2442 insertions, 479 deletions
diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go
deleted file mode 100644
index 58911df75..000000000
--- a/weed/util/bounded_tree/bounded_tree.go
+++ /dev/null
@@ -1,179 +0,0 @@
-package bounded_tree
-
-import (
- "sync"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-type Node struct {
- Parent *Node
- Name string
- Children map[string]*Node
-}
-
-type BoundedTree struct {
- root *Node
- sync.RWMutex
- baseDir util.FullPath
-}
-
-func NewBoundedTree(baseDir util.FullPath) *BoundedTree {
- return &BoundedTree{
- root: &Node{
- Name: "/",
- },
- baseDir: baseDir,
- }
-}
-
-type VisitNodeFunc func(path util.FullPath) (childDirectories []string, err error)
-
-// If the path is not visited, call the visitFn for each level of directory
-// No action if the directory has been visited before or does not exist.
-// A leaf node, which has no children, represents a directory not visited.
-// A non-leaf node or a non-existing node represents a directory already visited, or does not need to visit.
-func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) (visitErr error) {
- t.Lock()
- defer t.Unlock()
-
- if t.root == nil {
- return
- }
- components := p.Split()
- // fmt.Printf("components %v %d\n", components, len(components))
- canDelete, err := t.ensureVisited(t.root, util.FullPath("/"), components, 0, visitFn)
- if err != nil {
- return err
- }
- if canDelete {
- t.root = nil
- }
- return nil
-}
-
-func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool, visitErr error) {
-
- // println("ensureVisited", currentPath, i)
-
- if n == nil {
- // fmt.Printf("%s null\n", currentPath)
- return
- }
-
- if n.isVisited() {
- // fmt.Printf("%s visited %v\n", currentPath, n.Name)
- } else {
- // fmt.Printf("ensure %v\n", currentPath)
-
- children, err := visitFn(currentPath)
- if err != nil {
- glog.V(0).Infof("failed to visit %s: %v", currentPath, err)
- return false, err
- }
-
- if len(children) == 0 {
- // fmt.Printf(" canDelete %v without children\n", currentPath)
- return true, nil
- }
-
- n.Children = make(map[string]*Node)
- for _, child := range children {
- // fmt.Printf(" add child %v %v\n", currentPath, child)
- n.Children[child] = &Node{
- Name: child,
- }
- }
- }
-
- if i >= len(components) {
- return
- }
-
- // fmt.Printf(" check child %v %v\n", currentPath, components[i])
-
- toVisitNode, found := n.Children[components[i]]
- if !found {
- // fmt.Printf(" did not find child %v %v\n", currentPath, components[i])
- return
- }
-
- // fmt.Printf(" ensureVisited %v %v\n", currentPath, toVisitNode.Name)
- canDelete, childVisitErr := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn)
- if childVisitErr != nil {
- return false, childVisitErr
- }
- if canDelete {
-
- // fmt.Printf(" delete %v %v\n", currentPath, components[i])
- delete(n.Children, components[i])
-
- if len(n.Children) == 0 {
- // fmt.Printf(" canDelete %v\n", currentPath)
- return true, nil
- }
- }
-
- return false, nil
-
-}
-
-func (n *Node) isVisited() bool {
- if n == nil {
- return true
- }
- if len(n.Children) > 0 {
- return true
- }
- return false
-}
-
-func (n *Node) getChild(childName string) *Node {
- if n == nil {
- return nil
- }
- if len(n.Children) > 0 {
- return n.Children[childName]
- }
- return nil
-}
-
-func (t *BoundedTree) HasVisited(p util.FullPath) bool {
-
- t.RLock()
- defer t.RUnlock()
-
- if t.root == nil {
- return true
- }
-
- components := p.Split()
- // fmt.Printf("components %v %d\n", components, len(components))
- return t.hasVisited(t.root, util.FullPath("/"), components, 0)
-}
-
-func (t *BoundedTree) hasVisited(n *Node, currentPath util.FullPath, components []string, i int) bool {
-
- if n == nil {
- return true
- }
-
- if !n.isVisited() {
- return false
- }
-
- // fmt.Printf(" hasVisited child %v %+v %d\n", currentPath, components, i)
-
- if i >= len(components) {
- return true
- }
-
- toVisitNode, found := n.Children[components[i]]
- if !found {
- return true
- }
-
- return t.hasVisited(toVisitNode, currentPath.Child(components[i]), components, i+1)
-
-}
diff --git a/weed/util/bounded_tree/bounded_tree_test.go b/weed/util/bounded_tree/bounded_tree_test.go
deleted file mode 100644
index 465f1cc9c..000000000
--- a/weed/util/bounded_tree/bounded_tree_test.go
+++ /dev/null
@@ -1,126 +0,0 @@
-package bounded_tree
-
-import (
- "fmt"
- "testing"
-
- "github.com/stretchr/testify/assert"
-
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- visitFn = func(path util.FullPath) (childDirectories []string, err error) {
- fmt.Printf(" visit %v ...\n", path)
- switch path {
- case "/":
- return []string{"a", "g", "h"}, nil
- case "/a":
- return []string{"b", "f"}, nil
- case "/a/b":
- return []string{"c", "e"}, nil
- case "/a/b/c":
- return []string{"d"}, nil
- case "/a/b/c/d":
- return []string{"i", "j"}, nil
- case "/a/b/c/d/i":
- return []string{}, nil
- case "/a/b/c/d/j":
- return []string{}, nil
- case "/a/b/e":
- return []string{}, nil
- case "/a/f":
- return []string{}, nil
- }
- return nil, nil
- }
-
- printMap = func(m map[string]*Node) {
- for k := range m {
- println(" >", k)
- }
- }
-)
-
-func TestBoundedTree(t *testing.T) {
-
- // a/b/c/d/i
- // a/b/c/d/j
- // a/b/c/d
- // a/b/e
- // a/f
- // g
- // h
-
- tree := NewBoundedTree(util.FullPath("/"))
-
- tree.EnsureVisited(util.FullPath("/a/b/c"), visitFn)
-
- assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b")))
- assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b/c")))
- assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/c/d")))
- assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/e")))
- assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/f")))
- assert.Equal(t, false, tree.HasVisited(util.FullPath("/g")))
- assert.Equal(t, false, tree.HasVisited(util.FullPath("/h")))
- assert.Equal(t, true, tree.HasVisited(util.FullPath("/")))
- assert.Equal(t, true, tree.HasVisited(util.FullPath("/x")))
- assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/e/x")))
-
- printMap(tree.root.Children)
-
- a := tree.root.getChild("a")
-
- b := a.getChild("b")
- if !b.isVisited() {
- t.Errorf("expect visited /a/b")
- }
- c := b.getChild("c")
- if !c.isVisited() {
- t.Errorf("expect visited /a/b/c")
- }
-
- d := c.getChild("d")
- if d.isVisited() {
- t.Errorf("expect unvisited /a/b/c/d")
- }
-
- tree.EnsureVisited(util.FullPath("/a/b/c/d"), visitFn)
- tree.EnsureVisited(util.FullPath("/a/b/c/d/i"), visitFn)
- tree.EnsureVisited(util.FullPath("/a/b/c/d/j"), visitFn)
- tree.EnsureVisited(util.FullPath("/a/b/e"), visitFn)
- tree.EnsureVisited(util.FullPath("/a/f"), visitFn)
-
- printMap(tree.root.Children)
-
-}
-
-func TestEmptyBoundedTree(t *testing.T) {
-
- // g
- // h
-
- tree := NewBoundedTree(util.FullPath("/"))
-
- visitFn := func(path util.FullPath) (childDirectories []string, err error) {
- fmt.Printf(" visit %v ...\n", path)
- switch path {
- case "/":
- return []string{"g", "h"}, nil
- }
- t.Fatalf("expected visit %s", path)
- return nil, nil
- }
-
- tree.EnsureVisited(util.FullPath("/a/b"), visitFn)
-
- tree.EnsureVisited(util.FullPath("/a/b"), visitFn)
-
- printMap(tree.root.Children)
-
- assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b")))
- assert.Equal(t, true, tree.HasVisited(util.FullPath("/a")))
- assert.Equal(t, false, tree.HasVisited(util.FullPath("/g")))
- assert.Equal(t, false, tree.HasVisited(util.FullPath("/g/x")))
-
-}
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index 40d24b322..3f3b264b1 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -11,8 +11,7 @@ import (
var ErrorOutOfBounds = errors.New("attempt to read out of bounds")
type ChunkCache interface {
- GetChunk(fileId string, minSize uint64) (data []byte)
- GetChunkSlice(fileId string, offset, length uint64) []byte
+ ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error)
SetChunk(fileId string, data []byte)
}
@@ -44,105 +43,52 @@ func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, uni
return c
}
-func (c *TieredChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
+func (c *TieredChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
if c == nil {
- return
- }
-
- c.RLock()
- defer c.RUnlock()
-
- return c.doGetChunk(fileId, minSize)
-}
-
-func (c *TieredChunkCache) doGetChunk(fileId string, minSize uint64) (data []byte) {
-
- if minSize <= c.onDiskCacheSizeLimit0 {
- data = c.memCache.GetChunk(fileId)
- if len(data) >= int(minSize) {
- return data
- }
- }
-
- fid, err := needle.ParseFileIdFromString(fileId)
- if err != nil {
- glog.Errorf("failed to parse file id %s", fileId)
- return nil
- }
-
- if minSize <= c.onDiskCacheSizeLimit0 {
- data = c.diskCaches[0].getChunk(fid.Key)
- if len(data) >= int(minSize) {
- return data
- }
- }
- if minSize <= c.onDiskCacheSizeLimit1 {
- data = c.diskCaches[1].getChunk(fid.Key)
- if len(data) >= int(minSize) {
- return data
- }
- }
- {
- data = c.diskCaches[2].getChunk(fid.Key)
- if len(data) >= int(minSize) {
- return data
- }
- }
-
- return nil
-
-}
-
-func (c *TieredChunkCache) GetChunkSlice(fileId string, offset, length uint64) []byte {
- if c == nil {
- return nil
+ return 0, nil
}
c.RLock()
defer c.RUnlock()
- return c.doGetChunkSlice(fileId, offset, length)
-}
-
-func (c *TieredChunkCache) doGetChunkSlice(fileId string, offset, length uint64) (data []byte) {
-
- minSize := offset + length
+ minSize := offset + uint64(len(data))
if minSize <= c.onDiskCacheSizeLimit0 {
- data, err := c.memCache.getChunkSlice(fileId, offset, length)
+ n, err = c.memCache.readChunkAt(data, fileId, offset)
if err != nil {
glog.Errorf("failed to read from memcache: %s", err)
}
- if len(data) >= int(minSize) {
- return data
+ if n >= int(minSize) {
+ return n, nil
}
}
fid, err := needle.ParseFileIdFromString(fileId)
if err != nil {
glog.Errorf("failed to parse file id %s", fileId)
- return nil
+ return n, nil
}
if minSize <= c.onDiskCacheSizeLimit0 {
- data = c.diskCaches[0].getChunkSlice(fid.Key, offset, length)
- if len(data) >= int(minSize) {
- return data
+ n, err = c.diskCaches[0].readChunkAt(data, fid.Key, offset)
+ if n >= int(minSize) {
+ return
}
}
if minSize <= c.onDiskCacheSizeLimit1 {
- data = c.diskCaches[1].getChunkSlice(fid.Key, offset, length)
- if len(data) >= int(minSize) {
- return data
+ n, err = c.diskCaches[1].readChunkAt(data, fid.Key, offset)
+ if n >= int(minSize) {
+ return
}
}
{
- data = c.diskCaches[2].getChunkSlice(fid.Key, offset, length)
- if len(data) >= int(minSize) {
- return data
+ n, err = c.diskCaches[2].readChunkAt(data, fid.Key, offset)
+ if n >= int(minSize) {
+ return
}
}
- return nil
+ return 0, nil
+
}
func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go
index d725f8a16..2982d0979 100644
--- a/weed/util/chunk_cache/chunk_cache_in_memory.go
+++ b/weed/util/chunk_cache/chunk_cache_in_memory.go
@@ -1,9 +1,8 @@
package chunk_cache
import (
- "time"
-
"github.com/karlseguin/ccache/v2"
+ "time"
)
// a global cache for recently accessed file chunks
@@ -45,6 +44,21 @@ func (c *ChunkCacheInMemory) getChunkSlice(fileId string, offset, length uint64)
return data[offset : int(offset)+wanted], nil
}
+func (c *ChunkCacheInMemory) readChunkAt(buffer []byte, fileId string, offset uint64) (int, error) {
+ item := c.cache.Get(fileId)
+ if item == nil {
+ return 0, nil
+ }
+ data := item.Value().([]byte)
+ item.Extend(time.Hour)
+ wanted := min(len(buffer), len(data)-int(offset))
+ if wanted < 0 {
+ return 0, ErrorOutOfBounds
+ }
+ n := copy(buffer, data[offset:int(offset)+wanted])
+ return n, nil
+}
+
func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) {
localCopy := make([]byte, len(data))
copy(localCopy, data)
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go
index 36de5c972..100b5919e 100644
--- a/weed/util/chunk_cache/chunk_cache_on_disk.go
+++ b/weed/util/chunk_cache/chunk_cache_on_disk.go
@@ -144,6 +144,28 @@ func (v *ChunkCacheVolume) getNeedleSlice(key types.NeedleId, offset, length uin
return data, nil
}
+func (v *ChunkCacheVolume) readNeedleSliceAt(data []byte, key types.NeedleId, offset uint64) (n int, err error) {
+ nv, ok := v.nm.Get(key)
+ if !ok {
+ return 0, storage.ErrorNotFound
+ }
+ wanted := min(len(data), int(nv.Size)-int(offset))
+ if wanted < 0 {
+ // should never happen, but better than panicing
+ return 0, ErrorOutOfBounds
+ }
+ if n, err = v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); err != nil {
+ return n, fmt.Errorf("read %s.dat [%d,%d): %v",
+ v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, err)
+ } else {
+ if n != wanted {
+ return n, fmt.Errorf("read %d, expected %d", n, wanted)
+ }
+ }
+
+ return n, nil
+}
+
func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error {
offset := v.fileSize
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
index f8325276e..8c7880eee 100644
--- a/weed/util/chunk_cache/chunk_cache_on_disk_test.go
+++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
@@ -3,16 +3,13 @@ package chunk_cache
import (
"bytes"
"fmt"
- "io/ioutil"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
"math/rand"
- "os"
"testing"
)
func TestOnDisk(t *testing.T) {
-
- tmpDir, _ := ioutil.TempDir("", "c")
- defer os.RemoveAll(tmpDir)
+ tmpDir := t.TempDir()
totalDiskSizeInKB := int64(32)
@@ -22,7 +19,7 @@ func TestOnDisk(t *testing.T) {
type test_data struct {
data []byte
fileId string
- size uint64
+ size int
}
testData := make([]*test_data, writeCount)
for i := 0; i < writeCount; i++ {
@@ -31,29 +28,35 @@ func TestOnDisk(t *testing.T) {
testData[i] = &test_data{
data: buff,
fileId: fmt.Sprintf("1,%daabbccdd", i+1),
- size: uint64(len(buff)),
+ size: len(buff),
}
cache.SetChunk(testData[i].fileId, testData[i].data)
// read back right after write
- data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ data := mem.Allocate(testData[i].size)
+ cache.ReadChunkAt(data, testData[i].fileId, 0)
if bytes.Compare(data, testData[i].data) != 0 {
t.Errorf("failed to write to and read from cache: %d", i)
}
+ mem.Free(data)
}
for i := 0; i < 2; i++ {
- data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ data := mem.Allocate(testData[i].size)
+ cache.ReadChunkAt(data, testData[i].fileId, 0)
if bytes.Compare(data, testData[i].data) == 0 {
t.Errorf("old cache should have been purged: %d", i)
}
+ mem.Free(data)
}
for i := 2; i < writeCount; i++ {
- data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ data := mem.Allocate(testData[i].size)
+ cache.ReadChunkAt(data, testData[i].fileId, 0)
if bytes.Compare(data, testData[i].data) != 0 {
t.Errorf("failed to write to and read from cache: %d", i)
}
+ mem.Free(data)
}
cache.Shutdown()
@@ -61,10 +64,12 @@ func TestOnDisk(t *testing.T) {
cache = NewTieredChunkCache(2, tmpDir, totalDiskSizeInKB, 1024)
for i := 0; i < 2; i++ {
- data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ data := mem.Allocate(testData[i].size)
+ cache.ReadChunkAt(data, testData[i].fileId, 0)
if bytes.Compare(data, testData[i].data) == 0 {
t.Errorf("old cache should have been purged: %d", i)
}
+ mem.Free(data)
}
for i := 2; i < writeCount; i++ {
@@ -87,10 +92,12 @@ func TestOnDisk(t *testing.T) {
*/
continue
}
- data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ data := mem.Allocate(testData[i].size)
+ cache.ReadChunkAt(data, testData[i].fileId, 0)
if bytes.Compare(data, testData[i].data) != 0 {
t.Errorf("failed to write to and read from cache: %d", i)
}
+ mem.Free(data)
}
cache.Shutdown()
diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go
index b004913ef..de32fb445 100644
--- a/weed/util/chunk_cache/on_disk_cache_layer.go
+++ b/weed/util/chunk_cache/on_disk_cache_layer.go
@@ -2,12 +2,11 @@ package chunk_cache
import (
"fmt"
- "path"
- "sort"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "golang.org/x/exp/slices"
+ "path"
)
type OnDiskCacheLayer struct {
@@ -33,10 +32,9 @@ func NewOnDiskCacheLayer(dir, namePrefix string, diskSize int64, segmentCount in
}
// keep newest cache to the front
- sort.Slice(c.diskCaches, func(i, j int) bool {
- return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime)
+ slices.SortFunc(c.diskCaches, func(a, b *ChunkCacheVolume) bool {
+ return a.lastModTime.After(b.lastModTime)
})
-
return c
}
@@ -96,7 +94,7 @@ func (c *OnDiskCacheLayer) getChunkSlice(needleId types.NeedleId, offset, length
continue
}
if err != nil {
- glog.Errorf("failed to read cache file %s id %d", diskCache.fileName, needleId)
+ glog.Warningf("failed to read cache file %s id %d: %v", diskCache.fileName, needleId, err)
continue
}
if len(data) != 0 {
@@ -108,6 +106,26 @@ func (c *OnDiskCacheLayer) getChunkSlice(needleId types.NeedleId, offset, length
}
+func (c *OnDiskCacheLayer) readChunkAt(buffer []byte, needleId types.NeedleId, offset uint64) (n int, err error) {
+
+ for _, diskCache := range c.diskCaches {
+ n, err = diskCache.readNeedleSliceAt(buffer, needleId, offset)
+ if err == storage.ErrorNotFound {
+ continue
+ }
+ if err != nil {
+ glog.Warningf("failed to read cache file %s id %d: %v", diskCache.fileName, needleId, err)
+ continue
+ }
+ if n > 0 {
+ return
+ }
+ }
+
+ return
+
+}
+
func (c *OnDiskCacheLayer) shutdown() {
for _, diskCache := range c.diskCaches {
diff --git a/weed/util/compression_test.go b/weed/util/compression_test.go
deleted file mode 100644
index b515e8988..000000000
--- a/weed/util/compression_test.go
+++ /dev/null
@@ -1,21 +0,0 @@
-package util
-
-import (
- "testing"
-
- "golang.org/x/tools/godoc/util"
-)
-
-func TestIsGzippable(t *testing.T) {
- buf := make([]byte, 1024)
-
- isText := util.IsText(buf)
-
- if isText {
- t.Error("buf with zeros are not text")
- }
-
- compressed, _ := GzipData(buf)
-
- t.Logf("compressed size %d\n", len(compressed))
-}
diff --git a/weed/util/config.go b/weed/util/config.go
index ae9397340..f09ac7e5e 100644
--- a/weed/util/config.go
+++ b/weed/util/config.go
@@ -9,6 +9,20 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
)
+var (
+ ConfigurationFileDirectory DirectoryValueType
+)
+
+type DirectoryValueType string
+
+func (s *DirectoryValueType) Set(value string) error {
+ *s = DirectoryValueType(value)
+ return nil
+}
+func (s *DirectoryValueType) String() string {
+ return string(*s)
+}
+
type Configuration interface {
GetString(key string) string
GetBool(key string) bool
@@ -20,11 +34,12 @@ type Configuration interface {
func LoadConfiguration(configFileName string, required bool) (loaded bool) {
// find a filer store
- viper.SetConfigName(configFileName) // name of config file (without extension)
- viper.AddConfigPath(".") // optionally look for config in the working directory
- viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
- viper.AddConfigPath("/usr/local/etc/seaweedfs/") // search path for bsd-style config directory in
- viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
+ viper.SetConfigName(configFileName) // name of config file (without extension)
+ viper.AddConfigPath(ResolvePath(ConfigurationFileDirectory.String())) // path to look for the config file in
+ viper.AddConfigPath(".") // optionally look for config in the working directory
+ viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
+ viper.AddConfigPath("/usr/local/etc/seaweedfs/") // search path for bsd-style config directory in
+ viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
if strings.Contains(err.Error(), "Not Found") {
diff --git a/weed/util/constants.go b/weed/util/constants.go
index d2a90f874..8886e8fd2 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,8 +5,9 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %.02f", sizeLimit, 2.65)
- COMMIT = ""
+ VERSION_NUMBER = fmt.Sprintf("%.02f", 3.12)
+ VERSION = sizeLimit + " " + VERSION_NUMBER
+ COMMIT = ""
)
func Version() string {
diff --git a/weed/util/constants_4bytes.go b/weed/util/constants_4bytes.go
index a29d9d3b0..187e33909 100644
--- a/weed/util/constants_4bytes.go
+++ b/weed/util/constants_4bytes.go
@@ -1,3 +1,4 @@
+//go:build !5BytesOffset
// +build !5BytesOffset
package util
diff --git a/weed/util/constants_5bytes.go b/weed/util/constants_5bytes.go
index 91ce4066f..7c6a158cf 100644
--- a/weed/util/constants_5bytes.go
+++ b/weed/util/constants_5bytes.go
@@ -1,3 +1,4 @@
+//go:build 5BytesOffset
// +build 5BytesOffset
package util
diff --git a/weed/util/file_util.go b/weed/util/file_util.go
index f83f80265..6155d18e1 100644
--- a/weed/util/file_util.go
+++ b/weed/util/file_util.go
@@ -87,3 +87,28 @@ func ResolvePath(path string) string {
return path
}
+
+func FileNameBase(filename string) string {
+ lastDotIndex := strings.LastIndex(filename, ".")
+ if lastDotIndex < 0 {
+ return filename
+ }
+ return filename[:lastDotIndex]
+}
+
+// Copied from os.WriteFile(), adding file sync.
+// see https://github.com/golang/go/issues/20599
+func WriteFile(name string, data []byte, perm os.FileMode) error {
+ f, err := os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
+ if err != nil {
+ return err
+ }
+ _, err = f.Write(data)
+ if err1 := f.Sync(); err1 != nil && err == nil {
+ err = err1
+ }
+ if err1 := f.Close(); err1 != nil && err == nil {
+ err = err1
+ }
+ return err
+}
diff --git a/weed/util/file_util_non_posix.go b/weed/util/file_util_non_posix.go
index ffcfef6d5..29aecffa6 100644
--- a/weed/util/file_util_non_posix.go
+++ b/weed/util/file_util_non_posix.go
@@ -1,3 +1,4 @@
+//go:build linux || darwin || freebsd || netbsd || openbsd || plan9 || solaris || zos
// +build linux darwin freebsd netbsd openbsd plan9 solaris zos
package util
diff --git a/weed/util/file_util_posix.go b/weed/util/file_util_posix.go
index 22ca60b3b..0bec8abad 100644
--- a/weed/util/file_util_posix.go
+++ b/weed/util/file_util_posix.go
@@ -1,3 +1,4 @@
+//go:build windows
// +build windows
package util
diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go
index 85028b052..f52d4d1d0 100644
--- a/weed/util/fullpath.go
+++ b/weed/util/fullpath.go
@@ -41,8 +41,11 @@ func (fp FullPath) Child(name string) FullPath {
return FullPath(dir + "/" + noPrefix)
}
-func (fp FullPath) AsInode() uint64 {
- return uint64(HashStringToLong(string(fp)))
+// AsInode an in-memory only inode representation
+func (fp FullPath) AsInode(unixTime int64) uint64 {
+ inode := uint64(HashStringToLong(string(fp)))
+ inode = inode + uint64(unixTime)*37
+ return inode
}
// split, but skipping the root
diff --git a/weed/util/grace/signal_handling.go b/weed/util/grace/signal_handling.go
index 7cca46764..fc7afcad9 100644
--- a/weed/util/grace/signal_handling.go
+++ b/weed/util/grace/signal_handling.go
@@ -1,3 +1,4 @@
+//go:build !plan9
// +build !plan9
package grace
diff --git a/weed/util/grace/signal_handling_notsupported.go b/weed/util/grace/signal_handling_notsupported.go
index 5335915a1..de898159a 100644
--- a/weed/util/grace/signal_handling_notsupported.go
+++ b/weed/util/grace/signal_handling_notsupported.go
@@ -1,3 +1,4 @@
+//go:build plan9
// +build plan9
package grace
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 2efd6b5aa..2f42d3768 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -5,8 +5,8 @@ import (
"encoding/json"
"errors"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
"io"
- "io/ioutil"
"net/http"
"net/url"
"strings"
@@ -35,7 +35,7 @@ func Post(url string, values url.Values) ([]byte, error) {
return nil, err
}
defer r.Body.Close()
- b, err := ioutil.ReadAll(r.Body)
+ b, err := io.ReadAll(r.Body)
if r.StatusCode >= 400 {
if err != nil {
return nil, fmt.Errorf("%s: %d - %s", url, r.StatusCode, string(b))
@@ -71,7 +71,7 @@ func Get(url string) ([]byte, bool, error) {
reader = response.Body
}
- b, err := ioutil.ReadAll(reader)
+ b, err := io.ReadAll(reader)
if response.StatusCode >= 400 {
retryable := response.StatusCode >= 500
return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
@@ -107,7 +107,7 @@ func Delete(url string, jwt string) error {
return e
}
defer resp.Body.Close()
- body, err := ioutil.ReadAll(resp.Body)
+ body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
@@ -137,7 +137,7 @@ func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err err
return
}
defer resp.Body.Close()
- body, err = ioutil.ReadAll(resp.Body)
+ body, err = io.ReadAll(resp.Body)
if err != nil {
return
}
@@ -181,7 +181,16 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e
}
func DownloadFile(fileUrl string, jwt string) (filename string, header http.Header, resp *http.Response, e error) {
- response, err := client.Get(fileUrl)
+ req, err := http.NewRequest("GET", fileUrl, nil)
+ if err != nil {
+ return "", nil, nil, err
+ }
+
+ if len(jwt) > 0 {
+ req.Header.Set("Authorization", "BEARER "+jwt)
+ }
+
+ response, err := client.Do(req)
if err != nil {
return "", nil, nil, err
}
@@ -271,7 +280,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC
}
}
// drains the response body to avoid memory leak
- data, _ := ioutil.ReadAll(reader)
+ data, _ := io.ReadAll(reader)
if len(data) != 0 {
glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data))
}
@@ -318,7 +327,8 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
var (
m int
)
- buf := make([]byte, 64*1024)
+ buf := mem.Allocate(64 * 1024)
+ defer mem.Free(buf)
for {
m, err = reader.Read(buf)
@@ -327,7 +337,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
return false, nil
}
if err != nil {
- return false, err
+ return true, err
}
}
@@ -359,7 +369,7 @@ func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool
return false, nil
}
-func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
+func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (io.ReadCloser, error) {
req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil {
@@ -371,6 +381,10 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e
req.Header.Add("Accept-Encoding", "gzip")
}
+ if len(jwt) > 0 {
+ req.Header.Set("Authorization", "BEARER "+jwt)
+ }
+
r, err := client.Do(req)
if err != nil {
return nil, err
@@ -393,11 +407,30 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e
}
func CloseResponse(resp *http.Response) {
- io.Copy(ioutil.Discard, resp.Body)
+ reader := &CountingReader{reader: resp.Body}
+ io.Copy(io.Discard, reader)
resp.Body.Close()
+ if reader.BytesRead > 0 {
+ glog.V(1).Infof("response leftover %d bytes", reader.BytesRead)
+ }
}
func CloseRequest(req *http.Request) {
- io.Copy(ioutil.Discard, req.Body)
+ reader := &CountingReader{reader: req.Body}
+ io.Copy(io.Discard, reader)
req.Body.Close()
+ if reader.BytesRead > 0 {
+ glog.V(1).Infof("request leftover %d bytes", reader.BytesRead)
+ }
+}
+
+type CountingReader struct {
+ reader io.Reader
+ BytesRead int
+}
+
+func (r *CountingReader) Read(p []byte) (n int, err error) {
+ n, err = r.reader.Read(p)
+ r.BytesRead += n
+ return n, err
}
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index c2158e7eb..422575193 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -56,11 +56,15 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi
return lb
}
-func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
+func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) {
+ var toFlush *dataToFlush
m.Lock()
defer func() {
m.Unlock()
+ if toFlush != nil {
+ m.flushChan <- toFlush
+ }
if m.notifyFn != nil {
m.notifyFn()
}
@@ -68,20 +72,20 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
// need to put the timestamp inside the lock
var ts time.Time
- if eventTsNs == 0 {
+ if processingTsNs == 0 {
ts = time.Now()
- eventTsNs = ts.UnixNano()
+ processingTsNs = ts.UnixNano()
} else {
- ts = time.Unix(0, eventTsNs)
+ ts = time.Unix(0, processingTsNs)
}
- if m.lastTsNs >= eventTsNs {
+ if m.lastTsNs >= processingTsNs {
// this is unlikely to happen, but just in case
- eventTsNs = m.lastTsNs + 1
- ts = time.Unix(0, eventTsNs)
+ processingTsNs = m.lastTsNs + 1
+ ts = time.Unix(0, processingTsNs)
}
- m.lastTsNs = eventTsNs
+ m.lastTsNs = processingTsNs
logEntry := &filer_pb.LogEntry{
- TsNs: eventTsNs,
+ TsNs: processingTsNs,
PartitionKeyHash: util.HashToInt32(partitionKey),
Data: data,
}
@@ -96,7 +100,7 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
// glog.V(4).Infof("%s copyToFlush1 start time %v, ts %v, remaining %d bytes", m.name, m.startTime, ts, len(m.buf)-m.pos)
- m.flushChan <- m.copyToFlush()
+ toFlush = m.copyToFlush()
m.startTime = ts
if len(m.buf) < size+4 {
m.buf = make([]byte, 2*size+4)
@@ -148,8 +152,10 @@ func (m *LogBuffer) loopInterval() {
return
}
toFlush := m.copyToFlush()
- m.flushChan <- toFlush
m.Unlock()
+ if toFlush != nil {
+ m.flushChan <- toFlush
+ }
}
}
@@ -188,16 +194,34 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
m.RLock()
defer m.RUnlock()
- if !m.lastFlushTime.IsZero() && m.lastFlushTime.After(lastReadTime) {
- return nil, ResumeFromDiskError
+ // Read from disk and memory
+ // 1. read from disk, last time is = td
+ // 2. in memory, the earliest time = tm
+ // if tm <= td, case 2.1
+ // read from memory
+ // if tm is empty, case 2.2
+ // read from memory
+ // if td < tm, case 2.3
+ // read from disk again
+ var tsMemory time.Time
+ if !m.startTime.IsZero() {
+ tsMemory = m.startTime
}
-
- /*
- fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers))
- for i, prevBuf := range m.prevBuffers.buffers {
- fmt.Printf(" prev %d : %s\n", i, prevBuf.String())
+ for _, prevBuf := range m.prevBuffers.buffers {
+ if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) {
+ tsMemory = prevBuf.startTime
}
- */
+ }
+ if tsMemory.IsZero() { // case 2.2
+ return nil, nil
+ } else if lastReadTime.Before(tsMemory) { // case 2.3
+ if !m.lastFlushTime.IsZero() {
+ glog.V(0).Infof("resume with last flush time: %v", m.lastFlushTime)
+ return nil, ResumeFromDiskError
+ }
+ }
+
+ // the following is case 2.1
if lastReadTime.Equal(m.stopTime) {
return nil, nil
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index 7dcfe5f52..915b93bf4 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -27,7 +27,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
}
receivedmessageCount := 0
- lb.LoopProcessLogData("test", startTime, func() bool {
+ lb.LoopProcessLogData("test", startTime, 0, func() bool {
// stop if no more messages
return false
}, func(logEntry *filer_pb.LogEntry) error {
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 02f5af274..99532b47b 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -17,10 +17,11 @@ var (
ResumeFromDiskError = fmt.Errorf("resumeFromDisk")
)
-func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime time.Time, waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) {
+func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime time.Time, stopTsNs int64,
+ waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, isDone bool, err error) {
// loop through all messages
var bytesBuf *bytes.Buffer
- lastReadTime = startTreadTime
+ lastReadTime = startReadTime
defer func() {
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
@@ -34,10 +35,15 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime
}
bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime)
if err == ResumeFromDiskError {
- return lastReadTime, ResumeFromDiskError
+ time.Sleep(1127 * time.Millisecond)
+ return lastReadTime, isDone, ResumeFromDiskError
}
// glog.V(4).Infof("%s ReadFromBuffer by %v", readerName, lastReadTime)
if bytesBuf == nil {
+ if stopTsNs != 0 {
+ isDone = true
+ return
+ }
if waitForDataFn() {
continue
} else {
@@ -49,7 +55,6 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime
// fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadTime, len(buf))
batchSize := 0
- var startReadTime time.Time
for pos := 0; pos+4 < len(buf); {
@@ -67,10 +72,11 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startTreadTime
pos += 4 + int(size)
continue
}
- lastReadTime = time.Unix(0, logEntry.TsNs)
- if startReadTime.IsZero() {
- startReadTime = lastReadTime
+ if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
+ isDone = true
+ return
}
+ lastReadTime = time.Unix(0, logEntry.TsNs)
if err = eachLogDataFn(logEntry); err != nil {
return
diff --git a/weed/util/mem/slot_pool.go b/weed/util/mem/slot_pool.go
new file mode 100644
index 000000000..b3493febf
--- /dev/null
+++ b/weed/util/mem/slot_pool.go
@@ -0,0 +1,63 @@
+package mem
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "sync"
+ "sync/atomic"
+)
+
+var pools []*sync.Pool
+
+const (
+ min_size = 1024
+)
+
+func bitCount(size int) (count int) {
+ for ; size > min_size; count++ {
+ size = (size + 1) >> 1
+ }
+ return
+}
+
+func init() {
+ // 1KB ~ 256MB
+ pools = make([]*sync.Pool, bitCount(1024*1024*256))
+ for i := 0; i < len(pools); i++ {
+ slotSize := 1024 << i
+ pools[i] = &sync.Pool{
+ New: func() interface{} {
+ buffer := make([]byte, slotSize)
+ return &buffer
+ },
+ }
+ }
+}
+
+func getSlotPool(size int) (*sync.Pool, bool) {
+ index := bitCount(size)
+ if index >= len(pools) {
+ return nil, false
+ }
+ return pools[index], true
+}
+
+var total int64
+
+func Allocate(size int) []byte {
+ if pool, found := getSlotPool(size); found {
+ newVal := atomic.AddInt64(&total, 1)
+ glog.V(4).Infof("++> %d", newVal)
+
+ slab := *pool.Get().(*[]byte)
+ return slab[:size]
+ }
+ return make([]byte, size)
+}
+
+func Free(buf []byte) {
+ if pool, found := getSlotPool(cap(buf)); found {
+ newVal := atomic.AddInt64(&total, -1)
+ glog.V(4).Infof("--> %d", newVal)
+ pool.Put(&buf)
+ }
+}
diff --git a/weed/util/mem/slot_pool_test.go b/weed/util/mem/slot_pool_test.go
new file mode 100644
index 000000000..44f9ec004
--- /dev/null
+++ b/weed/util/mem/slot_pool_test.go
@@ -0,0 +1,48 @@
+package mem
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestAllocateFree(t *testing.T) {
+ buf := Allocate(12)
+ Free(buf)
+ if cap(buf) != min_size {
+ t.Errorf("min size error allocated capacity=%d", cap(buf))
+ }
+ if len(buf) != 12 {
+ t.Errorf("size error")
+ }
+
+ buf = Allocate(4883)
+ Free(buf)
+ if cap(buf) != 1024<<bitCount(4883) {
+ t.Errorf("min size error allocated capacity=%d", cap(buf))
+ }
+ if len(buf) != 4883 {
+ t.Errorf("size error")
+ }
+
+}
+
+func TestAllocateFreeEdgeCases(t *testing.T) {
+ assert.Equal(t, 1, bitCount(2048))
+ assert.Equal(t, 2, bitCount(2049))
+
+ buf := Allocate(2048)
+ Free(buf)
+ buf = Allocate(2049)
+ Free(buf)
+}
+
+func TestBitCount(t *testing.T) {
+ count := bitCount(12)
+ if count != 0 {
+ t.Errorf("bitCount error count=%d", count)
+ }
+ if count != bitCount(min_size) {
+ t.Errorf("bitCount error count=%d", count)
+ }
+
+}
diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go
index e8075c297..536359eec 100644
--- a/weed/util/net_timeout.go
+++ b/weed/util/net_timeout.go
@@ -1,6 +1,7 @@
package util
import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
"net"
"time"
@@ -36,11 +37,13 @@ type Conn struct {
ReadTimeout time.Duration
WriteTimeout time.Duration
isClosed bool
+ bytesRead int64
+ bytesWritten int64
}
func (c *Conn) Read(b []byte) (count int, e error) {
if c.ReadTimeout != 0 {
- err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout))
+ err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * time.Duration(c.bytesRead/40000+1)))
if err != nil {
return 0, err
}
@@ -48,6 +51,7 @@ func (c *Conn) Read(b []byte) (count int, e error) {
count, e = c.Conn.Read(b)
if e == nil {
stats.BytesIn(int64(count))
+ c.bytesRead += int64(count)
}
return
}
@@ -55,7 +59,7 @@ func (c *Conn) Read(b []byte) (count int, e error) {
func (c *Conn) Write(b []byte) (count int, e error) {
if c.WriteTimeout != 0 {
// minimum 4KB/s
- err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout * time.Duration(len(b)/40000+1)))
+ err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout * time.Duration(c.bytesWritten/40000+1)))
if err != nil {
return 0, err
}
@@ -63,6 +67,7 @@ func (c *Conn) Write(b []byte) (count int, e error) {
count, e = c.Conn.Write(b)
if e == nil {
stats.BytesOut(int64(count))
+ c.bytesWritten += int64(count)
}
return
}
@@ -78,16 +83,46 @@ func (c *Conn) Close() error {
return err
}
-func NewListener(addr string, timeout time.Duration) (net.Listener, error) {
- l, err := net.Listen("tcp", addr)
+func NewListener(addr string, timeout time.Duration) (ipListner net.Listener, err error) {
+ listner, err := net.Listen("tcp", addr)
if err != nil {
- return nil, err
+ return
}
- tl := &Listener{
- Listener: l,
+ ipListner = &Listener{
+ Listener: listner,
ReadTimeout: timeout,
WriteTimeout: timeout,
}
- return tl, nil
+
+ return
+}
+
+func NewIpAndLocalListeners(host string, port int, timeout time.Duration) (ipListner net.Listener, localListener net.Listener, err error) {
+ listner, err := net.Listen("tcp", JoinHostPort(host, port))
+ if err != nil {
+ return
+ }
+
+ ipListner = &Listener{
+ Listener: listner,
+ ReadTimeout: timeout,
+ WriteTimeout: timeout,
+ }
+
+ if host != "localhost" && host != "" && host != "0.0.0.0" && host != "127.0.0.1" && host != "[::]" && host != "[::1]" {
+ listner, err = net.Listen("tcp", JoinHostPort("localhost", port))
+ if err != nil {
+ glog.V(0).Infof("skip starting on %s:%d: %v", host, port, err)
+ return ipListner, nil, nil
+ }
+
+ localListener = &Listener{
+ Listener: listner,
+ ReadTimeout: timeout,
+ WriteTimeout: timeout,
+ }
+ }
+
+ return
}
diff --git a/weed/util/network.go b/weed/util/network.go
index 55a123667..687b6ec22 100644
--- a/weed/util/network.go
+++ b/weed/util/network.go
@@ -2,6 +2,8 @@ package util
import (
"net"
+ "strconv"
+ "strings"
"github.com/chrislusf/seaweedfs/weed/glog"
)
@@ -13,6 +15,18 @@ func DetectedHostAddress() string {
return ""
}
+ if v4Address := selectIpV4(netInterfaces, true); v4Address != "" {
+ return v4Address
+ }
+
+ if v6Address := selectIpV4(netInterfaces, false); v6Address != "" {
+ return v6Address
+ }
+
+ return "localhost"
+}
+
+func selectIpV4(netInterfaces []net.Interface, isIpV4 bool) string {
for _, netInterface := range netInterfaces {
if (netInterface.Flags & net.FlagUp) == 0 {
continue
@@ -24,12 +38,25 @@ func DetectedHostAddress() string {
for _, a := range addrs {
if ipNet, ok := a.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
- if ipNet.IP.To4() != nil {
- return ipNet.IP.String()
+ if isIpV4 {
+ if ipNet.IP.To4() != nil {
+ return ipNet.IP.String()
+ }
+ } else {
+ if ipNet.IP.To16() != nil {
+ return ipNet.IP.String()
+ }
}
}
}
}
+ return ""
+}
- return "localhost"
+func JoinHostPort(host string, port int) string {
+ portStr := strconv.Itoa(port)
+ if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
+ return host + ":" + portStr
+ }
+ return net.JoinHostPort(host, portStr)
}
diff --git a/weed/util/parse.go b/weed/util/parse.go
index 0955db682..502f3a80f 100644
--- a/weed/util/parse.go
+++ b/weed/util/parse.go
@@ -61,3 +61,8 @@ func ParseHostPort(hostPort string) (filerServer string, filerPort int64, err er
return
}
+
+func CanonicalizeETag(etag string) string {
+ canonicalETag := strings.TrimPrefix(etag, "\"")
+ return strings.TrimSuffix(canonicalETag, "\"")
+}
diff --git a/weed/util/retry.go b/weed/util/retry.go
index e1ad99d54..b7cd278b3 100644
--- a/weed/util/retry.go
+++ b/weed/util/retry.go
@@ -37,6 +37,7 @@ func RetryForever(name string, job func() error, onErrFn func(err error) bool) {
for {
err := job()
if err == nil {
+ waitTime = time.Second
break
}
if onErrFn(err) {
diff --git a/weed/util/skiplist/Makefile b/weed/util/skiplist/Makefile
new file mode 100644
index 000000000..af4afe639
--- /dev/null
+++ b/weed/util/skiplist/Makefile
@@ -0,0 +1,6 @@
+all: gen
+
+.PHONY : gen
+
+gen:
+ protoc skiplist.proto --go_out=plugins=grpc:. --go_opt=paths=source_relative
diff --git a/weed/util/skiplist/list_store.go b/weed/util/skiplist/list_store.go
new file mode 100644
index 000000000..0eb1106bc
--- /dev/null
+++ b/weed/util/skiplist/list_store.go
@@ -0,0 +1,32 @@
+package skiplist
+
+type ListStore interface {
+ SaveElement(id int64, element *SkipListElement) error
+ DeleteElement(id int64) error
+ LoadElement(id int64) (*SkipListElement, error)
+}
+
+type MemStore struct {
+ m map[int64]*SkipListElement
+}
+
+func newMemStore() *MemStore {
+ return &MemStore{
+ m: make(map[int64]*SkipListElement),
+ }
+}
+
+func (m *MemStore) SaveElement(id int64, element *SkipListElement) error {
+ m.m[id] = element
+ return nil
+}
+
+func (m *MemStore) DeleteElement(id int64) error {
+ delete(m.m, id)
+ return nil
+}
+
+func (m *MemStore) LoadElement(id int64) (*SkipListElement, error) {
+ element := m.m[id]
+ return element, nil
+}
diff --git a/weed/util/skiplist/name_batch.go b/weed/util/skiplist/name_batch.go
new file mode 100644
index 000000000..53db5918f
--- /dev/null
+++ b/weed/util/skiplist/name_batch.go
@@ -0,0 +1,102 @@
+package skiplist
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/golang/protobuf/proto"
+ "golang.org/x/exp/slices"
+ "strings"
+)
+
+type NameBatch struct {
+ key string
+ names map[string]struct{}
+}
+
+func (nb *NameBatch) ContainsName(name string) (found bool) {
+ _, found = nb.names[name]
+ return
+}
+func (nb *NameBatch) WriteName(name string) {
+ if nb.key == "" || strings.Compare(nb.key, name) > 0 {
+ nb.key = name
+ }
+ nb.names[name] = struct{}{}
+}
+func (nb *NameBatch) DeleteName(name string) {
+ delete(nb.names, name)
+ if nb.key == name {
+ nb.key = ""
+ for n := range nb.names {
+ if nb.key == "" || strings.Compare(nb.key, n) > 0 {
+ nb.key = n
+ }
+ }
+ }
+}
+func (nb *NameBatch) ListNames(startFrom string, visitNamesFn func(name string) bool) bool {
+ var names []string
+ needFilter := startFrom != ""
+ for n := range nb.names {
+ if !needFilter || strings.Compare(n, startFrom) >= 0 {
+ names = append(names, n)
+ }
+ }
+ slices.SortFunc(names, func(a, b string) bool {
+ return strings.Compare(a, b) < 0
+ })
+ for _, n := range names {
+ if !visitNamesFn(n) {
+ return false
+ }
+ }
+ return true
+}
+
+func NewNameBatch() *NameBatch {
+ return &NameBatch{
+ names: make(map[string]struct{}),
+ }
+}
+
+func LoadNameBatch(data []byte) *NameBatch {
+ t := &NameBatchData{}
+ if len(data) > 0 {
+ err := proto.Unmarshal(data, t)
+ if err != nil {
+ glog.Errorf("unmarshal into NameBatchData{} : %v", err)
+ return nil
+ }
+ }
+ nb := NewNameBatch()
+ for _, n := range t.Names {
+ name := string(n)
+ if nb.key == "" || strings.Compare(nb.key, name) > 0 {
+ nb.key = name
+ }
+ nb.names[name] = struct{}{}
+ }
+ return nb
+}
+
+func (nb *NameBatch) ToBytes() []byte {
+ t := &NameBatchData{}
+ for n := range nb.names {
+ t.Names = append(t.Names, []byte(n))
+ }
+ data, _ := proto.Marshal(t)
+ return data
+}
+
+func (nb *NameBatch) SplitBy(name string) (x, y *NameBatch) {
+ x, y = NewNameBatch(), NewNameBatch()
+
+ for n := range nb.names {
+ // there should be no equal case though
+ if strings.Compare(n, name) <= 0 {
+ x.WriteName(n)
+ } else {
+ y.WriteName(n)
+ }
+ }
+ return
+}
diff --git a/weed/util/skiplist/name_list.go b/weed/util/skiplist/name_list.go
new file mode 100644
index 000000000..1d4d2ebae
--- /dev/null
+++ b/weed/util/skiplist/name_list.go
@@ -0,0 +1,326 @@
+package skiplist
+
+import (
+ "bytes"
+)
+
+type NameList struct {
+ skipList *SkipList
+ batchSize int
+}
+
+func newNameList(store ListStore, batchSize int) *NameList {
+ return &NameList{
+ skipList: New(store),
+ batchSize: batchSize,
+ }
+}
+
+/*
+Be reluctant to create new nodes. Try to fit into either previous node or next node.
+Prefer to add to previous node.
+
+There are multiple cases after finding the name for greater or equal node
+ 1. found and node.Key == name
+ The node contains a batch with leading key the same as the name
+ nothing to do
+ 2. no such node found or node.Key > name
+
+ if no such node found
+ prevNode = list.LargestNode
+
+ // case 2.1
+ if previousNode contains name
+ nothing to do
+
+ // prefer to add to previous node
+ if prevNode != nil {
+ // case 2.2
+ if prevNode has capacity
+ prevNode.add name, and save
+ return
+ // case 2.3
+ split prevNode by name
+ }
+
+ // case 2.4
+ // merge into next node. Avoid too many nodes if adding data in reverse order.
+ if nextNode is not nil and nextNode has capacity
+ delete nextNode.Key
+ nextNode.Key = name
+ nextNode.batch.add name
+ insert nodeNode.Key
+ return
+
+ // case 2.5
+ if prevNode is nil
+ insert new node with key = name, value = batch{name}
+ return
+
+*/
+func (nl *NameList) WriteName(name string) error {
+
+ lookupKey := []byte(name)
+ prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+ if err != nil {
+ return err
+ }
+ // case 1: the name already exists as one leading key in the batch
+ if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+ return nil
+ }
+
+ if !found {
+ prevNode, err = nl.skipList.GetLargestNode()
+ if err != nil {
+ return err
+ }
+ }
+
+ if nextNode != nil && prevNode == nil {
+ prevNode, err = nl.skipList.LoadElement(nextNode.Prev)
+ if err != nil {
+ return err
+ }
+ }
+
+ if prevNode != nil {
+ prevNameBatch := LoadNameBatch(prevNode.Value)
+ // case 2.1
+ if prevNameBatch.ContainsName(name) {
+ return nil
+ }
+
+ // case 2.2
+ if len(prevNameBatch.names) < nl.batchSize {
+ prevNameBatch.WriteName(name)
+ return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes())
+ }
+
+ // case 2.3
+ x, y := prevNameBatch.SplitBy(name)
+ addToX := len(x.names) <= len(y.names)
+ if len(x.names) != len(prevNameBatch.names) {
+ if addToX {
+ x.WriteName(name)
+ }
+ if x.key == prevNameBatch.key {
+ if err := nl.skipList.ChangeValue(prevNode, x.ToBytes()); err != nil {
+ return err
+ }
+ } else {
+ if _, err := nl.skipList.InsertByKey([]byte(x.key), 0, x.ToBytes()); err != nil {
+ return err
+ }
+ }
+ }
+ if len(y.names) != len(prevNameBatch.names) {
+ if !addToX {
+ y.WriteName(name)
+ }
+ if y.key == prevNameBatch.key {
+ if err := nl.skipList.ChangeValue(prevNode, y.ToBytes()); err != nil {
+ return err
+ }
+ } else {
+ if _, err := nl.skipList.InsertByKey([]byte(y.key), 0, y.ToBytes()); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+
+ }
+
+ // case 2.4
+ if nextNode != nil {
+ nextNameBatch := LoadNameBatch(nextNode.Value)
+ if len(nextNameBatch.names) < nl.batchSize {
+ if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
+ return err
+ }
+ nextNameBatch.WriteName(name)
+ if _, err := nl.skipList.InsertByKey([]byte(nextNameBatch.key), 0, nextNameBatch.ToBytes()); err != nil {
+ return err
+ }
+ return nil
+ }
+ }
+
+ // case 2.5
+ // now prevNode is nil
+ newNameBatch := NewNameBatch()
+ newNameBatch.WriteName(name)
+ if _, err := nl.skipList.InsertByKey([]byte(newNameBatch.key), 0, newNameBatch.ToBytes()); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+/*
+// case 1: exists in nextNode
+if nextNode != nil && nextNode.Key == name {
+ remove from nextNode, update nextNode
+ // TODO: merge with prevNode if possible?
+ return
+}
+if nextNode is nil
+ prevNode = list.Largestnode
+if prevNode == nil and nextNode.Prev != nil
+ prevNode = load(nextNode.Prev)
+
+// case 2: does not exist
+// case 2.1
+if prevNode == nil {
+ return
+}
+// case 2.2
+if prevNameBatch does not contain name {
+ return
+}
+
+// case 3
+delete from prevNameBatch
+if prevNameBatch + nextNode < capacityList
+ // case 3.1
+ merge
+else
+ // case 3.2
+ update prevNode
+
+
+*/
+func (nl *NameList) DeleteName(name string) error {
+ lookupKey := []byte(name)
+ prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+ if err != nil {
+ return err
+ }
+
+ // case 1
+ var nextNameBatch *NameBatch
+ if nextNode != nil {
+ nextNameBatch = LoadNameBatch(nextNode.Value)
+ }
+ if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+ if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
+ return err
+ }
+ nextNameBatch.DeleteName(name)
+ if len(nextNameBatch.names) > 0 {
+ if _, err := nl.skipList.InsertByKey([]byte(nextNameBatch.key), 0, nextNameBatch.ToBytes()); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+
+ if !found {
+ prevNode, err = nl.skipList.GetLargestNode()
+ if err != nil {
+ return err
+ }
+ }
+
+ if nextNode != nil && prevNode == nil {
+ prevNode, err = nl.skipList.LoadElement(nextNode.Prev)
+ if err != nil {
+ return err
+ }
+ }
+
+ // case 2
+ if prevNode == nil {
+ // case 2.1
+ return nil
+ }
+ prevNameBatch := LoadNameBatch(prevNode.Value)
+ if !prevNameBatch.ContainsName(name) {
+ // case 2.2
+ return nil
+ }
+
+ // case 3
+ prevNameBatch.DeleteName(name)
+ if len(prevNameBatch.names) == 0 {
+ if _, err := nl.skipList.DeleteByKey(prevNode.Key); err != nil {
+ return err
+ }
+ return nil
+ }
+ if nextNameBatch != nil && len(nextNameBatch.names)+len(prevNameBatch.names) < nl.batchSize {
+ // case 3.1 merge nextNode and prevNode
+ if _, err := nl.skipList.DeleteByKey(nextNode.Key); err != nil {
+ return err
+ }
+ for nextName := range nextNameBatch.names {
+ prevNameBatch.WriteName(nextName)
+ }
+ return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes())
+ } else {
+ // case 3.2 update prevNode
+ return nl.skipList.ChangeValue(prevNode, prevNameBatch.ToBytes())
+ }
+
+ return nil
+}
+
+func (nl *NameList) ListNames(startFrom string, visitNamesFn func(name string) bool) error {
+ lookupKey := []byte(startFrom)
+ prevNode, nextNode, found, err := nl.skipList.FindGreaterOrEqual(lookupKey)
+ if err != nil {
+ return err
+ }
+ if found && bytes.Compare(nextNode.Key, lookupKey) == 0 {
+ prevNode = nil
+ }
+ if !found {
+ prevNode, err = nl.skipList.GetLargestNode()
+ if err != nil {
+ return err
+ }
+ }
+
+ if prevNode != nil {
+ prevNameBatch := LoadNameBatch(prevNode.Value)
+ if !prevNameBatch.ListNames(startFrom, visitNamesFn) {
+ return nil
+ }
+ }
+
+ for nextNode != nil {
+ nextNameBatch := LoadNameBatch(nextNode.Value)
+ if !nextNameBatch.ListNames(startFrom, visitNamesFn) {
+ return nil
+ }
+ nextNode, err = nl.skipList.LoadElement(nextNode.Next[0])
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (nl *NameList) RemoteAllListElement() error {
+
+ t := nl.skipList
+
+ nodeRef := t.StartLevels[0]
+ for nodeRef != nil {
+ node, err := t.LoadElement(nodeRef)
+ if err != nil {
+ return err
+ }
+ if node == nil {
+ return nil
+ }
+ if err := t.DeleteElement(node); err != nil {
+ return err
+ }
+ nodeRef = node.Next[0]
+ }
+ return nil
+
+}
diff --git a/weed/util/skiplist/name_list_serde.go b/weed/util/skiplist/name_list_serde.go
new file mode 100644
index 000000000..0a2052e7b
--- /dev/null
+++ b/weed/util/skiplist/name_list_serde.go
@@ -0,0 +1,71 @@
+package skiplist
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/golang/protobuf/proto"
+)
+
+func LoadNameList(data []byte, store ListStore, batchSize int) *NameList {
+
+ nl := &NameList{
+ skipList: New(store),
+ batchSize: batchSize,
+ }
+
+ if len(data) == 0 {
+ return nl
+ }
+
+ message := &SkipListProto{}
+ if err := proto.Unmarshal(data, message); err != nil {
+ glog.Errorf("loading skiplist: %v", err)
+ }
+ nl.skipList.MaxNewLevel = int(message.MaxNewLevel)
+ nl.skipList.MaxLevel = int(message.MaxLevel)
+ for i, ref := range message.StartLevels {
+ nl.skipList.StartLevels[i] = &SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ }
+ }
+ for i, ref := range message.EndLevels {
+ nl.skipList.EndLevels[i] = &SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ }
+ }
+ return nl
+}
+
+func (nl *NameList) HasChanges() bool {
+ return nl.skipList.HasChanges
+}
+
+func (nl *NameList) ToBytes() []byte {
+ message := &SkipListProto{}
+ message.MaxNewLevel = int32(nl.skipList.MaxNewLevel)
+ message.MaxLevel = int32(nl.skipList.MaxLevel)
+ for _, ref := range nl.skipList.StartLevels {
+ if ref == nil {
+ break
+ }
+ message.StartLevels = append(message.StartLevels, &SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ })
+ }
+ for _, ref := range nl.skipList.EndLevels {
+ if ref == nil {
+ break
+ }
+ message.EndLevels = append(message.EndLevels, &SkipListElementReference{
+ ElementPointer: ref.ElementPointer,
+ Key: ref.Key,
+ })
+ }
+ data, err := proto.Marshal(message)
+ if err != nil {
+ glog.Errorf("marshal skiplist: %v", err)
+ }
+ return data
+}
diff --git a/weed/util/skiplist/name_list_test.go b/weed/util/skiplist/name_list_test.go
new file mode 100644
index 000000000..9d539c9ba
--- /dev/null
+++ b/weed/util/skiplist/name_list_test.go
@@ -0,0 +1,73 @@
+package skiplist
+
+import (
+ "math/rand"
+ "strconv"
+ "testing"
+)
+
+const (
+ maxNameCount = 100
+)
+
+func String(x int) string {
+ return strconv.Itoa(x)
+}
+
+func TestNameList(t *testing.T) {
+ list := newNameList(memStore, 7)
+
+ for i := 0; i < maxNameCount; i++ {
+ list.WriteName(String(i))
+ }
+
+ counter := 0
+ list.ListNames("", func(name string) bool {
+ counter++
+ print(name, " ")
+ return true
+ })
+ if counter != maxNameCount {
+ t.Fail()
+ }
+
+ // list.skipList.println()
+
+ deleteBase := 5
+ deleteCount := maxNameCount - 3*deleteBase
+
+ for i := deleteBase; i < deleteBase+deleteCount; i++ {
+ list.DeleteName(String(i))
+ }
+
+ counter = 0
+ list.ListNames("", func(name string) bool {
+ counter++
+ return true
+ })
+ // list.skipList.println()
+ if counter != maxNameCount-deleteCount {
+ t.Fail()
+ }
+
+ // randomized deletion
+ list = newNameList(memStore, 7)
+ // Delete elements at random positions in the list.
+ rList := rand.Perm(maxN)
+ for _, i := range rList {
+ list.WriteName(String(i))
+ }
+ for _, i := range rList {
+ list.DeleteName(String(i))
+ }
+ counter = 0
+ list.ListNames("", func(name string) bool {
+ counter++
+ print(name, " ")
+ return true
+ })
+ if counter != 0 {
+ t.Fail()
+ }
+
+}
diff --git a/weed/util/skiplist/skiplist.go b/weed/util/skiplist/skiplist.go
new file mode 100644
index 000000000..21eed4b43
--- /dev/null
+++ b/weed/util/skiplist/skiplist.go
@@ -0,0 +1,571 @@
+package skiplist
+
+// adapted from https://github.com/MauriceGit/skiplist/blob/master/skiplist.go
+
+import (
+ "bytes"
+ "fmt"
+ "math/bits"
+ "math/rand"
+ "time"
+)
+
+const (
+ // maxLevel denotes the maximum height of the skiplist. This height will keep the skiplist
+ // efficient for up to 34m entries. If there is a need for much more, please adjust this constant accordingly.
+ maxLevel = 25
+)
+
+type SkipList struct {
+ StartLevels [maxLevel]*SkipListElementReference
+ EndLevels [maxLevel]*SkipListElementReference
+ MaxNewLevel int
+ MaxLevel int
+ ListStore ListStore
+ HasChanges bool
+ // elementCount int
+}
+
+// NewSeedEps returns a new empty, initialized Skiplist.
+// Given a seed, a deterministic height/list behaviour can be achieved.
+// Eps is used to compare keys given by the ExtractKey() function on equality.
+func NewSeed(seed int64, listStore ListStore) *SkipList {
+
+ // Initialize random number generator.
+ rand.Seed(seed)
+ //fmt.Printf("SkipList seed: %v\n", seed)
+
+ list := &SkipList{
+ MaxNewLevel: maxLevel,
+ MaxLevel: 0,
+ ListStore: listStore,
+ // elementCount: 0,
+ }
+
+ return list
+}
+
+// New returns a new empty, initialized Skiplist.
+func New(listStore ListStore) *SkipList {
+ return NewSeed(time.Now().UTC().UnixNano(), listStore)
+}
+
+// IsEmpty checks, if the skiplist is empty.
+func (t *SkipList) IsEmpty() bool {
+ return t.StartLevels[0] == nil
+}
+
+func (t *SkipList) generateLevel(maxLevel int) int {
+ level := maxLevel - 1
+ // First we apply some mask which makes sure that we don't get a level
+ // above our desired level. Then we find the first set bit.
+ var x = rand.Uint64() & ((1 << uint(maxLevel-1)) - 1)
+ zeroes := bits.TrailingZeros64(x)
+ if zeroes <= maxLevel {
+ level = zeroes
+ }
+
+ return level
+}
+
+func (t *SkipList) findEntryIndex(key []byte, minLevel int) int {
+ // Find good entry point so we don't accidentally skip half the list.
+ for i := t.MaxLevel; i >= 0; i-- {
+ if t.StartLevels[i] != nil && bytes.Compare(t.StartLevels[i].Key, key) < 0 || i <= minLevel {
+ return i
+ }
+ }
+ return 0
+}
+
+func (t *SkipList) findExtended(key []byte, findGreaterOrEqual bool) (prevElementIfVisited *SkipListElement, foundElem *SkipListElement, ok bool, err error) {
+
+ foundElem = nil
+ ok = false
+
+ if t.IsEmpty() {
+ return
+ }
+
+ index := t.findEntryIndex(key, 0)
+ var currentNode *SkipListElement
+
+ currentNode, err = t.LoadElement(t.StartLevels[index])
+ if err != nil {
+ return
+ }
+ if currentNode == nil {
+ return
+ }
+
+ // In case, that our first element is already greater-or-equal!
+ if findGreaterOrEqual && compareElement(currentNode, key) > 0 {
+ foundElem = currentNode
+ ok = true
+ return
+ }
+
+ for {
+ if compareElement(currentNode, key) == 0 {
+ foundElem = currentNode
+ ok = true
+ return
+ }
+
+ // Which direction are we continuing next time?
+ if currentNode.Next[index] != nil && bytes.Compare(currentNode.Next[index].Key, key) <= 0 {
+ // Go right
+ currentNode, err = t.LoadElement(currentNode.Next[index])
+ if err != nil {
+ return
+ }
+ if currentNode == nil {
+ return
+ }
+ } else {
+ if index > 0 {
+
+ // Early exit
+ if currentNode.Next[0] != nil && bytes.Compare(currentNode.Next[0].Key, key) == 0 {
+ prevElementIfVisited = currentNode
+ var currentNodeNext *SkipListElement
+ currentNodeNext, err = t.LoadElement(currentNode.Next[0])
+ if err != nil {
+ return
+ }
+ if currentNodeNext == nil {
+ return
+ }
+ foundElem = currentNodeNext
+ ok = true
+ return
+ }
+ // Go down
+ index--
+ } else {
+ // Element is not found and we reached the bottom.
+ if findGreaterOrEqual {
+ foundElem, err = t.LoadElement(currentNode.Next[index])
+ if err != nil {
+ return
+ }
+ ok = foundElem != nil
+ }
+
+ return
+ }
+ }
+ }
+}
+
+// Find tries to find an element in the skiplist based on the key from the given ListElement.
+// elem can be used, if ok is true.
+// Find runs in approx. O(log(n))
+func (t *SkipList) Find(key []byte) (prevIfVisited *SkipListElement, elem *SkipListElement, ok bool, err error) {
+
+ if t == nil || key == nil {
+ return
+ }
+
+ prevIfVisited, elem, ok, err = t.findExtended(key, false)
+ return
+}
+
+// FindGreaterOrEqual finds the first element, that is greater or equal to the given ListElement e.
+// The comparison is done on the keys (So on ExtractKey()).
+// FindGreaterOrEqual runs in approx. O(log(n))
+func (t *SkipList) FindGreaterOrEqual(key []byte) (prevIfVisited *SkipListElement, elem *SkipListElement, ok bool, err error) {
+
+ if t == nil || key == nil {
+ return
+ }
+
+ prevIfVisited, elem, ok, err = t.findExtended(key, true)
+ return
+}
+
+// Delete removes an element equal to e from the skiplist, if there is one.
+// If there are multiple entries with the same value, Delete will remove one of them
+// (Which one will change based on the actual skiplist layout)
+// Delete runs in approx. O(log(n))
+func (t *SkipList) DeleteByKey(key []byte) (id int64, err error) {
+
+ if t == nil || t.IsEmpty() || key == nil {
+ return
+ }
+
+ index := t.findEntryIndex(key, t.MaxLevel)
+
+ var currentNode *SkipListElement
+ var nextNode *SkipListElement
+
+ for {
+
+ if currentNode == nil {
+ nextNode, err = t.LoadElement(t.StartLevels[index])
+ } else {
+ nextNode, err = t.LoadElement(currentNode.Next[index])
+ }
+ if err != nil {
+ return id, err
+ }
+
+ // Found and remove!
+ if nextNode != nil && compareElement(nextNode, key) == 0 {
+
+ if currentNode != nil {
+ currentNode.Next[index] = nextNode.Next[index]
+ if err = t.SaveElement(currentNode); err != nil {
+ return id, err
+ }
+ }
+
+ if index == 0 {
+ if nextNode.Next[index] != nil {
+ nextNextNode, err := t.LoadElement(nextNode.Next[index])
+ if err != nil {
+ return id, err
+ }
+ if nextNextNode != nil {
+ nextNextNode.Prev = currentNode.Reference()
+ if err = t.SaveElement(nextNextNode); err != nil {
+ return id, err
+ }
+ }
+ }
+ // t.elementCount--
+ id = nextNode.Id
+ if err = t.DeleteElement(nextNode); err != nil {
+ return id, err
+ }
+ }
+
+ // Link from start needs readjustments.
+ startNextKey := t.StartLevels[index].Key
+ if compareElement(nextNode, startNextKey) == 0 {
+ t.HasChanges = true
+ t.StartLevels[index] = nextNode.Next[index]
+ // This was our currently highest node!
+ if t.StartLevels[index] == nil {
+ t.MaxLevel = index - 1
+ }
+ }
+
+ // Link from end needs readjustments.
+ if nextNode.Next[index] == nil {
+ t.EndLevels[index] = currentNode.Reference()
+ t.HasChanges = true
+ }
+ nextNode.Next[index] = nil
+ }
+
+ if nextNode != nil && compareElement(nextNode, key) < 0 {
+ // Go right
+ currentNode = nextNode
+ } else {
+ // Go down
+ index--
+ if index < 0 {
+ break
+ }
+ }
+ }
+ return
+}
+
+// Insert inserts the given ListElement into the skiplist.
+// Insert runs in approx. O(log(n))
+func (t *SkipList) InsertByKey(key []byte, idIfKnown int64, value []byte) (id int64, err error) {
+
+ if t == nil || key == nil {
+ return
+ }
+
+ level := t.generateLevel(t.MaxNewLevel)
+
+ // Only grow the height of the skiplist by one at a time!
+ if level > t.MaxLevel {
+ level = t.MaxLevel + 1
+ t.MaxLevel = level
+ t.HasChanges = true
+ }
+
+ id = idIfKnown
+ if id == 0 {
+ id = rand.Int63()
+ }
+ elem := &SkipListElement{
+ Id: id,
+ Next: make([]*SkipListElementReference, t.MaxNewLevel, t.MaxNewLevel),
+ Level: int32(level),
+ Key: key,
+ Value: value,
+ }
+
+ // t.elementCount++
+
+ newFirst := true
+ newLast := true
+ if !t.IsEmpty() {
+ newFirst = compareElement(elem, t.StartLevels[0].Key) < 0
+ newLast = compareElement(elem, t.EndLevels[0].Key) > 0
+ }
+
+ normallyInserted := false
+ if !newFirst && !newLast {
+
+ normallyInserted = true
+
+ index := t.findEntryIndex(key, level)
+
+ var currentNode *SkipListElement
+ var nextNodeRef *SkipListElementReference
+
+ for {
+
+ if currentNode == nil {
+ nextNodeRef = t.StartLevels[index]
+ } else {
+ nextNodeRef = currentNode.Next[index]
+ }
+
+ var nextNode *SkipListElement
+
+ // Connect node to next
+ if index <= level && (nextNodeRef == nil || bytes.Compare(nextNodeRef.Key, key) > 0) {
+ elem.Next[index] = nextNodeRef
+ if currentNode != nil {
+ currentNode.Next[index] = elem.Reference()
+ if err = t.SaveElement(currentNode); err != nil {
+ return
+ }
+ }
+ if index == 0 {
+ elem.Prev = currentNode.Reference()
+ if nextNodeRef != nil {
+ if nextNode, err = t.LoadElement(nextNodeRef); err != nil {
+ return
+ }
+ if nextNode != nil {
+ nextNode.Prev = elem.Reference()
+ if err = t.SaveElement(nextNode); err != nil {
+ return
+ }
+ }
+ }
+ }
+ }
+
+ if nextNodeRef != nil && bytes.Compare(nextNodeRef.Key, key) <= 0 {
+ // Go right
+ if nextNode == nil {
+ // reuse nextNode when index == 0
+ if nextNode, err = t.LoadElement(nextNodeRef); err != nil {
+ return
+ }
+ }
+ currentNode = nextNode
+ if currentNode == nil {
+ return
+ }
+ } else {
+ // Go down
+ index--
+ if index < 0 {
+ break
+ }
+ }
+ }
+ }
+
+ // Where we have a left-most position that needs to be referenced!
+ for i := level; i >= 0; i-- {
+
+ didSomething := false
+
+ if newFirst || normallyInserted {
+
+ if t.StartLevels[i] == nil || bytes.Compare(t.StartLevels[i].Key, key) > 0 {
+ if i == 0 && t.StartLevels[i] != nil {
+ startLevelElement, err := t.LoadElement(t.StartLevels[i])
+ if err != nil {
+ return id, err
+ }
+ if startLevelElement != nil {
+ startLevelElement.Prev = elem.Reference()
+ if err = t.SaveElement(startLevelElement); err != nil {
+ return id, err
+ }
+ }
+ }
+ elem.Next[i] = t.StartLevels[i]
+ t.StartLevels[i] = elem.Reference()
+ t.HasChanges = true
+ }
+
+ // link the EndLevels to this element!
+ if elem.Next[i] == nil {
+ t.EndLevels[i] = elem.Reference()
+ t.HasChanges = true
+ }
+
+ didSomething = true
+ }
+
+ if newLast {
+ // Places the element after the very last element on this level!
+ // This is very important, so we are not linking the very first element (newFirst AND newLast) to itself!
+ if !newFirst {
+ if t.EndLevels[i] != nil {
+ endLevelElement, err := t.LoadElement(t.EndLevels[i])
+ if err != nil {
+ return id, err
+ }
+ if endLevelElement != nil {
+ endLevelElement.Next[i] = elem.Reference()
+ if err = t.SaveElement(endLevelElement); err != nil {
+ return id, err
+ }
+ }
+ }
+ if i == 0 {
+ elem.Prev = t.EndLevels[i]
+ }
+ t.EndLevels[i] = elem.Reference()
+ t.HasChanges = true
+ }
+
+ // Link the startLevels to this element!
+ if t.StartLevels[i] == nil || bytes.Compare(t.StartLevels[i].Key, key) > 0 {
+ t.StartLevels[i] = elem.Reference()
+ t.HasChanges = true
+ }
+
+ didSomething = true
+ }
+
+ if !didSomething {
+ break
+ }
+ }
+
+ if err = t.SaveElement(elem); err != nil {
+ return id, err
+ }
+ return id, nil
+
+}
+
+// GetSmallestNode returns the very first/smallest node in the skiplist.
+// GetSmallestNode runs in O(1)
+func (t *SkipList) GetSmallestNode() (*SkipListElement, error) {
+ return t.LoadElement(t.StartLevels[0])
+}
+
+// GetLargestNode returns the very last/largest node in the skiplist.
+// GetLargestNode runs in O(1)
+func (t *SkipList) GetLargestNode() (*SkipListElement, error) {
+ return t.LoadElement(t.EndLevels[0])
+}
+func (t *SkipList) GetLargestNodeReference() *SkipListElementReference {
+ return t.EndLevels[0]
+}
+
+// Next returns the next element based on the given node.
+// Next will loop around to the first node, if you call it on the last!
+func (t *SkipList) Next(e *SkipListElement) (*SkipListElement, error) {
+ if e.Next[0] == nil {
+ return t.LoadElement(t.StartLevels[0])
+ }
+ return t.LoadElement(e.Next[0])
+}
+
+// Prev returns the previous element based on the given node.
+// Prev will loop around to the last node, if you call it on the first!
+func (t *SkipList) Prev(e *SkipListElement) (*SkipListElement, error) {
+ if e.Prev == nil {
+ return t.LoadElement(t.EndLevels[0])
+ }
+ return t.LoadElement(e.Prev)
+}
+
+// ChangeValue can be used to change the actual value of a node in the skiplist
+// without the need of Deleting and reinserting the node again.
+// Be advised, that ChangeValue only works, if the actual key from ExtractKey() will stay the same!
+// ok is an indicator, wether the value is actually changed.
+func (t *SkipList) ChangeValue(e *SkipListElement, newValue []byte) (err error) {
+ // The key needs to stay correct, so this is very important!
+ e.Value = newValue
+ return t.SaveElement(e)
+}
+
+// String returns a string format of the skiplist. Useful to get a graphical overview and/or debugging.
+func (t *SkipList) println() {
+
+ print("start --> ")
+ for i, l := range t.StartLevels {
+ if l == nil {
+ break
+ }
+ if i > 0 {
+ print(" -> ")
+ }
+ next := "---"
+ if l != nil {
+ next = string(l.Key)
+ }
+ print(fmt.Sprintf("[%v]", next))
+ }
+ println()
+
+ nodeRef := t.StartLevels[0]
+ for nodeRef != nil {
+ print(fmt.Sprintf("%v: ", string(nodeRef.Key)))
+ node, _ := t.LoadElement(nodeRef)
+ if node == nil {
+ break
+ }
+ for i := 0; i <= int(node.Level); i++ {
+
+ l := node.Next[i]
+
+ next := "---"
+ if l != nil {
+ next = string(l.Key)
+ }
+
+ if i == 0 {
+ prev := "---"
+
+ if node.Prev != nil {
+ prev = string(node.Prev.Key)
+ }
+ print(fmt.Sprintf("[%v|%v]", prev, next))
+ } else {
+ print(fmt.Sprintf("[%v]", next))
+ }
+ if i < int(node.Level) {
+ print(" -> ")
+ }
+
+ }
+ nodeRef = node.Next[0]
+ println()
+ }
+
+ print("end --> ")
+ for i, l := range t.EndLevels {
+ if l == nil {
+ break
+ }
+ if i > 0 {
+ print(" -> ")
+ }
+ next := "---"
+ if l != nil {
+ next = string(l.Key)
+ }
+ print(fmt.Sprintf("[%v]", next))
+ }
+ println()
+}
diff --git a/weed/util/skiplist/skiplist.pb.go b/weed/util/skiplist/skiplist.pb.go
new file mode 100644
index 000000000..adb121bfc
--- /dev/null
+++ b/weed/util/skiplist/skiplist.pb.go
@@ -0,0 +1,438 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.25.0
+// protoc v3.12.3
+// source: skiplist.proto
+
+package skiplist
+
+import (
+ proto "github.com/golang/protobuf/proto"
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// This is a compile-time assertion that a sufficiently up-to-date version
+// of the legacy proto package is being used.
+const _ = proto.ProtoPackageIsVersion4
+
+type SkipListProto struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ StartLevels []*SkipListElementReference `protobuf:"bytes,1,rep,name=start_levels,json=startLevels,proto3" json:"start_levels,omitempty"`
+ EndLevels []*SkipListElementReference `protobuf:"bytes,2,rep,name=end_levels,json=endLevels,proto3" json:"end_levels,omitempty"`
+ MaxNewLevel int32 `protobuf:"varint,3,opt,name=max_new_level,json=maxNewLevel,proto3" json:"max_new_level,omitempty"`
+ MaxLevel int32 `protobuf:"varint,4,opt,name=max_level,json=maxLevel,proto3" json:"max_level,omitempty"`
+}
+
+func (x *SkipListProto) Reset() {
+ *x = SkipListProto{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_skiplist_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *SkipListProto) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SkipListProto) ProtoMessage() {}
+
+func (x *SkipListProto) ProtoReflect() protoreflect.Message {
+ mi := &file_skiplist_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SkipListProto.ProtoReflect.Descriptor instead.
+func (*SkipListProto) Descriptor() ([]byte, []int) {
+ return file_skiplist_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *SkipListProto) GetStartLevels() []*SkipListElementReference {
+ if x != nil {
+ return x.StartLevels
+ }
+ return nil
+}
+
+func (x *SkipListProto) GetEndLevels() []*SkipListElementReference {
+ if x != nil {
+ return x.EndLevels
+ }
+ return nil
+}
+
+func (x *SkipListProto) GetMaxNewLevel() int32 {
+ if x != nil {
+ return x.MaxNewLevel
+ }
+ return 0
+}
+
+func (x *SkipListProto) GetMaxLevel() int32 {
+ if x != nil {
+ return x.MaxLevel
+ }
+ return 0
+}
+
+type SkipListElementReference struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ ElementPointer int64 `protobuf:"varint,1,opt,name=element_pointer,json=elementPointer,proto3" json:"element_pointer,omitempty"`
+ Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"`
+}
+
+func (x *SkipListElementReference) Reset() {
+ *x = SkipListElementReference{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_skiplist_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *SkipListElementReference) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SkipListElementReference) ProtoMessage() {}
+
+func (x *SkipListElementReference) ProtoReflect() protoreflect.Message {
+ mi := &file_skiplist_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SkipListElementReference.ProtoReflect.Descriptor instead.
+func (*SkipListElementReference) Descriptor() ([]byte, []int) {
+ return file_skiplist_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *SkipListElementReference) GetElementPointer() int64 {
+ if x != nil {
+ return x.ElementPointer
+ }
+ return 0
+}
+
+func (x *SkipListElementReference) GetKey() []byte {
+ if x != nil {
+ return x.Key
+ }
+ return nil
+}
+
+type SkipListElement struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
+ Next []*SkipListElementReference `protobuf:"bytes,2,rep,name=next,proto3" json:"next,omitempty"`
+ Level int32 `protobuf:"varint,3,opt,name=level,proto3" json:"level,omitempty"`
+ Key []byte `protobuf:"bytes,4,opt,name=key,proto3" json:"key,omitempty"`
+ Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"`
+ Prev *SkipListElementReference `protobuf:"bytes,6,opt,name=prev,proto3" json:"prev,omitempty"`
+}
+
+func (x *SkipListElement) Reset() {
+ *x = SkipListElement{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_skiplist_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *SkipListElement) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*SkipListElement) ProtoMessage() {}
+
+func (x *SkipListElement) ProtoReflect() protoreflect.Message {
+ mi := &file_skiplist_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use SkipListElement.ProtoReflect.Descriptor instead.
+func (*SkipListElement) Descriptor() ([]byte, []int) {
+ return file_skiplist_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *SkipListElement) GetId() int64 {
+ if x != nil {
+ return x.Id
+ }
+ return 0
+}
+
+func (x *SkipListElement) GetNext() []*SkipListElementReference {
+ if x != nil {
+ return x.Next
+ }
+ return nil
+}
+
+func (x *SkipListElement) GetLevel() int32 {
+ if x != nil {
+ return x.Level
+ }
+ return 0
+}
+
+func (x *SkipListElement) GetKey() []byte {
+ if x != nil {
+ return x.Key
+ }
+ return nil
+}
+
+func (x *SkipListElement) GetValue() []byte {
+ if x != nil {
+ return x.Value
+ }
+ return nil
+}
+
+func (x *SkipListElement) GetPrev() *SkipListElementReference {
+ if x != nil {
+ return x.Prev
+ }
+ return nil
+}
+
+type NameBatchData struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ Names [][]byte `protobuf:"bytes,1,rep,name=names,proto3" json:"names,omitempty"`
+}
+
+func (x *NameBatchData) Reset() {
+ *x = NameBatchData{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_skiplist_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *NameBatchData) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*NameBatchData) ProtoMessage() {}
+
+func (x *NameBatchData) ProtoReflect() protoreflect.Message {
+ mi := &file_skiplist_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use NameBatchData.ProtoReflect.Descriptor instead.
+func (*NameBatchData) Descriptor() ([]byte, []int) {
+ return file_skiplist_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *NameBatchData) GetNames() [][]byte {
+ if x != nil {
+ return x.Names
+ }
+ return nil
+}
+
+var File_skiplist_proto protoreflect.FileDescriptor
+
+var file_skiplist_proto_rawDesc = []byte{
+ 0x0a, 0x0e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+ 0x12, 0x08, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x22, 0xda, 0x01, 0x0a, 0x0d, 0x53,
+ 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x45, 0x0a, 0x0c,
+ 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03,
+ 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x2e, 0x53, 0x6b,
+ 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66,
+ 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x0b, 0x73, 0x74, 0x61, 0x72, 0x74, 0x4c, 0x65, 0x76,
+ 0x65, 0x6c, 0x73, 0x12, 0x41, 0x0a, 0x0a, 0x65, 0x6e, 0x64, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c,
+ 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69,
+ 0x73, 0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65,
+ 0x6e, 0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x09, 0x65, 0x6e, 0x64,
+ 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x73, 0x12, 0x22, 0x0a, 0x0d, 0x6d, 0x61, 0x78, 0x5f, 0x6e, 0x65,
+ 0x77, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x6d,
+ 0x61, 0x78, 0x4e, 0x65, 0x77, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x6d, 0x61,
+ 0x78, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x08, 0x6d,
+ 0x61, 0x78, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x22, 0x55, 0x0a, 0x18, 0x53, 0x6b, 0x69, 0x70, 0x4c,
+ 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65,
+ 0x6e, 0x63, 0x65, 0x12, 0x27, 0x0a, 0x0f, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x70,
+ 0x6f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0e, 0x65, 0x6c,
+ 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x50, 0x6f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x12, 0x10, 0x0a, 0x03,
+ 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0xcf,
+ 0x01, 0x0a, 0x0f, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65,
+ 0x6e, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x02,
+ 0x69, 0x64, 0x12, 0x36, 0x0a, 0x04, 0x6e, 0x65, 0x78, 0x74, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b,
+ 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70,
+ 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x66, 0x65, 0x72,
+ 0x65, 0x6e, 0x63, 0x65, 0x52, 0x04, 0x6e, 0x65, 0x78, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x65,
+ 0x76, 0x65, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x6c, 0x65, 0x76, 0x65, 0x6c,
+ 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b,
+ 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28,
+ 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x36, 0x0a, 0x04, 0x70, 0x72, 0x65, 0x76,
+ 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73,
+ 0x74, 0x2e, 0x53, 0x6b, 0x69, 0x70, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x6c, 0x65, 0x6d, 0x65, 0x6e,
+ 0x74, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x04, 0x70, 0x72, 0x65, 0x76,
+ 0x22, 0x25, 0x0a, 0x0d, 0x4e, 0x61, 0x6d, 0x65, 0x42, 0x61, 0x74, 0x63, 0x68, 0x44, 0x61, 0x74,
+ 0x61, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0c,
+ 0x52, 0x05, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x42, 0x33, 0x5a, 0x31, 0x67, 0x69, 0x74, 0x68, 0x75,
+ 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f,
+ 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x75,
+ 0x74, 0x69, 0x6c, 0x2f, 0x73, 0x6b, 0x69, 0x70, 0x6c, 0x69, 0x73, 0x74, 0x62, 0x06, 0x70, 0x72,
+ 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_skiplist_proto_rawDescOnce sync.Once
+ file_skiplist_proto_rawDescData = file_skiplist_proto_rawDesc
+)
+
+func file_skiplist_proto_rawDescGZIP() []byte {
+ file_skiplist_proto_rawDescOnce.Do(func() {
+ file_skiplist_proto_rawDescData = protoimpl.X.CompressGZIP(file_skiplist_proto_rawDescData)
+ })
+ return file_skiplist_proto_rawDescData
+}
+
+var file_skiplist_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
+var file_skiplist_proto_goTypes = []interface{}{
+ (*SkipListProto)(nil), // 0: skiplist.SkipListProto
+ (*SkipListElementReference)(nil), // 1: skiplist.SkipListElementReference
+ (*SkipListElement)(nil), // 2: skiplist.SkipListElement
+ (*NameBatchData)(nil), // 3: skiplist.NameBatchData
+}
+var file_skiplist_proto_depIdxs = []int32{
+ 1, // 0: skiplist.SkipListProto.start_levels:type_name -> skiplist.SkipListElementReference
+ 1, // 1: skiplist.SkipListProto.end_levels:type_name -> skiplist.SkipListElementReference
+ 1, // 2: skiplist.SkipListElement.next:type_name -> skiplist.SkipListElementReference
+ 1, // 3: skiplist.SkipListElement.prev:type_name -> skiplist.SkipListElementReference
+ 4, // [4:4] is the sub-list for method output_type
+ 4, // [4:4] is the sub-list for method input_type
+ 4, // [4:4] is the sub-list for extension type_name
+ 4, // [4:4] is the sub-list for extension extendee
+ 0, // [0:4] is the sub-list for field type_name
+}
+
+func init() { file_skiplist_proto_init() }
+func file_skiplist_proto_init() {
+ if File_skiplist_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_skiplist_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SkipListProto); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_skiplist_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SkipListElementReference); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_skiplist_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*SkipListElement); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_skiplist_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*NameBatchData); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_skiplist_proto_rawDesc,
+ NumEnums: 0,
+ NumMessages: 4,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_skiplist_proto_goTypes,
+ DependencyIndexes: file_skiplist_proto_depIdxs,
+ MessageInfos: file_skiplist_proto_msgTypes,
+ }.Build()
+ File_skiplist_proto = out.File
+ file_skiplist_proto_rawDesc = nil
+ file_skiplist_proto_goTypes = nil
+ file_skiplist_proto_depIdxs = nil
+}
diff --git a/weed/util/skiplist/skiplist.proto b/weed/util/skiplist/skiplist.proto
new file mode 100644
index 000000000..2991ad830
--- /dev/null
+++ b/weed/util/skiplist/skiplist.proto
@@ -0,0 +1,30 @@
+syntax = "proto3";
+
+package skiplist;
+
+option go_package = "github.com/chrislusf/seaweedfs/weed/util/skiplist";
+
+message SkipListProto {
+ repeated SkipListElementReference start_levels = 1;
+ repeated SkipListElementReference end_levels = 2;
+ int32 max_new_level = 3;
+ int32 max_level = 4;
+}
+
+message SkipListElementReference {
+ int64 element_pointer = 1;
+ bytes key = 2;
+}
+
+message SkipListElement {
+ int64 id = 1;
+ repeated SkipListElementReference next = 2;
+ int32 level = 3;
+ bytes key = 4;
+ bytes value = 5;
+ SkipListElementReference prev = 6;
+}
+
+message NameBatchData {
+ repeated bytes names = 1;
+} \ No newline at end of file
diff --git a/weed/util/skiplist/skiplist_serde.go b/weed/util/skiplist/skiplist_serde.go
new file mode 100644
index 000000000..a735f66fa
--- /dev/null
+++ b/weed/util/skiplist/skiplist_serde.go
@@ -0,0 +1,51 @@
+package skiplist
+
+import "bytes"
+
+func compareElement(a *SkipListElement, key []byte) int {
+ if len(a.Key) == 0 {
+ return -1
+ }
+ return bytes.Compare(a.Key, key)
+}
+
+func (node *SkipListElement) Reference() *SkipListElementReference {
+ if node == nil {
+ return nil
+ }
+ return &SkipListElementReference{
+ ElementPointer: node.Id,
+ Key: node.Key,
+ }
+}
+
+func (t *SkipList) SaveElement(element *SkipListElement) error {
+ if element == nil {
+ return nil
+ }
+ return t.ListStore.SaveElement(element.Id, element)
+}
+
+func (t *SkipList) DeleteElement(element *SkipListElement) error {
+ if element == nil {
+ return nil
+ }
+ return t.ListStore.DeleteElement(element.Id)
+}
+
+func (t *SkipList) LoadElement(ref *SkipListElementReference) (*SkipListElement, error) {
+ if ref.IsNil() {
+ return nil, nil
+ }
+ return t.ListStore.LoadElement(ref.ElementPointer)
+}
+
+func (ref *SkipListElementReference) IsNil() bool {
+ if ref == nil {
+ return true
+ }
+ if len(ref.Key) == 0 {
+ return true
+ }
+ return false
+}
diff --git a/weed/util/skiplist/skiplist_test.go b/weed/util/skiplist/skiplist_test.go
new file mode 100644
index 000000000..5b36cacbd
--- /dev/null
+++ b/weed/util/skiplist/skiplist_test.go
@@ -0,0 +1,290 @@
+package skiplist
+
+import (
+ "bytes"
+ "math/rand"
+ "strconv"
+ "testing"
+)
+
+const (
+ maxN = 10000
+)
+
+var (
+ memStore = newMemStore()
+)
+
+func TestReverseInsert(t *testing.T) {
+ list := NewSeed(100, memStore)
+
+ list.InsertByKey([]byte("zzz"), 0, []byte("zzz"))
+ list.DeleteByKey([]byte("zzz"))
+
+ list.InsertByKey([]byte("aaa"), 0, []byte("aaa"))
+
+ if list.IsEmpty() {
+ t.Fail()
+ }
+
+}
+
+func TestInsertAndFind(t *testing.T) {
+
+ k0 := []byte("0")
+ var list *SkipList
+
+ var listPointer *SkipList
+ listPointer.InsertByKey(k0, 0, k0)
+ if _, _, ok, _ := listPointer.Find(k0); ok {
+ t.Fail()
+ }
+
+ list = New(memStore)
+ if _, _, ok, _ := list.Find(k0); ok {
+ t.Fail()
+ }
+ if !list.IsEmpty() {
+ t.Fail()
+ }
+
+ // Test at the beginning of the list.
+ for i := 0; i < maxN; i++ {
+ key := []byte(strconv.Itoa(maxN - i))
+ list.InsertByKey(key, 0, key)
+ }
+ for i := 0; i < maxN; i++ {
+ key := []byte(strconv.Itoa(maxN - i))
+ if _, _, ok, _ := list.Find(key); !ok {
+ t.Fail()
+ }
+ }
+
+ list = New(memStore)
+ // Test at the end of the list.
+ for i := 0; i < maxN; i++ {
+ key := []byte(strconv.Itoa(i))
+ list.InsertByKey(key, 0, key)
+ }
+ for i := 0; i < maxN; i++ {
+ key := []byte(strconv.Itoa(i))
+ if _, _, ok, _ := list.Find(key); !ok {
+ t.Fail()
+ }
+ }
+
+ list = New(memStore)
+ // Test at random positions in the list.
+ rList := rand.Perm(maxN)
+ for _, e := range rList {
+ key := []byte(strconv.Itoa(e))
+ // println("insert", e)
+ list.InsertByKey(key, 0, key)
+ }
+ for _, e := range rList {
+ key := []byte(strconv.Itoa(e))
+ // println("find", e)
+ if _, _, ok, _ := list.Find(key); !ok {
+ t.Fail()
+ }
+ }
+ // println("print list")
+ list.println()
+
+}
+
+func Element(x int) []byte {
+ return []byte(strconv.Itoa(x))
+}
+
+func TestDelete(t *testing.T) {
+
+ k0 := []byte("0")
+
+ var list *SkipList
+
+ // Delete on empty list
+ list.DeleteByKey(k0)
+
+ list = New(memStore)
+
+ list.DeleteByKey(k0)
+ if !list.IsEmpty() {
+ t.Fail()
+ }
+
+ list.InsertByKey(k0, 0, k0)
+ list.DeleteByKey(k0)
+ if !list.IsEmpty() {
+ t.Fail()
+ }
+
+ // Delete elements at the beginning of the list.
+ for i := 0; i < maxN; i++ {
+ list.InsertByKey(Element(i), 0, Element(i))
+ }
+ for i := 0; i < maxN; i++ {
+ list.DeleteByKey(Element(i))
+ }
+ if !list.IsEmpty() {
+ t.Fail()
+ }
+
+ list = New(memStore)
+ // Delete elements at the end of the list.
+ for i := 0; i < maxN; i++ {
+ list.InsertByKey(Element(i), 0, Element(i))
+ }
+ for i := 0; i < maxN; i++ {
+ list.DeleteByKey(Element(maxN - i - 1))
+ }
+ if !list.IsEmpty() {
+ t.Fail()
+ }
+
+ list = New(memStore)
+ // Delete elements at random positions in the list.
+ rList := rand.Perm(maxN)
+ for _, e := range rList {
+ list.InsertByKey(Element(e), 0, Element(e))
+ }
+ for _, e := range rList {
+ list.DeleteByKey(Element(e))
+ }
+ if !list.IsEmpty() {
+ t.Fail()
+ }
+}
+
+func TestNext(t *testing.T) {
+ list := New(memStore)
+
+ for i := 0; i < maxN; i++ {
+ list.InsertByKey(Element(i), 0, Element(i))
+ }
+
+ smallest, _ := list.GetSmallestNode()
+ largest, _ := list.GetLargestNode()
+
+ lastNode := smallest
+ node := lastNode
+ for node != largest {
+ node, _ = list.Next(node)
+ // Must always be incrementing here!
+ if bytes.Compare(node.Key, lastNode.Key) <= 0 {
+ t.Fail()
+ }
+ // Next.Prev must always point to itself!
+ prevNode, _ := list.Prev(node)
+ nextNode, _ := list.Next(prevNode)
+ if nextNode != node {
+ t.Fail()
+ }
+ lastNode = node
+ }
+
+ if nextNode, _ := list.Next(largest); nextNode != smallest {
+ t.Fail()
+ }
+}
+
+func TestPrev(t *testing.T) {
+ list := New(memStore)
+
+ for i := 0; i < maxN; i++ {
+ list.InsertByKey(Element(i), 0, Element(i))
+ }
+
+ smallest, _ := list.GetSmallestNode()
+ largest, _ := list.GetLargestNode()
+
+ lastNode := largest
+ node := lastNode
+ for node != smallest {
+ node, _ = list.Prev(node)
+ // Must always be incrementing here!
+ if bytes.Compare(node.Key, lastNode.Key) >= 0 {
+ t.Fail()
+ }
+ // Next.Prev must always point to itself!
+ nextNode, _ := list.Next(node)
+ prevNode, _ := list.Prev(nextNode)
+ if prevNode != node {
+ t.Fail()
+ }
+ lastNode = node
+ }
+
+ if prevNode, _ := list.Prev(smallest); prevNode != largest {
+ t.Fail()
+ }
+}
+
+func TestFindGreaterOrEqual(t *testing.T) {
+
+ maxNumber := maxN * 100
+
+ var list *SkipList
+ var listPointer *SkipList
+
+ // Test on empty list.
+ if _, _, ok, _ := listPointer.FindGreaterOrEqual(Element(0)); ok {
+ t.Errorf("found element 0 in an empty list")
+ }
+
+ list = New(memStore)
+
+ for i := 0; i < maxN; i++ {
+ list.InsertByKey(Element(rand.Intn(maxNumber)), 0, Element(i))
+ }
+
+ for i := 0; i < maxN; i++ {
+ key := Element(rand.Intn(maxNumber))
+ if _, v, ok, _ := list.FindGreaterOrEqual(key); ok {
+ // if f is v should be bigger than the element before
+ if v.Prev != nil && bytes.Compare(key, v.Prev.Key) < 0 {
+ t.Errorf("PrevV: %s\n key: %s\n\n", string(v.Prev.Key), string(key))
+ }
+ // v should be bigger or equal to f
+ // If we compare directly, we get an equal key with a difference on the 10th decimal point, which fails.
+ if bytes.Compare(v.Key, key) < 0 {
+ t.Errorf("v: %s\n key: %s\n\n", string(v.Key), string(key))
+ }
+ } else {
+ lastNode, _ := list.GetLargestNode()
+ lastV := lastNode.GetValue()
+ // It is OK, to fail, as long as f is bigger than the last element.
+ if bytes.Compare(key, lastV) <= 0 {
+ t.Errorf("lastV: %s\n key: %s\n\n", string(lastV), string(key))
+ }
+ }
+ }
+
+}
+
+func TestChangeValue(t *testing.T) {
+ list := New(memStore)
+
+ for i := 0; i < maxN; i++ {
+ list.InsertByKey(Element(i), 0, []byte("value"))
+ }
+
+ for i := 0; i < maxN; i++ {
+ // The key only looks at the int so the string doesn't matter here!
+ _, f1, ok, _ := list.Find(Element(i))
+ if !ok {
+ t.Fail()
+ }
+ err := list.ChangeValue(f1, []byte("different value"))
+ if err != nil {
+ t.Fail()
+ }
+ _, f2, ok, _ := list.Find(Element(i))
+ if !ok {
+ t.Fail()
+ }
+ if bytes.Compare(f2.GetValue(), []byte("different value")) != 0 {
+ t.Fail()
+ }
+ }
+}