diff options
Diffstat (limited to 'weed/util')
| -rw-r--r-- | weed/util/bounded_tree/bounded_tree.go | 179 | ||||
| -rw-r--r-- | weed/util/bounded_tree/bounded_tree_test.go | 126 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache.go | 92 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_in_memory.go | 18 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_on_disk.go | 22 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_on_disk_test.go | 30 | ||||
| -rw-r--r-- | weed/util/chunk_cache/on_disk_cache_layer.go | 20 | ||||
| -rw-r--r-- | weed/util/config.go | 25 | ||||
| -rw-r--r-- | weed/util/constants.go | 2 | ||||
| -rw-r--r-- | weed/util/fullpath.go | 25 | ||||
| -rw-r--r-- | weed/util/http_util.go | 4 | ||||
| -rw-r--r-- | weed/util/mem/slot_pool.go | 27 | ||||
| -rw-r--r-- | weed/util/net_timeout.go | 41 |
13 files changed, 175 insertions, 436 deletions
diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go deleted file mode 100644 index 58911df75..000000000 --- a/weed/util/bounded_tree/bounded_tree.go +++ /dev/null @@ -1,179 +0,0 @@ -package bounded_tree - -import ( - "sync" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type Node struct { - Parent *Node - Name string - Children map[string]*Node -} - -type BoundedTree struct { - root *Node - sync.RWMutex - baseDir util.FullPath -} - -func NewBoundedTree(baseDir util.FullPath) *BoundedTree { - return &BoundedTree{ - root: &Node{ - Name: "/", - }, - baseDir: baseDir, - } -} - -type VisitNodeFunc func(path util.FullPath) (childDirectories []string, err error) - -// If the path is not visited, call the visitFn for each level of directory -// No action if the directory has been visited before or does not exist. -// A leaf node, which has no children, represents a directory not visited. -// A non-leaf node or a non-existing node represents a directory already visited, or does not need to visit. -func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) (visitErr error) { - t.Lock() - defer t.Unlock() - - if t.root == nil { - return - } - components := p.Split() - // fmt.Printf("components %v %d\n", components, len(components)) - canDelete, err := t.ensureVisited(t.root, util.FullPath("/"), components, 0, visitFn) - if err != nil { - return err - } - if canDelete { - t.root = nil - } - return nil -} - -func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool, visitErr error) { - - // println("ensureVisited", currentPath, i) - - if n == nil { - // fmt.Printf("%s null\n", currentPath) - return - } - - if n.isVisited() { - // fmt.Printf("%s visited %v\n", currentPath, n.Name) - } else { - // fmt.Printf("ensure %v\n", currentPath) - - children, err := visitFn(currentPath) - if err != nil { - glog.V(0).Infof("failed to visit %s: %v", currentPath, err) - return false, err - } - - if len(children) == 0 { - // fmt.Printf(" canDelete %v without children\n", currentPath) - return true, nil - } - - n.Children = make(map[string]*Node) - for _, child := range children { - // fmt.Printf(" add child %v %v\n", currentPath, child) - n.Children[child] = &Node{ - Name: child, - } - } - } - - if i >= len(components) { - return - } - - // fmt.Printf(" check child %v %v\n", currentPath, components[i]) - - toVisitNode, found := n.Children[components[i]] - if !found { - // fmt.Printf(" did not find child %v %v\n", currentPath, components[i]) - return - } - - // fmt.Printf(" ensureVisited %v %v\n", currentPath, toVisitNode.Name) - canDelete, childVisitErr := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn) - if childVisitErr != nil { - return false, childVisitErr - } - if canDelete { - - // fmt.Printf(" delete %v %v\n", currentPath, components[i]) - delete(n.Children, components[i]) - - if len(n.Children) == 0 { - // fmt.Printf(" canDelete %v\n", currentPath) - return true, nil - } - } - - return false, nil - -} - -func (n *Node) isVisited() bool { - if n == nil { - return true - } - if len(n.Children) > 0 { - return true - } - return false -} - -func (n *Node) getChild(childName string) *Node { - if n == nil { - return nil - } - if len(n.Children) > 0 { - return n.Children[childName] - } - return nil -} - -func (t *BoundedTree) HasVisited(p util.FullPath) bool { - - t.RLock() - defer t.RUnlock() - - if t.root == nil { - return true - } - - components := p.Split() - // fmt.Printf("components %v %d\n", components, len(components)) - return t.hasVisited(t.root, util.FullPath("/"), components, 0) -} - -func (t *BoundedTree) hasVisited(n *Node, currentPath util.FullPath, components []string, i int) bool { - - if n == nil { - return true - } - - if !n.isVisited() { - return false - } - - // fmt.Printf(" hasVisited child %v %+v %d\n", currentPath, components, i) - - if i >= len(components) { - return true - } - - toVisitNode, found := n.Children[components[i]] - if !found { - return true - } - - return t.hasVisited(toVisitNode, currentPath.Child(components[i]), components, i+1) - -} diff --git a/weed/util/bounded_tree/bounded_tree_test.go b/weed/util/bounded_tree/bounded_tree_test.go deleted file mode 100644 index 465f1cc9c..000000000 --- a/weed/util/bounded_tree/bounded_tree_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package bounded_tree - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/chrislusf/seaweedfs/weed/util" -) - -var ( - visitFn = func(path util.FullPath) (childDirectories []string, err error) { - fmt.Printf(" visit %v ...\n", path) - switch path { - case "/": - return []string{"a", "g", "h"}, nil - case "/a": - return []string{"b", "f"}, nil - case "/a/b": - return []string{"c", "e"}, nil - case "/a/b/c": - return []string{"d"}, nil - case "/a/b/c/d": - return []string{"i", "j"}, nil - case "/a/b/c/d/i": - return []string{}, nil - case "/a/b/c/d/j": - return []string{}, nil - case "/a/b/e": - return []string{}, nil - case "/a/f": - return []string{}, nil - } - return nil, nil - } - - printMap = func(m map[string]*Node) { - for k := range m { - println(" >", k) - } - } -) - -func TestBoundedTree(t *testing.T) { - - // a/b/c/d/i - // a/b/c/d/j - // a/b/c/d - // a/b/e - // a/f - // g - // h - - tree := NewBoundedTree(util.FullPath("/")) - - tree.EnsureVisited(util.FullPath("/a/b/c"), visitFn) - - assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b"))) - assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b/c"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/c/d"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/e"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/f"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/g"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/h"))) - assert.Equal(t, true, tree.HasVisited(util.FullPath("/"))) - assert.Equal(t, true, tree.HasVisited(util.FullPath("/x"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/e/x"))) - - printMap(tree.root.Children) - - a := tree.root.getChild("a") - - b := a.getChild("b") - if !b.isVisited() { - t.Errorf("expect visited /a/b") - } - c := b.getChild("c") - if !c.isVisited() { - t.Errorf("expect visited /a/b/c") - } - - d := c.getChild("d") - if d.isVisited() { - t.Errorf("expect unvisited /a/b/c/d") - } - - tree.EnsureVisited(util.FullPath("/a/b/c/d"), visitFn) - tree.EnsureVisited(util.FullPath("/a/b/c/d/i"), visitFn) - tree.EnsureVisited(util.FullPath("/a/b/c/d/j"), visitFn) - tree.EnsureVisited(util.FullPath("/a/b/e"), visitFn) - tree.EnsureVisited(util.FullPath("/a/f"), visitFn) - - printMap(tree.root.Children) - -} - -func TestEmptyBoundedTree(t *testing.T) { - - // g - // h - - tree := NewBoundedTree(util.FullPath("/")) - - visitFn := func(path util.FullPath) (childDirectories []string, err error) { - fmt.Printf(" visit %v ...\n", path) - switch path { - case "/": - return []string{"g", "h"}, nil - } - t.Fatalf("expected visit %s", path) - return nil, nil - } - - tree.EnsureVisited(util.FullPath("/a/b"), visitFn) - - tree.EnsureVisited(util.FullPath("/a/b"), visitFn) - - printMap(tree.root.Children) - - assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b"))) - assert.Equal(t, true, tree.HasVisited(util.FullPath("/a"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/g"))) - assert.Equal(t, false, tree.HasVisited(util.FullPath("/g/x"))) - -} diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index 40d24b322..3f3b264b1 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -11,8 +11,7 @@ import ( var ErrorOutOfBounds = errors.New("attempt to read out of bounds") type ChunkCache interface { - GetChunk(fileId string, minSize uint64) (data []byte) - GetChunkSlice(fileId string, offset, length uint64) []byte + ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) SetChunk(fileId string, data []byte) } @@ -44,105 +43,52 @@ func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, uni return c } -func (c *TieredChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) { +func (c *TieredChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) { if c == nil { - return - } - - c.RLock() - defer c.RUnlock() - - return c.doGetChunk(fileId, minSize) -} - -func (c *TieredChunkCache) doGetChunk(fileId string, minSize uint64) (data []byte) { - - if minSize <= c.onDiskCacheSizeLimit0 { - data = c.memCache.GetChunk(fileId) - if len(data) >= int(minSize) { - return data - } - } - - fid, err := needle.ParseFileIdFromString(fileId) - if err != nil { - glog.Errorf("failed to parse file id %s", fileId) - return nil - } - - if minSize <= c.onDiskCacheSizeLimit0 { - data = c.diskCaches[0].getChunk(fid.Key) - if len(data) >= int(minSize) { - return data - } - } - if minSize <= c.onDiskCacheSizeLimit1 { - data = c.diskCaches[1].getChunk(fid.Key) - if len(data) >= int(minSize) { - return data - } - } - { - data = c.diskCaches[2].getChunk(fid.Key) - if len(data) >= int(minSize) { - return data - } - } - - return nil - -} - -func (c *TieredChunkCache) GetChunkSlice(fileId string, offset, length uint64) []byte { - if c == nil { - return nil + return 0, nil } c.RLock() defer c.RUnlock() - return c.doGetChunkSlice(fileId, offset, length) -} - -func (c *TieredChunkCache) doGetChunkSlice(fileId string, offset, length uint64) (data []byte) { - - minSize := offset + length + minSize := offset + uint64(len(data)) if minSize <= c.onDiskCacheSizeLimit0 { - data, err := c.memCache.getChunkSlice(fileId, offset, length) + n, err = c.memCache.readChunkAt(data, fileId, offset) if err != nil { glog.Errorf("failed to read from memcache: %s", err) } - if len(data) >= int(minSize) { - return data + if n >= int(minSize) { + return n, nil } } fid, err := needle.ParseFileIdFromString(fileId) if err != nil { glog.Errorf("failed to parse file id %s", fileId) - return nil + return n, nil } if minSize <= c.onDiskCacheSizeLimit0 { - data = c.diskCaches[0].getChunkSlice(fid.Key, offset, length) - if len(data) >= int(minSize) { - return data + n, err = c.diskCaches[0].readChunkAt(data, fid.Key, offset) + if n >= int(minSize) { + return } } if minSize <= c.onDiskCacheSizeLimit1 { - data = c.diskCaches[1].getChunkSlice(fid.Key, offset, length) - if len(data) >= int(minSize) { - return data + n, err = c.diskCaches[1].readChunkAt(data, fid.Key, offset) + if n >= int(minSize) { + return } } { - data = c.diskCaches[2].getChunkSlice(fid.Key, offset, length) - if len(data) >= int(minSize) { - return data + n, err = c.diskCaches[2].readChunkAt(data, fid.Key, offset) + if n >= int(minSize) { + return } } - return nil + return 0, nil + } func (c *TieredChunkCache) SetChunk(fileId string, data []byte) { diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go index d725f8a16..2982d0979 100644 --- a/weed/util/chunk_cache/chunk_cache_in_memory.go +++ b/weed/util/chunk_cache/chunk_cache_in_memory.go @@ -1,9 +1,8 @@ package chunk_cache import ( - "time" - "github.com/karlseguin/ccache/v2" + "time" ) // a global cache for recently accessed file chunks @@ -45,6 +44,21 @@ func (c *ChunkCacheInMemory) getChunkSlice(fileId string, offset, length uint64) return data[offset : int(offset)+wanted], nil } +func (c *ChunkCacheInMemory) readChunkAt(buffer []byte, fileId string, offset uint64) (int, error) { + item := c.cache.Get(fileId) + if item == nil { + return 0, nil + } + data := item.Value().([]byte) + item.Extend(time.Hour) + wanted := min(len(buffer), len(data)-int(offset)) + if wanted < 0 { + return 0, ErrorOutOfBounds + } + n := copy(buffer, data[offset:int(offset)+wanted]) + return n, nil +} + func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) { localCopy := make([]byte, len(data)) copy(localCopy, data) diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go index 36de5c972..100b5919e 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -144,6 +144,28 @@ func (v *ChunkCacheVolume) getNeedleSlice(key types.NeedleId, offset, length uin return data, nil } +func (v *ChunkCacheVolume) readNeedleSliceAt(data []byte, key types.NeedleId, offset uint64) (n int, err error) { + nv, ok := v.nm.Get(key) + if !ok { + return 0, storage.ErrorNotFound + } + wanted := min(len(data), int(nv.Size)-int(offset)) + if wanted < 0 { + // should never happen, but better than panicing + return 0, ErrorOutOfBounds + } + if n, err = v.DataBackend.ReadAt(data, nv.Offset.ToActualOffset()+int64(offset)); err != nil { + return n, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToActualOffset()+int64(offset), int(nv.Offset.ToActualOffset())+int(offset)+wanted, err) + } else { + if n != wanted { + return n, fmt.Errorf("read %d, expected %d", n, wanted) + } + } + + return n, nil +} + func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error { offset := v.fileSize diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go index 7dccfd43f..8c7880eee 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk_test.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go @@ -3,15 +3,13 @@ package chunk_cache import ( "bytes" "fmt" + "github.com/chrislusf/seaweedfs/weed/util/mem" "math/rand" - "os" "testing" ) func TestOnDisk(t *testing.T) { - - tmpDir, _ := os.MkdirTemp("", "c") - defer os.RemoveAll(tmpDir) + tmpDir := t.TempDir() totalDiskSizeInKB := int64(32) @@ -21,7 +19,7 @@ func TestOnDisk(t *testing.T) { type test_data struct { data []byte fileId string - size uint64 + size int } testData := make([]*test_data, writeCount) for i := 0; i < writeCount; i++ { @@ -30,29 +28,35 @@ func TestOnDisk(t *testing.T) { testData[i] = &test_data{ data: buff, fileId: fmt.Sprintf("1,%daabbccdd", i+1), - size: uint64(len(buff)), + size: len(buff), } cache.SetChunk(testData[i].fileId, testData[i].data) // read back right after write - data := cache.GetChunk(testData[i].fileId, testData[i].size) + data := mem.Allocate(testData[i].size) + cache.ReadChunkAt(data, testData[i].fileId, 0) if bytes.Compare(data, testData[i].data) != 0 { t.Errorf("failed to write to and read from cache: %d", i) } + mem.Free(data) } for i := 0; i < 2; i++ { - data := cache.GetChunk(testData[i].fileId, testData[i].size) + data := mem.Allocate(testData[i].size) + cache.ReadChunkAt(data, testData[i].fileId, 0) if bytes.Compare(data, testData[i].data) == 0 { t.Errorf("old cache should have been purged: %d", i) } + mem.Free(data) } for i := 2; i < writeCount; i++ { - data := cache.GetChunk(testData[i].fileId, testData[i].size) + data := mem.Allocate(testData[i].size) + cache.ReadChunkAt(data, testData[i].fileId, 0) if bytes.Compare(data, testData[i].data) != 0 { t.Errorf("failed to write to and read from cache: %d", i) } + mem.Free(data) } cache.Shutdown() @@ -60,10 +64,12 @@ func TestOnDisk(t *testing.T) { cache = NewTieredChunkCache(2, tmpDir, totalDiskSizeInKB, 1024) for i := 0; i < 2; i++ { - data := cache.GetChunk(testData[i].fileId, testData[i].size) + data := mem.Allocate(testData[i].size) + cache.ReadChunkAt(data, testData[i].fileId, 0) if bytes.Compare(data, testData[i].data) == 0 { t.Errorf("old cache should have been purged: %d", i) } + mem.Free(data) } for i := 2; i < writeCount; i++ { @@ -86,10 +92,12 @@ func TestOnDisk(t *testing.T) { */ continue } - data := cache.GetChunk(testData[i].fileId, testData[i].size) + data := mem.Allocate(testData[i].size) + cache.ReadChunkAt(data, testData[i].fileId, 0) if bytes.Compare(data, testData[i].data) != 0 { t.Errorf("failed to write to and read from cache: %d", i) } + mem.Free(data) } cache.Shutdown() diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go index 3a656110e..9115b1bb1 100644 --- a/weed/util/chunk_cache/on_disk_cache_layer.go +++ b/weed/util/chunk_cache/on_disk_cache_layer.go @@ -108,6 +108,26 @@ func (c *OnDiskCacheLayer) getChunkSlice(needleId types.NeedleId, offset, length } +func (c *OnDiskCacheLayer) readChunkAt(buffer []byte, needleId types.NeedleId, offset uint64) (n int, err error) { + + for _, diskCache := range c.diskCaches { + n, err = diskCache.readNeedleSliceAt(buffer, needleId, offset) + if err == storage.ErrorNotFound { + continue + } + if err != nil { + glog.Warningf("failed to read cache file %s id %d: %v", diskCache.fileName, needleId, err) + continue + } + if n > 0 { + return + } + } + + return + +} + func (c *OnDiskCacheLayer) shutdown() { for _, diskCache := range c.diskCaches { diff --git a/weed/util/config.go b/weed/util/config.go index ae9397340..f09ac7e5e 100644 --- a/weed/util/config.go +++ b/weed/util/config.go @@ -9,6 +9,20 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" ) +var ( + ConfigurationFileDirectory DirectoryValueType +) + +type DirectoryValueType string + +func (s *DirectoryValueType) Set(value string) error { + *s = DirectoryValueType(value) + return nil +} +func (s *DirectoryValueType) String() string { + return string(*s) +} + type Configuration interface { GetString(key string) string GetBool(key string) bool @@ -20,11 +34,12 @@ type Configuration interface { func LoadConfiguration(configFileName string, required bool) (loaded bool) { // find a filer store - viper.SetConfigName(configFileName) // name of config file (without extension) - viper.AddConfigPath(".") // optionally look for config in the working directory - viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths - viper.AddConfigPath("/usr/local/etc/seaweedfs/") // search path for bsd-style config directory in - viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in + viper.SetConfigName(configFileName) // name of config file (without extension) + viper.AddConfigPath(ResolvePath(ConfigurationFileDirectory.String())) // path to look for the config file in + viper.AddConfigPath(".") // optionally look for config in the working directory + viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths + viper.AddConfigPath("/usr/local/etc/seaweedfs/") // search path for bsd-style config directory in + viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file if strings.Contains(err.Error(), "Not Found") { diff --git a/weed/util/constants.go b/weed/util/constants.go index d148e1ca6..db2e1e958 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION_NUMBER = fmt.Sprintf("%.02f", 2.88) + VERSION_NUMBER = fmt.Sprintf("%.02f", 2.96) VERSION = sizeLimit + " " + VERSION_NUMBER COMMIT = "" ) diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go index 6c4f5c6ae..f52d4d1d0 100644 --- a/weed/util/fullpath.go +++ b/weed/util/fullpath.go @@ -1,7 +1,6 @@ package util import ( - "os" "path/filepath" "strings" ) @@ -43,29 +42,9 @@ func (fp FullPath) Child(name string) FullPath { } // AsInode an in-memory only inode representation -func (fp FullPath) AsInode(fileMode os.FileMode) uint64 { +func (fp FullPath) AsInode(unixTime int64) uint64 { inode := uint64(HashStringToLong(string(fp))) - inode = inode - inode%16 - if fileMode == 0 { - } else if fileMode&os.ModeDir > 0 { - inode += 1 - } else if fileMode&os.ModeSymlink > 0 { - inode += 2 - } else if fileMode&os.ModeDevice > 0 { - if fileMode&os.ModeCharDevice > 0 { - inode += 6 - } else { - inode += 3 - } - } else if fileMode&os.ModeNamedPipe > 0 { - inode += 4 - } else if fileMode&os.ModeSocket > 0 { - inode += 5 - } else if fileMode&os.ModeCharDevice > 0 { - inode += 6 - } else if fileMode&os.ModeIrregular > 0 { - inode += 7 - } + inode = inode + uint64(unixTime)*37 return inode } diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 8b66c6dc1..2f42d3768 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/util/mem" "io" "net/http" "net/url" @@ -326,7 +327,8 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is var ( m int ) - buf := make([]byte, 64*1024) + buf := mem.Allocate(64 * 1024) + defer mem.Free(buf) for { m, err = reader.Read(buf) diff --git a/weed/util/mem/slot_pool.go b/weed/util/mem/slot_pool.go index 5bd759ab7..b3493febf 100644 --- a/weed/util/mem/slot_pool.go +++ b/weed/util/mem/slot_pool.go @@ -33,22 +33,31 @@ func init() { } } -func getSlotPool(size int) *sync.Pool { +func getSlotPool(size int) (*sync.Pool, bool) { index := bitCount(size) - return pools[index] + if index >= len(pools) { + return nil, false + } + return pools[index], true } var total int64 func Allocate(size int) []byte { - newVal := atomic.AddInt64(&total, 1) - glog.V(4).Infof("++> %d", newVal) - slab := *getSlotPool(size).Get().(*[]byte) - return slab[:size] + if pool, found := getSlotPool(size); found { + newVal := atomic.AddInt64(&total, 1) + glog.V(4).Infof("++> %d", newVal) + + slab := *pool.Get().(*[]byte) + return slab[:size] + } + return make([]byte, size) } func Free(buf []byte) { - newVal := atomic.AddInt64(&total, -1) - glog.V(4).Infof("--> %d", newVal) - getSlotPool(cap(buf)).Put(&buf) + if pool, found := getSlotPool(cap(buf)); found { + newVal := atomic.AddInt64(&total, -1) + glog.V(4).Infof("--> %d", newVal) + pool.Put(&buf) + } } diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go index f1ae9016d..abb96c403 100644 --- a/weed/util/net_timeout.go +++ b/weed/util/net_timeout.go @@ -82,16 +82,45 @@ func (c *Conn) Close() error { return err } -func NewListener(addr string, timeout time.Duration) (net.Listener, error) { - l, err := net.Listen("tcp", addr) +func NewListener(addr string, timeout time.Duration) (ipListner net.Listener, err error) { + listner, err := net.Listen("tcp", addr) if err != nil { - return nil, err + return } - tl := &Listener{ - Listener: l, + ipListner = &Listener{ + Listener: listner, ReadTimeout: timeout, WriteTimeout: timeout, } - return tl, nil + + return +} + +func NewIpAndLocalListeners(host string, port int, timeout time.Duration) (ipListner net.Listener, localListener net.Listener, err error) { + listner, err := net.Listen("tcp", JoinHostPort(host, port)) + if err != nil { + return + } + + ipListner = &Listener{ + Listener: listner, + ReadTimeout: timeout, + WriteTimeout: timeout, + } + + if host != "localhost" && host != "" && host != "0.0.0.0" && host != "127.0.0.1" { + listner, err = net.Listen("tcp", JoinHostPort("localhost", port)) + if err != nil { + return + } + + localListener = &Listener{ + Listener: listner, + ReadTimeout: timeout, + WriteTimeout: timeout, + } + } + + return } |
