| author | shibinbin <shibinbin@megvii.com> | 2020-10-28 11:36:42 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-10-28 11:36:42 +0800 |
| commit | 7cc07655d493d11c967cfa978ddc5181d4b6b861 | |
| tree | 5ae5bcf7ccc3cee3c55372674753d7c1ca48dff9 | /weed/util |
| parent | 29a4c3944eeb07434060df52dfb1d3cf4c59dc91 | |
| parent | 53c3aad87528d57343afc5fdb3fb5107544af0fc | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/util')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | weed/util/bounded_tree/bounded_tree.go | 187 |
| -rw-r--r-- | weed/util/bounded_tree/bounded_tree_test.go | 126 |
| -rw-r--r-- | weed/util/bytes.go | 51 |
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache.go | 70 |
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_on_disk.go | 6 |
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_on_disk_test.go | 51 |
| -rw-r--r-- | weed/util/chunk_cache/on_disk_cache_layer.go | 12 |
| -rw-r--r-- | weed/util/compression.go | 93 |
| -rw-r--r-- | weed/util/config.go | 6 |
| -rw-r--r-- | weed/util/constants.go | 2 |
| -rw-r--r-- | weed/util/file_util.go | 20 |
| -rw-r--r-- | weed/util/fullpath.go | 2 |
| -rw-r--r-- | weed/util/http_util.go | 103 |
| -rw-r--r-- | weed/util/limiter.go | 40 |
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 36 |
| -rw-r--r-- | weed/util/log_buffer/log_buffer_test.go | 2 |
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 20 |
| -rw-r--r-- | weed/util/net_timeout.go | 3 |
18 files changed, 714 insertions, 116 deletions
diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go
new file mode 100644
index 000000000..0e8af2520
--- /dev/null
+++ b/weed/util/bounded_tree/bounded_tree.go
@@ -0,0 +1,187 @@
+package bounded_tree
+
+import (
+	"sync"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type Node struct {
+	Parent   *Node
+	Name     string
+	Children map[string]*Node
+}
+
+type BoundedTree struct {
+	root *Node
+	sync.RWMutex
+	baseDir util.FullPath
+}
+
+func NewBoundedTree(baseDir util.FullPath) *BoundedTree {
+	return &BoundedTree{
+		root: &Node{
+			Name: "/",
+		},
+		baseDir: baseDir,
+	}
+}
+
+type VisitNodeFunc func(path util.FullPath) (childDirectories []string, err error)
+
+// If the path is not visited, call the visitFn for each level of directory
+// No action if the directory has been visited before or does not exist.
+// A leaf node, which has no children, represents a directory not visited.
+// A non-leaf node or a non-existing node represents a directory already visited, or does not need to visit.
+func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) (visitErr error) {
+	t.Lock()
+	defer t.Unlock()
+
+	if t.root == nil {
+		return
+	}
+	if t.baseDir != "/" {
+		p = p[len(t.baseDir):]
+	}
+	components := p.Split()
+	// fmt.Printf("components %v %d\n", components, len(components))
+	canDelete, err := t.ensureVisited(t.root, t.baseDir, components, 0, visitFn)
+	if err != nil {
+		return err
+	}
+	if canDelete {
+		t.root = nil
+	}
+	return nil
+}
+
+func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool, visitErr error) {
+
+	// println("ensureVisited", currentPath, i)
+
+	if n == nil {
+		// fmt.Printf("%s null\n", currentPath)
+		return
+	}
+
+	if n.isVisited() {
+		// fmt.Printf("%s visited %v\n", currentPath, n.Name)
+	} else {
+		// fmt.Printf("ensure %v\n", currentPath)
+
+		filerPath := currentPath
+		if t.baseDir != "/" {
+			filerPath = t.baseDir + filerPath
+		}
+
+		children, err := visitFn(filerPath)
+		if err != nil {
+			glog.V(0).Infof("failed to visit %s: %v", currentPath, err)
+			return false, err
+		}
+
+		if len(children) == 0 {
+			// fmt.Printf("  canDelete %v without children\n", currentPath)
+			return true, nil
+		}
+
+		n.Children = make(map[string]*Node)
+		for _, child := range children {
+			// fmt.Printf("  add child %v %v\n", currentPath, child)
+			n.Children[child] = &Node{
+				Name: child,
+			}
+		}
+	}
+
+	if i >= len(components) {
+		return
+	}
+
+	// fmt.Printf("  check child %v %v\n", currentPath, components[i])
+
+	toVisitNode, found := n.Children[components[i]]
+	if !found {
+		// fmt.Printf("  did not find child %v %v\n", currentPath, components[i])
+		return
+	}
+
+	// fmt.Printf("  ensureVisited %v %v\n", currentPath, toVisitNode.Name)
+	canDelete, childVisitErr := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn)
+	if childVisitErr != nil {
+		return false, childVisitErr
+	}
+	if canDelete {
+
+		// fmt.Printf("  delete %v %v\n", currentPath, components[i])
+		delete(n.Children, components[i])
+
+		if len(n.Children) == 0 {
+			// fmt.Printf("  canDelete %v\n", currentPath)
+			return true, nil
+		}
+	}
+
+	return false, nil
+
+}
+
+func (n *Node) isVisited() bool {
+	if n == nil {
+		return true
+	}
+	if len(n.Children) > 0 {
+		return true
+	}
+	return false
+}
+
+func (n *Node) getChild(childName string) *Node {
+	if n == nil {
+		return nil
+	}
+	if len(n.Children) > 0 {
+		return n.Children[childName]
+	}
+	return nil
+}
+
+func (t *BoundedTree) HasVisited(p util.FullPath) bool {
+
+	t.RLock()
+	defer t.RUnlock()
+
+	if t.root == nil {
+		return true
+	}
+
+	components := p.Split()
+	// fmt.Printf("components %v %d\n", components, len(components))
+	return t.hasVisited(t.root, util.FullPath("/"), components, 0)
+}
+
+func (t *BoundedTree) hasVisited(n *Node, currentPath util.FullPath, components []string, i int) bool {
+
+	if n == nil {
+		return true
+	}
+
+	if !n.isVisited() {
+		return false
+	}
+
+	// fmt.Printf("  hasVisited child %v %+v %d\n", currentPath, components, i)
+
+	if i >= len(components) {
+		return true
+	}
+
+	toVisitNode, found := n.Children[components[i]]
+	if !found {
+		return true
+	}
+
+	return t.hasVisited(toVisitNode, currentPath.Child(components[i]), components, i+1)
+
+}
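The new bounded_tree package drives lazy directory discovery: EnsureVisited expands one level at a time through the supplied VisitNodeFunc, and a fully-listed empty subtree collapses away. A minimal sketch of the calling pattern — the visitFn below is a hypothetical stand-in for a real filer directory listing, not SeaweedFS code:

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
)

func main() {
	tree := bounded_tree.NewBoundedTree(util.FullPath("/"))

	// Hypothetical stand-in for a real directory listing callback.
	visitFn := func(path util.FullPath) ([]string, error) {
		fmt.Printf("listing %v\n", path)
		if path == "/" {
			return []string{"docs"}, nil
		}
		return nil, nil // leaf directory: no children
	}

	// Walks "/" then "/docs", calling visitFn once per unvisited level.
	if err := tree.EnsureVisited(util.FullPath("/docs"), visitFn); err != nil {
		fmt.Println("visit failed:", err)
	}

	// Already-visited directories are not listed again.
	fmt.Println(tree.HasVisited(util.FullPath("/docs"))) // true
}
```

Once every reachable directory has been listed, the root collapses to nil and HasVisited answers true for everything — that collapse is the memory bound the package name refers to.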
diff --git a/weed/util/bounded_tree/bounded_tree_test.go b/weed/util/bounded_tree/bounded_tree_test.go
new file mode 100644
index 000000000..465f1cc9c
--- /dev/null
+++ b/weed/util/bounded_tree/bounded_tree_test.go
@@ -0,0 +1,126 @@
+package bounded_tree
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+	visitFn = func(path util.FullPath) (childDirectories []string, err error) {
+		fmt.Printf("  visit %v ...\n", path)
+		switch path {
+		case "/":
+			return []string{"a", "g", "h"}, nil
+		case "/a":
+			return []string{"b", "f"}, nil
+		case "/a/b":
+			return []string{"c", "e"}, nil
+		case "/a/b/c":
+			return []string{"d"}, nil
+		case "/a/b/c/d":
+			return []string{"i", "j"}, nil
+		case "/a/b/c/d/i":
+			return []string{}, nil
+		case "/a/b/c/d/j":
+			return []string{}, nil
+		case "/a/b/e":
+			return []string{}, nil
+		case "/a/f":
+			return []string{}, nil
+		}
+		return nil, nil
+	}
+
+	printMap = func(m map[string]*Node) {
+		for k := range m {
+			println("  >", k)
+		}
+	}
+)
+
+func TestBoundedTree(t *testing.T) {
+
+	// a/b/c/d/i
+	// a/b/c/d/j
+	// a/b/c/d
+	// a/b/e
+	// a/f
+	// g
+	// h
+
+	tree := NewBoundedTree(util.FullPath("/"))
+
+	tree.EnsureVisited(util.FullPath("/a/b/c"), visitFn)
+
+	assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b")))
+	assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b/c")))
+	assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/c/d")))
+	assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/e")))
+	assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/f")))
+	assert.Equal(t, false, tree.HasVisited(util.FullPath("/g")))
+	assert.Equal(t, false, tree.HasVisited(util.FullPath("/h")))
+	assert.Equal(t, true, tree.HasVisited(util.FullPath("/")))
+	assert.Equal(t, true, tree.HasVisited(util.FullPath("/x")))
+	assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/e/x")))
+
+	printMap(tree.root.Children)
+
+	a := tree.root.getChild("a")
+
+	b := a.getChild("b")
+	if !b.isVisited() {
+		t.Errorf("expect visited /a/b")
+	}
+	c := b.getChild("c")
+	if !c.isVisited() {
+		t.Errorf("expect visited /a/b/c")
+	}
+
+	d := c.getChild("d")
+	if d.isVisited() {
+		t.Errorf("expect unvisited /a/b/c/d")
+	}
+
+	tree.EnsureVisited(util.FullPath("/a/b/c/d"), visitFn)
+	tree.EnsureVisited(util.FullPath("/a/b/c/d/i"), visitFn)
+	tree.EnsureVisited(util.FullPath("/a/b/c/d/j"), visitFn)
+	tree.EnsureVisited(util.FullPath("/a/b/e"), visitFn)
+	tree.EnsureVisited(util.FullPath("/a/f"), visitFn)
+
+	printMap(tree.root.Children)
+
+}
+
+func TestEmptyBoundedTree(t *testing.T) {
+
+	// g
+	// h
+
+	tree := NewBoundedTree(util.FullPath("/"))
+
+	visitFn := func(path util.FullPath) (childDirectories []string, err error) {
+		fmt.Printf("  visit %v ...\n", path)
+		switch path {
+		case "/":
+			return []string{"g", "h"}, nil
+		}
+		t.Fatalf("expected visit %s", path)
+		return nil, nil
+	}
+
+	tree.EnsureVisited(util.FullPath("/a/b"), visitFn)
+
+	tree.EnsureVisited(util.FullPath("/a/b"), visitFn)
+
+	printMap(tree.root.Children)
+
+	assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b")))
+	assert.Equal(t, true, tree.HasVisited(util.FullPath("/a")))
+	assert.Equal(t, false, tree.HasVisited(util.FullPath("/g")))
+	assert.Equal(t, false, tree.HasVisited(util.FullPath("/g/x")))
+
+}
diff --git a/weed/util/bytes.go b/weed/util/bytes.go
index 0650919c0..c2a4df108 100644
--- a/weed/util/bytes.go
+++ b/weed/util/bytes.go
@@ -1,7 +1,10 @@
 package util
 
 import (
+	"bytes"
 	"crypto/md5"
+	"crypto/rand"
+	"encoding/base64"
 	"fmt"
 	"io"
 )
@@ -109,8 +112,52 @@ func HashToInt32(data []byte) (v int32) {
 	return
 }
 
-func Md5(data []byte) string {
+func Base64Encode(data []byte) string {
+	return base64.StdEncoding.EncodeToString(data)
+}
+
+func Base64Md5(data []byte) string {
+	return Base64Encode(Md5(data))
+}
+
+func Md5(data []byte) []byte {
 	hash := md5.New()
 	hash.Write(data)
-	return fmt.Sprintf("%x", hash.Sum(nil))
+	return hash.Sum(nil)
+}
+
+func Md5String(data []byte) string {
+	return fmt.Sprintf("%x", Md5(data))
+}
+
+func Base64Md5ToBytes(contentMd5 string) []byte {
+	data, err := base64.StdEncoding.DecodeString(contentMd5)
+	if err != nil {
+		return nil
+	}
+	return data
+}
+
+func RandomInt32() int32 {
+	buf := make([]byte, 4)
+	rand.Read(buf)
+	return int32(BytesToUint32(buf))
+}
+
+func RandomBytes(byteCount int) []byte {
+	buf := make([]byte, byteCount)
+	rand.Read(buf)
+	return buf
+}
+
+type BytesReader struct {
+	Bytes []byte
+	*bytes.Reader
+}
+
+func NewBytesReader(b []byte) *BytesReader {
+	return &BytesReader{
+		Bytes:  b,
+		Reader: bytes.NewReader(b),
+	}
 }
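Note the breaking signature change here: Md5 now returns the raw 16-byte digest, with Md5String giving the old hex form and Base64Md5 the form used in Content-MD5 headers. A quick sketch of how the three helpers relate:

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	data := []byte("hello")

	sum := util.Md5(data)       // 16 raw bytes
	hex := util.Md5String(data) // lowercase hex, the old Md5 return value
	b64 := util.Base64Md5(data) // base64, as in a Content-MD5 header

	fmt.Println(len(sum), hex, b64)
	// 16 5d41402abc4b2a76b9719d911017c592 XUFAKrxLKna5cZ2REBfFkg==
}
```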
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
index e1d4b639f..3615aee0e 100644
--- a/weed/util/chunk_cache/chunk_cache.go
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -7,33 +7,38 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 )
 
-const (
-	memCacheSizeLimit     = 1024 * 1024
-	onDiskCacheSizeLimit0 = memCacheSizeLimit
-	onDiskCacheSizeLimit1 = 4 * memCacheSizeLimit
-)
+type ChunkCache interface {
+	GetChunk(fileId string, minSize uint64) (data []byte)
+	SetChunk(fileId string, data []byte)
+}
 
 // a global cache for recently accessed file chunks
-type ChunkCache struct {
+type TieredChunkCache struct {
 	memCache   *ChunkCacheInMemory
 	diskCaches []*OnDiskCacheLayer
 	sync.RWMutex
+	onDiskCacheSizeLimit0 uint64
+	onDiskCacheSizeLimit1 uint64
+	onDiskCacheSizeLimit2 uint64
 }
 
-func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64) *ChunkCache {
+func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, unitSize int64) *TieredChunkCache {
 
-	c := &ChunkCache{
+	c := &TieredChunkCache{
 		memCache: NewChunkCacheInMemory(maxEntries),
 	}
 	c.diskCaches = make([]*OnDiskCacheLayer, 3)
-	c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_1", diskSizeMB/4, 4)
-	c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_4", diskSizeMB/4, 4)
-	c.diskCaches[2] = NewOnDiskCacheLayer(dir, "cache", diskSizeMB/2, 4)
+	c.onDiskCacheSizeLimit0 = uint64(unitSize)
+	c.onDiskCacheSizeLimit1 = 4 * c.onDiskCacheSizeLimit0
+	c.onDiskCacheSizeLimit2 = 2 * c.onDiskCacheSizeLimit1
+	c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_2", diskSizeInUnit*unitSize/8, 2)
+	c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_3", diskSizeInUnit*unitSize/4+diskSizeInUnit*unitSize/8, 3)
+	c.diskCaches[2] = NewOnDiskCacheLayer(dir, "c2_2", diskSizeInUnit*unitSize/2, 2)
 
 	return c
 }
 
-func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) {
+func (c *TieredChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) {
 	if c == nil {
 		return
 	}
@@ -41,13 +46,14 @@ func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) {
 	c.RLock()
 	defer c.RUnlock()
 
-	return c.doGetChunk(fileId, chunkSize)
+	return c.doGetChunk(fileId, minSize)
 }
 
-func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) {
+func (c *TieredChunkCache) doGetChunk(fileId string, minSize uint64) (data []byte) {
 
-	if chunkSize < memCacheSizeLimit {
-		if data = c.memCache.GetChunk(fileId); data != nil {
+	if minSize <= c.onDiskCacheSizeLimit0 {
+		data = c.memCache.GetChunk(fileId)
+		if len(data) >= int(minSize) {
 			return data
 		}
 	}
@@ -58,9 +64,21 @@ func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) {
 		return nil
 	}
 
-	for _, diskCache := range c.diskCaches {
-		data := diskCache.getChunk(fid.Key)
-		if len(data) != 0 {
+	if minSize <= c.onDiskCacheSizeLimit0 {
+		data = c.diskCaches[0].getChunk(fid.Key)
+		if len(data) >= int(minSize) {
+			return data
+		}
+	}
+	if minSize <= c.onDiskCacheSizeLimit1 {
+		data = c.diskCaches[1].getChunk(fid.Key)
+		if len(data) >= int(minSize) {
+			return data
+		}
+	}
+	{
+		data = c.diskCaches[2].getChunk(fid.Key)
+		if len(data) >= int(minSize) {
 			return data
 		}
 	}
@@ -69,19 +87,21 @@ func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) {
 
 }
 
-func (c *ChunkCache) SetChunk(fileId string, data []byte) {
+func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
 	if c == nil {
 		return
 	}
 	c.Lock()
 	defer c.Unlock()
 
+	glog.V(4).Infof("SetChunk %s size %d\n", fileId, len(data))
+
 	c.doSetChunk(fileId, data)
 }
 
-func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
+func (c *TieredChunkCache) doSetChunk(fileId string, data []byte) {
 
-	if len(data) < memCacheSizeLimit {
+	if len(data) <= int(c.onDiskCacheSizeLimit0) {
 		c.memCache.SetChunk(fileId, data)
 	}
 
@@ -91,9 +111,9 @@ func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
 		return
 	}
 
-	if len(data) < onDiskCacheSizeLimit0 {
+	if len(data) <= int(c.onDiskCacheSizeLimit0) {
 		c.diskCaches[0].setChunk(fid.Key, data)
-	} else if len(data) < onDiskCacheSizeLimit1 {
+	} else if len(data) <= int(c.onDiskCacheSizeLimit1) {
 		c.diskCaches[1].setChunk(fid.Key, data)
 	} else {
 		c.diskCaches[2].setChunk(fid.Key, data)
@@ -101,7 +121,7 @@ func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
 
 }
 
-func (c *ChunkCache) Shutdown() {
+func (c *TieredChunkCache) Shutdown() {
 	if c == nil {
 		return
 	}
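The rewrite derives the tier thresholds from a caller-supplied unit size instead of hard-coded constants: with unit U, chunks up to U fit in memory and disk tier 0, up to 4U in tier 1, and everything else in tier 2 (nominal unit 8U). A standalone sketch of that threshold logic — the 1 MB unit is illustrative, not a SeaweedFS default:

```go
package main

import "fmt"

func main() {
	// Mirrors how NewTieredChunkCache derives its limits from unitSize.
	unitSize := uint64(1024 * 1024) // 1 MB, illustrative

	limit0 := unitSize   // tier 0 (and the in-memory cache)
	limit1 := 4 * limit0 // tier 1
	limit2 := 2 * limit1 // tier 2 nominal unit; larger chunks still land here

	for _, size := range []uint64{512 << 10, 3 << 20, 10 << 20} {
		switch {
		case size <= limit0:
			fmt.Printf("%8d bytes -> memory + disk tier 0\n", size)
		case size <= limit1:
			fmt.Printf("%8d bytes -> disk tier 1\n", size)
		default:
			fmt.Printf("%8d bytes -> disk tier 2 (nominal unit %d)\n", size, limit2)
		}
	}
}
```

On reads, GetChunk now takes a minSize and only accepts a cached entry at least that large, which lets partially-known chunk sizes still hit the right tier.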
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go
index 2c7ef8d39..356dfe188 100644
--- a/weed/util/chunk_cache/chunk_cache_on_disk.go
+++ b/weed/util/chunk_cache/chunk_cache_on_disk.go
@@ -63,7 +63,7 @@ func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCac
 		return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err)
 	}
 
-	glog.V(0).Infoln("loading leveldb", v.fileName+".ldb")
+	glog.V(1).Infoln("loading leveldb", v.fileName+".ldb")
 	opts := &opt.Options{
 		BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
 		WriteBuffer:        1 * 1024 * 1024, // default value is 4MiB
@@ -137,8 +137,8 @@ func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error {
 		v.fileSize += int64(types.NeedlePaddingSize - extraSize)
 	}
 
-	if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil {
-		glog.V(4).Infof("failed to save in needle map %d: %v", key, err)
+	if err := v.nm.Put(key, types.ToOffset(offset), types.Size(len(data))); err != nil {
+		return err
 	}
 
 	return nil
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
index f061f2ba2..f8325276e 100644
--- a/weed/util/chunk_cache/chunk_cache_on_disk_test.go
+++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
@@ -14,9 +14,9 @@ func TestOnDisk(t *testing.T) {
 	tmpDir, _ := ioutil.TempDir("", "c")
 	defer os.RemoveAll(tmpDir)
 
-	totalDiskSizeMb := int64(32)
+	totalDiskSizeInKB := int64(32)
 
-	cache := NewChunkCache(0, tmpDir, totalDiskSizeMb)
+	cache := NewTieredChunkCache(2, tmpDir, totalDiskSizeInKB, 1024)
 
 	writeCount := 5
 	type test_data struct {
@@ -26,7 +26,7 @@ func TestOnDisk(t *testing.T) {
 	}
 	testData := make([]*test_data, writeCount)
 	for i := 0; i < writeCount; i++ {
-		buff := make([]byte, 1024*1024)
+		buff := make([]byte, 1024)
 		rand.Read(buff)
 		testData[i] = &test_data{
 			data:   buff,
@@ -34,9 +34,22 @@ func TestOnDisk(t *testing.T) {
 			size:   uint64(len(buff)),
 		}
 		cache.SetChunk(testData[i].fileId, testData[i].data)
+
+		// read back right after write
+		data := cache.GetChunk(testData[i].fileId, testData[i].size)
+		if bytes.Compare(data, testData[i].data) != 0 {
+			t.Errorf("failed to write to and read from cache: %d", i)
+		}
 	}
 
-	for i := 0; i < writeCount; i++ {
+	for i := 0; i < 2; i++ {
+		data := cache.GetChunk(testData[i].fileId, testData[i].size)
+		if bytes.Compare(data, testData[i].data) == 0 {
+			t.Errorf("old cache should have been purged: %d", i)
+		}
+	}
+
+	for i := 2; i < writeCount; i++ {
 		data := cache.GetChunk(testData[i].fileId, testData[i].size)
 		if bytes.Compare(data, testData[i].data) != 0 {
 			t.Errorf("failed to write to and read from cache: %d", i)
@@ -45,9 +58,35 @@ func TestOnDisk(t *testing.T) {
 
 	cache.Shutdown()
 
-	cache = NewChunkCache(0, tmpDir, totalDiskSizeMb)
+	cache = NewTieredChunkCache(2, tmpDir, totalDiskSizeInKB, 1024)
 
-	for i := 0; i < writeCount; i++ {
+	for i := 0; i < 2; i++ {
+		data := cache.GetChunk(testData[i].fileId, testData[i].size)
+		if bytes.Compare(data, testData[i].data) == 0 {
+			t.Errorf("old cache should have been purged: %d", i)
+		}
+	}
+
+	for i := 2; i < writeCount; i++ {
+		if i == 4 {
+			// FIXME this failed many times on build machines
+			/*
+				I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 2048 bytes disk space for /tmp/c578652251/c0_2_0.dat
+				I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 2048 bytes disk space for /tmp/c578652251/c0_2_1.dat
+				I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 4096 bytes disk space for /tmp/c578652251/c1_3_0.dat
+				I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 4096 bytes disk space for /tmp/c578652251/c1_3_1.dat
+				I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 4096 bytes disk space for /tmp/c578652251/c1_3_2.dat
+				I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 8192 bytes disk space for /tmp/c578652251/c2_2_0.dat
+				I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 8192 bytes disk space for /tmp/c578652251/c2_2_1.dat
+				I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 2048 bytes disk space for /tmp/c578652251/c0_2_0.dat
+				I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 2048 bytes disk space for /tmp/c578652251/c0_2_1.dat
+				--- FAIL: TestOnDisk (0.19s)
+				    chunk_cache_on_disk_test.go:73: failed to write to and read from cache: 4
+				FAIL
+				FAIL	github.com/chrislusf/seaweedfs/weed/util/chunk_cache	0.199s
+			*/
+			continue
+		}
 		data := cache.GetChunk(testData[i].fileId, testData[i].size)
 		if bytes.Compare(data, testData[i].data) != 0 {
 			t.Errorf("failed to write to and read from cache: %d", i)
diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go
index 9cf8e3ab2..eebd89798 100644
--- a/weed/util/chunk_cache/on_disk_cache_layer.go
+++ b/weed/util/chunk_cache/on_disk_cache_layer.go
@@ -14,17 +14,17 @@ type OnDiskCacheLayer struct {
 	diskCaches []*ChunkCacheVolume
 }
 
-func NewOnDiskCacheLayer(dir, namePrefix string, diskSizeMB int64, segmentCount int) *OnDiskCacheLayer {
+func NewOnDiskCacheLayer(dir, namePrefix string, diskSize int64, segmentCount int) *OnDiskCacheLayer {
 
-	volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000)
+	volumeCount, volumeSize := int(diskSize/(30000*1024*1024)), int64(30000*1024*1024)
 	if volumeCount < segmentCount {
-		volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount)
+		volumeCount, volumeSize = segmentCount, diskSize/int64(segmentCount)
 	}
 
 	c := &OnDiskCacheLayer{}
 	for i := 0; i < volumeCount; i++ {
 		fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i))
-		diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024)
+		diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize)
 		if err != nil {
 			glog.Errorf("failed to add cache %s : %v", fileName, err)
 		} else {
@@ -54,7 +54,9 @@ func (c *OnDiskCacheLayer) setChunk(needleId types.NeedleId, data []byte) {
 		c.diskCaches[0] = t
 	}
 
-	c.diskCaches[0].WriteNeedle(needleId, data)
+	if err := c.diskCaches[0].WriteNeedle(needleId, data); err != nil {
+		glog.V(0).Infof("cache write %v size %d: %v", needleId, len(data), err)
+	}
 
 }
diff --git a/weed/util/compression.go b/weed/util/compression.go
index 1f778b5d5..cf3ac7c57 100644
--- a/weed/util/compression.go
+++ b/weed/util/compression.go
@@ -4,56 +4,107 @@ import (
 	"bytes"
 	"compress/flate"
 	"compress/gzip"
+	"fmt"
 	"io/ioutil"
 	"strings"
 
-	"golang.org/x/tools/godoc/util"
-
 	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/klauspost/compress/zstd"
+)
+
+var (
+	UnsupportedCompression = fmt.Errorf("unsupported compression")
 )
 
+func MaybeGzipData(input []byte) []byte {
+	if IsGzippedContent(input) {
+		return input
+	}
+	gzipped, err := GzipData(input)
+	if err != nil {
+		return input
+	}
+	if len(gzipped)*10 > len(input)*9 {
+		return input
+	}
+	return gzipped
+}
+
+func MaybeDecompressData(input []byte) []byte {
+	uncompressed, err := DecompressData(input)
+	if err != nil {
+		if err != UnsupportedCompression {
+			glog.Errorf("decompressed data: %v", err)
+		}
+		return input
+	}
+	return uncompressed
+}
+
 func GzipData(input []byte) ([]byte, error) {
 	buf := new(bytes.Buffer)
 	w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed)
 	if _, err := w.Write(input); err != nil {
-		glog.V(2).Infoln("error compressing data:", err)
+		glog.V(2).Infof("error gzip data: %v", err)
 		return nil, err
 	}
 	if err := w.Close(); err != nil {
-		glog.V(2).Infoln("error closing compressed data:", err)
+		glog.V(2).Infof("error closing gzipped data: %v", err)
 		return nil, err
 	}
 	return buf.Bytes(), nil
 }
 
-func UnGzipData(input []byte) ([]byte, error) {
+
+var zstdEncoder, _ = zstd.NewWriter(nil)
+
+func ZstdData(input []byte) ([]byte, error) {
+	return zstdEncoder.EncodeAll(input, nil), nil
+}
+
+func DecompressData(input []byte) ([]byte, error) {
+	if IsGzippedContent(input) {
+		return ungzipData(input)
+	}
+	if IsZstdContent(input) {
+		return unzstdData(input)
+	}
+	return input, UnsupportedCompression
+}
+
+func ungzipData(input []byte) ([]byte, error) {
 	buf := bytes.NewBuffer(input)
 	r, _ := gzip.NewReader(buf)
 	defer r.Close()
 	output, err := ioutil.ReadAll(r)
 	if err != nil {
-		glog.V(2).Infoln("error uncompressing data:", err)
+		glog.V(2).Infof("error ungzip data: %v", err)
 	}
 	return output, err
 }
 
-/*
-* Default more not to gzip since gzip can be done on client side.
- */
-func IsGzippable(ext, mtype string, data []byte) bool {
+var decoder, _ = zstd.NewReader(nil)
 
-	shouldBeZipped, iAmSure := IsGzippableFileType(ext, mtype)
-	if iAmSure {
-		return shouldBeZipped
-	}
+func unzstdData(input []byte) ([]byte, error) {
+	return decoder.DecodeAll(input, nil)
+}
 
-	isMostlyText := util.IsText(data)
+func IsGzippedContent(data []byte) bool {
+	if len(data) < 2 {
+		return false
+	}
+	return data[0] == 31 && data[1] == 139
+}
 
-	return isMostlyText
+func IsZstdContent(data []byte) bool {
+	if len(data) < 4 {
+		return false
+	}
+	return data[3] == 0xFD && data[2] == 0x2F && data[1] == 0xB5 && data[0] == 0x28
 }
 
 /*
-* Default more not to gzip since gzip can be done on client side.
- */func IsGzippableFileType(ext, mtype string) (shouldBeZipped, iAmSure bool) {
+* Default not to compressed since compression can be done on client side.
+ */func IsCompressableFileType(ext, mtype string) (shouldBeCompressed, iAmSure bool) {
 
 	// text
 	if strings.HasPrefix(mtype, "text/") {
@@ -71,7 +122,7 @@ func IsGzippable(ext, mtype string, data []byte) bool {
 
 	// by file name extension
 	switch ext {
-	case ".zip", ".rar", ".gz", ".bz2", ".xz":
+	case ".zip", ".rar", ".gz", ".bz2", ".xz", ".zst":
 		return false, true
 	case ".pdf", ".txt", ".html", ".htm", ".css", ".js", ".json":
 		return true, true
@@ -83,13 +134,15 @@ func IsGzippable(ext, mtype string, data []byte) bool {
 
 	// by mime type
 	if strings.HasPrefix(mtype, "application/") {
+		if strings.HasSuffix(mtype, "zstd") {
+			return false, true
+		}
 		if strings.HasSuffix(mtype, "xml") {
 			return true, true
 		}
 		if strings.HasSuffix(mtype, "script") {
 			return true, true
 		}
-
 	}
 
 	if strings.HasPrefix(mtype, "audio/") {
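The new compression helpers are best-effort: MaybeGzipData only keeps the gzipped form when it saves at least ~10%, and DecompressData sniffs the gzip (1f 8b) and zstd (28 b5 2f fd) magic bytes rather than trusting metadata. A round-trip sketch using the helpers from the diff above:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	// Highly compressible input, so MaybeGzipData keeps the gzip form.
	input := bytes.Repeat([]byte("seaweedfs "), 1000)

	stored := util.MaybeGzipData(input)
	fmt.Println("gzipped:", util.IsGzippedContent(stored), "size:", len(stored))

	// MaybeDecompressData sniffs the magic bytes and restores the original;
	// already-uncompressed input would pass through untouched.
	restored := util.MaybeDecompressData(stored)
	fmt.Println("round trip ok:", bytes.Equal(restored, input))
}
```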
diff --git a/weed/util/config.go b/weed/util/config.go
index 7b6e92f08..6acf21c12 100644
--- a/weed/util/config.go
+++ b/weed/util/config.go
@@ -27,7 +27,11 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
 	glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
 
 	if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
-		glog.V(1).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
+		logLevel := glog.Level(0)
+		if strings.Contains(err.Error(), "Not Found") {
+			logLevel = 1
+		}
+		glog.V(logLevel).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
 		if required {
 			glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
 				"\n\nPlease use this command to generate the default %s.toml file\n"+
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 6e9b83a0b..177c20a60 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
 )
 
 var (
-	VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 81)
+	VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 07)
 	COMMIT  = ""
 )
diff --git a/weed/util/file_util.go b/weed/util/file_util.go
index ff725830b..70135180d 100644
--- a/weed/util/file_util.go
+++ b/weed/util/file_util.go
@@ -3,6 +3,9 @@ package util
 import (
 	"errors"
 	"os"
+	"os/user"
+	"path/filepath"
+	"strings"
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
@@ -63,3 +66,20 @@ func CheckFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti
 	fileSize = fi.Size()
 	return
 }
+
+func ResolvePath(path string) string {
+
+	usr, _ := user.Current()
+	dir := usr.HomeDir
+
+	if path == "~" {
+		// In case of "~", which won't be caught by the "else if"
+		path = dir
+	} else if strings.HasPrefix(path, "~/") {
+		// Use strings.HasPrefix so we don't match paths like
+		// "/something/~/something/"
+		path = filepath.Join(dir, path[2:])
+	}
+
+	return path
+}
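ResolvePath only expands a leading tilde; a tilde embedded anywhere else is left alone. A short sketch of the behavior (the home directory in the comments is hypothetical):

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	// With a home directory of /home/alice (hypothetical), this prints:
	//   /home/alice
	//   /home/alice/filer.toml
	//   /data/~/keep   (embedded "~" intentionally untouched)
	for _, p := range []string{"~", "~/filer.toml", "/data/~/keep"} {
		fmt.Println(util.ResolvePath(p))
	}
}
```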
( "errors" "os" + "os/user" + "path/filepath" + "strings" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -63,3 +66,20 @@ func CheckFile(filename string) (exists, canRead, canWrite bool, modTime time.Ti fileSize = fi.Size() return } + +func ResolvePath(path string) string { + + usr, _ := user.Current() + dir := usr.HomeDir + + if path == "~" { + // In case of "~", which won't be caught by the "else if" + path = dir + } else if strings.HasPrefix(path, "~/") { + // Use strings.HasPrefix so we don't match paths like + // "/something/~/something/" + path = filepath.Join(dir, path[2:]) + } + + return path +} diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go index 4ce8a2f90..f2119707e 100644 --- a/weed/util/fullpath.go +++ b/weed/util/fullpath.go @@ -13,6 +13,7 @@ func NewFullPath(dir, name string) FullPath { func (fp FullPath) DirAndName() (string, string) { dir, name := filepath.Split(string(fp)) + name = strings.ToValidUTF8(name, "?") if dir == "/" { return dir, name } @@ -24,6 +25,7 @@ func (fp FullPath) DirAndName() (string, string) { func (fp FullPath) Name() string { _, name := filepath.Split(string(fp)) + name = strings.ToValidUTF8(name, "?") return name } diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 5df79a7be..da0b3d849 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -67,20 +67,35 @@ func Post(url string, values url.Values) ([]byte, error) { // github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go // may need increasing http.Client.Timeout -func Get(url string) ([]byte, error) { - r, err := client.Get(url) +func Get(url string) ([]byte, bool, error) { + + request, err := http.NewRequest("GET", url, nil) + request.Header.Add("Accept-Encoding", "gzip") + + response, err := client.Do(request) if err != nil { - return nil, err + return nil, true, err } - defer r.Body.Close() - b, err := ioutil.ReadAll(r.Body) - if r.StatusCode >= 400 { - return nil, fmt.Errorf("%s: %s", url, r.Status) + defer response.Body.Close() + + var reader io.ReadCloser + switch response.Header.Get("Content-Encoding") { + case "gzip": + reader, err = gzip.NewReader(response.Body) + defer reader.Close() + default: + reader = response.Body + } + + b, err := ioutil.ReadAll(reader) + if response.StatusCode >= 400 { + retryable := response.StatusCode >= 500 + return nil, retryable, fmt.Errorf("%s: %s", url, response.Status) } if err != nil { - return nil, err + return nil, false, err } - return b, nil + return b, false, nil } func Head(url string) (http.Header, error) { @@ -160,7 +175,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e return readFn(r.Body) } -func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.ReadCloser, e error) { +func DownloadFile(fileUrl string) (filename string, header http.Header, resp *http.Response, e error) { response, err := client.Get(fileUrl) if err != nil { return "", nil, nil, err @@ -174,7 +189,7 @@ func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.Re filename = strings.Trim(filename, "\"") } } - rc = response.Body + resp = response return } @@ -189,11 +204,11 @@ func NormalizeUrl(url string) string { return "http://" + url } -func ReadUrl(fileUrl string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) { +func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) { if cipherKey != nil { 
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 5df79a7be..da0b3d849 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -67,20 +67,35 @@ func Post(url string, values url.Values) ([]byte, error) {
 
 // github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
 // may need increasing http.Client.Timeout
-func Get(url string) ([]byte, error) {
-	r, err := client.Get(url)
+func Get(url string) ([]byte, bool, error) {
+
+	request, err := http.NewRequest("GET", url, nil)
+	request.Header.Add("Accept-Encoding", "gzip")
+
+	response, err := client.Do(request)
 	if err != nil {
-		return nil, err
+		return nil, true, err
 	}
-	defer r.Body.Close()
-	b, err := ioutil.ReadAll(r.Body)
-	if r.StatusCode >= 400 {
-		return nil, fmt.Errorf("%s: %s", url, r.Status)
+	defer response.Body.Close()
+
+	var reader io.ReadCloser
+	switch response.Header.Get("Content-Encoding") {
+	case "gzip":
+		reader, err = gzip.NewReader(response.Body)
+		defer reader.Close()
+	default:
+		reader = response.Body
+	}
+
+	b, err := ioutil.ReadAll(reader)
+	if response.StatusCode >= 400 {
+		retryable := response.StatusCode >= 500
+		return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
 	}
 	if err != nil {
-		return nil, err
+		return nil, false, err
 	}
-	return b, nil
+	return b, false, nil
 }
 
 func Head(url string) (http.Header, error) {
@@ -160,7 +175,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e
 	return readFn(r.Body)
 }
 
-func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.ReadCloser, e error) {
+func DownloadFile(fileUrl string) (filename string, header http.Header, resp *http.Response, e error) {
 	response, err := client.Get(fileUrl)
 	if err != nil {
 		return "", nil, nil, err
@@ -174,7 +189,7 @@ func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.Re
 			filename = strings.Trim(filename, "\"")
 		}
 	}
-	rc = response.Body
+	resp = response
 	return
 }
 
@@ -189,11 +204,11 @@ func NormalizeUrl(url string) string {
 	return "http://" + url
 }
 
-func ReadUrl(fileUrl string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
+func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
 
 	if cipherKey != nil {
 		var n int
-		err := readEncryptedUrl(fileUrl, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+		_, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
 			n = copy(buf, data)
 		})
 		return int64(n), err
@@ -258,7 +273,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isGzipped bool, isFullChunk bool,
 	return n, err
 }
 
-func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
+func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
 
 	if cipherKey != nil {
 		return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
@@ -266,20 +281,33 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
 
 	req, err := http.NewRequest("GET", fileUrl, nil)
 	if err != nil {
-		return err
+		return false, err
 	}
-	if !isFullChunk {
+	if isFullChunk {
+		req.Header.Add("Accept-Encoding", "gzip")
+	} else {
 		req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
 	}
 
 	r, err := client.Do(req)
 	if err != nil {
-		return err
+		return true, err
 	}
 	defer CloseResponse(r)
 	if r.StatusCode >= 400 {
-		return fmt.Errorf("%s: %s", fileUrl, r.Status)
+		retryable = r.StatusCode >= 500
+		return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
+	}
+
+	var reader io.ReadCloser
+	contentEncoding := r.Header.Get("Content-Encoding")
+	switch contentEncoding {
+	case "gzip":
+		reader, err = gzip.NewReader(r.Body)
+		defer reader.Close()
+	default:
+		reader = r.Body
 	}
 
 	var (
@@ -288,42 +316,42 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is
 	buf := make([]byte, 64*1024)
 
 	for {
-		m, err = r.Body.Read(buf)
+		m, err = reader.Read(buf)
 		fn(buf[:m])
 		if err == io.EOF {
-			return nil
+			return false, nil
 		}
 		if err != nil {
-			return err
+			return false, err
 		}
 	}
 
}
 
-func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
-	encryptedData, err := Get(fileUrl)
+func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
+	encryptedData, retryable, err := Get(fileUrl)
 	if err != nil {
-		return fmt.Errorf("fetch %s: %v", fileUrl, err)
+		return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
 	}
 	decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
 	if err != nil {
-		return fmt.Errorf("decrypt %s: %v", fileUrl, err)
+		return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
 	}
-	if isContentGzipped {
-		decryptedData, err = UnGzipData(decryptedData)
+	if isContentCompressed {
+		decryptedData, err = DecompressData(decryptedData)
 		if err != nil {
-			return fmt.Errorf("unzip decrypt %s: %v", fileUrl, err)
+			glog.V(0).Infof("unzip decrypt %s: %v", fileUrl, err)
 		}
 	}
 	if len(decryptedData) < int(offset)+size {
-		return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
+		return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
 	}
 	if isFullChunk {
 		fn(decryptedData)
 	} else {
 		fn(decryptedData[int(offset) : int(offset)+size])
 	}
-	return nil
+	return false, nil
 }
 
 func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
@@ -334,17 +362,30 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e
 	}
 	if rangeHeader != "" {
 		req.Header.Add("Range", rangeHeader)
+	} else {
+		req.Header.Add("Accept-Encoding", "gzip")
 	}
 
 	r, err := client.Do(req)
 	if err != nil {
 		return nil, err
 	}
+	defer CloseResponse(r)
 	if r.StatusCode >= 400 {
 		return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
 	}
 
-	return r.Body, nil
+	var reader io.ReadCloser
+	contentEncoding := r.Header.Get("Content-Encoding")
+	switch contentEncoding {
+	case "gzip":
+		reader, err = gzip.NewReader(r.Body)
+		defer reader.Close()
+	default:
+		reader = r.Body
+	}
+
+	return reader, nil
 }
 
 func CloseResponse(resp *http.Response) {
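Get and ReadUrlAsStream now report whether a failure looks retryable (connection errors and 5xx responses) alongside the error, so callers can decide whether to loop. A hedged sketch of the calling pattern — fetchWithRetry is a hypothetical helper, and the backoff policy is illustrative, not the one SeaweedFS uses:

```go
package main

import (
	"fmt"
	"time"

	"github.com/chrislusf/seaweedfs/weed/util"
)

// fetchWithRetry retries only when Get flags the failure as retryable
// (a network error or a 5xx status).
func fetchWithRetry(url string, attempts int) ([]byte, error) {
	var lastErr error
	for i := 0; i < attempts; i++ {
		body, retryable, err := util.Get(url)
		if err == nil {
			return body, nil
		}
		lastErr = err
		if !retryable { // e.g. a 404: retrying will not help
			break
		}
		time.Sleep(time.Second << i) // simple exponential backoff
	}
	return nil, lastErr
}

func main() {
	body, err := fetchWithRetry("http://localhost:8888/path/to/file", 3)
	fmt.Println(len(body), err)
}
```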
diff --git a/weed/util/limiter.go b/weed/util/limiter.go
new file mode 100644
index 000000000..91499632c
--- /dev/null
+++ b/weed/util/limiter.go
@@ -0,0 +1,40 @@
+package util
+
+// initial version comes from https://github.com/korovkin/limiter/blob/master/limiter.go
+
+// LimitedConcurrentExecutor object
+type LimitedConcurrentExecutor struct {
+	limit     int
+	tokenChan chan int
+}
+
+func NewLimitedConcurrentExecutor(limit int) *LimitedConcurrentExecutor {
+
+	// allocate a limiter instance
+	c := &LimitedConcurrentExecutor{
+		limit:     limit,
+		tokenChan: make(chan int, limit),
+	}
+
+	// allocate the tokenChan:
+	for i := 0; i < c.limit; i++ {
+		c.tokenChan <- i
+	}
+
+	return c
+}
+
+// Execute adds a function to the execution queue.
+// if num of go routines allocated by this instance is < limit
+// launch a new go routine to execute job
+// else wait until a go routine becomes available
+func (c *LimitedConcurrentExecutor) Execute(job func()) {
+	token := <-c.tokenChan
+	go func() {
+		defer func() {
+			c.tokenChan <- token
+		}()
+		// run the job
+		job()
+	}()
+}
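The executor is push-style: Execute returns immediately while a token is free and blocks only when all tokens are taken. A small self-contained usage sketch:

```go
package main

import (
	"fmt"
	"sync"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	// At most 4 jobs run concurrently; the 5th Execute call blocks
	// until a running goroutine returns its token.
	executor := util.NewLimitedConcurrentExecutor(4)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		i := i // capture the loop variable (pre-Go 1.22 semantics)
		executor.Execute(func() {
			defer wg.Done()
			fmt.Println("job", i)
		})
	}
	wg.Wait()
}
```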
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index b02c45b52..e4310b5c5 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -53,7 +53,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime
 	return lb
 }
 
-func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
+func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) {
 
 	m.Lock()
 	defer func() {
@@ -64,16 +64,21 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
 	}()
 
 	// need to put the timestamp inside the lock
-	ts := time.Now()
-	tsNs := ts.UnixNano()
-	if m.lastTsNs >= tsNs {
+	var ts time.Time
+	if eventTsNs == 0 {
+		ts = time.Now()
+		eventTsNs = ts.UnixNano()
+	} else {
+		ts = time.Unix(0, eventTsNs)
+	}
+	if m.lastTsNs >= eventTsNs {
 		// this is unlikely to happen, but just in case
-		tsNs = m.lastTsNs + 1
-		ts = time.Unix(0, tsNs)
+		eventTsNs = m.lastTsNs + 1
+		ts = time.Unix(0, eventTsNs)
 	}
-	m.lastTsNs = tsNs
+	m.lastTsNs = eventTsNs
 	logEntry := &filer_pb.LogEntry{
-		TsNs:             tsNs,
+		TsNs:             eventTsNs,
 		PartitionKeyHash: util.HashToInt32(partitionKey),
 		Data:             data,
 	}
@@ -145,12 +150,15 @@ func (m *LogBuffer) loopInterval() {
 
 func (m *LogBuffer) copyToFlush() *dataToFlush {
 
-	if m.flushFn != nil && m.pos > 0 {
+	if m.pos > 0 {
 		// fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
-		d := &dataToFlush{
-			startTime: m.startTime,
-			stopTime:  m.stopTime,
-			data:      copiedBytes(m.buf[:m.pos]),
+		var d *dataToFlush
+		if m.flushFn != nil {
+			d = &dataToFlush{
+				startTime: m.startTime,
+				stopTime:  m.stopTime,
+				data:      copiedBytes(m.buf[:m.pos]),
+			}
 		}
 		// fmt.Printf("flusing [0,%d) with %d entries\n", m.pos, len(m.idx))
 		m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos)
@@ -246,7 +254,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu
 	return nil
 }
 
-func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) {
+func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
 	bufferPool.Put(b)
 }
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index f9ccc95c2..3d77afb18 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -23,7 +23,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
 	var buf = make([]byte, messageSize)
 	for i := 0; i < messageCount; i++ {
 		rand.Read(buf)
-		lb.AddToBuffer(nil, buf)
+		lb.AddToBuffer(nil, buf, 0)
 	}
 
 	receivedmessageCount := 0
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index 2b73a8064..57f4b0115 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -2,6 +2,7 @@ package log_buffer
 
 import (
 	"bytes"
+	"fmt"
 	"time"
 
 	"github.com/golang/protobuf/proto"
@@ -11,23 +12,27 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
+var (
+	ResumeError = fmt.Errorf("resume")
+)
+
 func (logBuffer *LogBuffer) LoopProcessLogData(
 	startTreadTime time.Time,
 	waitForDataFn func() bool,
-	eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) {
+	eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) {
 	// loop through all messages
 	var bytesBuf *bytes.Buffer
-	lastReadTime := startTreadTime
+	lastReadTime = startTreadTime
 	defer func() {
 		if bytesBuf != nil {
-			logBuffer.ReleaseMeory(bytesBuf)
+			logBuffer.ReleaseMemory(bytesBuf)
 		}
 	}()
 
 	for {
 
 		if bytesBuf != nil {
-			logBuffer.ReleaseMeory(bytesBuf)
+			logBuffer.ReleaseMemory(bytesBuf)
 		}
 		bytesBuf = logBuffer.ReadFromBuffer(lastReadTime)
 		// fmt.Printf("ReadFromBuffer by %v\n", lastReadTime)
@@ -48,10 +53,13 @@ func (logBuffer *LogBuffer) LoopProcessLogData(
 		for pos := 0; pos+4 < len(buf); {
 
 			size := util.BytesToUint32(buf[pos : pos+4])
+			if pos+4+int(size) > len(buf) {
+				err = ResumeError
+				glog.Errorf("LoopProcessLogData: read buffer %v read %d [%d,%d) from [0,%d)", lastReadTime, batchSize, pos, pos+int(size)+4, len(buf))
+				return
+			}
 			entryData := buf[pos+4 : pos+4+int(size)]
 
-			// fmt.Printf("read buffer read %d [%d,%d) from [0,%d)\n", batchSize, pos, pos+int(size)+4, len(buf))
-
 			logEntry := &filer_pb.LogEntry{}
 			if err = proto.Unmarshal(entryData, logEntry); err != nil {
 				glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go
index f057a8f5b..e8075c297 100644
--- a/weed/util/net_timeout.go
+++ b/weed/util/net_timeout.go
@@ -54,7 +54,8 @@ func (c *Conn) Read(b []byte) (count int, e error) {
 
 func (c *Conn) Write(b []byte) (count int, e error) {
 	if c.WriteTimeout != 0 {
-		err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
+		// minimum 4KB/s
+		err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout * time.Duration(len(b)/40000+1)))
 		if err != nil {
 			return 0, err
 		}
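Circling back to the log_buffer change above: AddToBuffer now accepts an event timestamp, where zero means "stamp with time.Now()", so replayed events can carry their original timestamps through. chooseTimestamp below is a hypothetical standalone mirror of the new in-lock logic, not a SeaweedFS API:

```go
package main

import (
	"fmt"
	"time"
)

// chooseTimestamp mirrors the new logic in AddToBuffer: a zero eventTsNs
// means "use the current time", and timestamps are forced to be strictly
// increasing so no two buffered entries share a TsNs.
func chooseTimestamp(lastTsNs, eventTsNs int64) int64 {
	if eventTsNs == 0 {
		eventTsNs = time.Now().UnixNano()
	}
	if lastTsNs >= eventTsNs {
		// unlikely, but keeps the buffer ordered
		eventTsNs = lastTsNs + 1
	}
	return eventTsNs
}

func main() {
	var lastTsNs int64

	// A live event (0 = current time) followed by a replayed event
	// carrying an older original timestamp; both come out increasing.
	for _, ts := range []int64{0, time.Now().Add(-time.Hour).UnixNano()} {
		lastTsNs = chooseTimestamp(lastTsNs, ts)
		fmt.Println(lastTsNs)
	}
}
```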
