path: root/weed/util
author    yourchanges <yourchanges@gmail.com>  2020-07-10 09:44:32 +0800
committer GitHub <noreply@github.com>  2020-07-10 09:44:32 +0800
commit    e67096656b0fcdc313c7d8983b6ce36a54d794a3 (patch)
tree      4d6cfd722cf6e19b5aa8253e477ddc596ea5e193 /weed/util
parent    2b3cef7780a5e91d2072a33411926f9b30c88ee2 (diff)
parent    1b680c06c1de27e6a3899c089ec354a9eb08ea44 (diff)
download  seaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.tar.xz
          seaweedfs-e67096656b0fcdc313c7d8983b6ce36a54d794a3.zip
Merge pull request #1 from chrislusf/master
update
Diffstat (limited to 'weed/util')
-rw-r--r--  weed/util/bounded_tree/bounded_tree.go | 166
-rw-r--r--  weed/util/bounded_tree/bounded_tree_test.go | 131
-rw-r--r--  weed/util/bytes.go | 71
-rw-r--r--  weed/util/chunk_cache/chunk_cache.go | 128
-rw-r--r--  weed/util/chunk_cache/chunk_cache_in_memory.go | 36
-rw-r--r--  weed/util/chunk_cache/chunk_cache_on_disk.go | 145
-rw-r--r--  weed/util/chunk_cache/chunk_cache_on_disk_test.go | 59
-rw-r--r--  weed/util/chunk_cache/on_disk_cache_layer.go | 91
-rw-r--r--  weed/util/cipher.go | 60
-rw-r--r--  weed/util/cipher_test.go | 17
-rw-r--r--  weed/util/compression.go | 126
-rw-r--r--  weed/util/compression_test.go | 21
-rw-r--r--  weed/util/config.go | 45
-rw-r--r--  weed/util/constants.go | 13
-rw-r--r--  weed/util/constants_4bytes.go | 8
-rw-r--r--  weed/util/constants_5bytes.go | 8
-rw-r--r--  weed/util/file_util.go | 33
-rw-r--r--  weed/util/file_util_non_posix.go | 12
-rw-r--r--  weed/util/file_util_posix.go | 11
-rw-r--r--  weed/util/fullpath.go | 56
-rw-r--r--  weed/util/grace/pprof.go (renamed from weed/util/pprof.go) | 2
-rw-r--r--  weed/util/grace/signal_handling.go (renamed from weed/util/signal_handling.go) | 2
-rw-r--r--  weed/util/grace/signal_handling_notsupported.go (renamed from weed/util/signal_handling_notsupported.go) | 2
-rw-r--r--  weed/util/grpc_client_server.go | 87
-rw-r--r--  weed/util/http_util.go | 156
-rw-r--r--  weed/util/httpdown/http_down.go | 395
-rw-r--r--  weed/util/inits.go | 52
-rw-r--r--  weed/util/inits_test.go | 19
-rw-r--r--  weed/util/log_buffer/log_buffer.go | 278
-rw-r--r--  weed/util/log_buffer/log_buffer_test.go | 42
-rw-r--r--  weed/util/log_buffer/log_read.go | 77
-rw-r--r--  weed/util/log_buffer/sealed_buffer.go | 62
-rw-r--r--  weed/util/net_timeout.go | 6
-rw-r--r--  weed/util/network.go | 25
-rw-r--r--  weed/util/parse.go | 37
-rw-r--r--  weed/util/queue.go | 61
-rw-r--r--  weed/util/queue_unbounded.go | 45
-rw-r--r--  weed/util/queue_unbounded_test.go | 25
-rw-r--r--  weed/util/throttler.go | 34
39 files changed, 2511 insertions(+), 133 deletions(-)
diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go
new file mode 100644
index 000000000..40b9c4e47
--- /dev/null
+++ b/weed/util/bounded_tree/bounded_tree.go
@@ -0,0 +1,166 @@
+package bounded_tree
+
+import (
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type Node struct {
+ Parent *Node
+ Name string
+ Children map[string]*Node
+}
+
+type BoundedTree struct {
+ root *Node
+ sync.Mutex
+}
+
+func NewBoundedTree() *BoundedTree {
+ return &BoundedTree{
+ root: &Node{
+ Name: "/",
+ },
+ }
+}
+
+type VisitNodeFunc func(path util.FullPath) (childDirectories []string, err error)
+
+// EnsureVisited walks down the given path and calls visitFn for each
+// directory level that has not been visited yet.
+// It takes no action if the directory has been visited before or does not exist.
+// A leaf node (one with no children) represents a directory that has not been
+// visited; a non-leaf node, or a node missing from the tree, represents a
+// directory that was already visited or does not need to be visited.
+func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) {
+ t.Lock()
+ defer t.Unlock()
+
+ if t.root == nil {
+ return
+ }
+ components := p.Split()
+ // fmt.Printf("components %v %d\n", components, len(components))
+ if canDelete := t.ensureVisited(t.root, util.FullPath("/"), components, 0, visitFn); canDelete {
+ t.root = nil
+ }
+}
+
+func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool) {
+
+ // println("ensureVisited", currentPath, i)
+
+ if n == nil {
+ // fmt.Printf("%s null\n", currentPath)
+ return
+ }
+
+ if n.isVisited() {
+ // fmt.Printf("%s visited %v\n", currentPath, n.Name)
+ } else {
+ // fmt.Printf("ensure %v\n", currentPath)
+
+ children, err := visitFn(currentPath)
+ if err != nil {
+ glog.V(0).Infof("failed to visit %s: %v", currentPath, err)
+ return
+ }
+
+ if len(children) == 0 {
+ // fmt.Printf(" canDelete %v without children\n", currentPath)
+ return true
+ }
+
+ n.Children = make(map[string]*Node)
+ for _, child := range children {
+ // fmt.Printf(" add child %v %v\n", currentPath, child)
+ n.Children[child] = &Node{
+ Name: child,
+ }
+ }
+ }
+
+ if i >= len(components) {
+ return
+ }
+
+ // fmt.Printf(" check child %v %v\n", currentPath, components[i])
+
+ toVisitNode, found := n.Children[components[i]]
+ if !found {
+ // fmt.Printf(" did not find child %v %v\n", currentPath, components[i])
+ return
+ }
+
+ // fmt.Printf(" ensureVisited %v %v\n", currentPath, toVisitNode.Name)
+
+ if canDelete := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn); canDelete {
+
+ // fmt.Printf(" delete %v %v\n", currentPath, components[i])
+ delete(n.Children, components[i])
+
+ if len(n.Children) == 0 {
+ // fmt.Printf(" canDelete %v\n", currentPath)
+ return true
+ }
+ }
+
+ return false
+
+}
+
+func (n *Node) isVisited() bool {
+ if n == nil {
+ return true
+ }
+ if len(n.Children) > 0 {
+ return true
+ }
+ return false
+}
+
+func (n *Node) getChild(childName string) *Node {
+ if n == nil {
+ return nil
+ }
+ if len(n.Children) > 0 {
+ return n.Children[childName]
+ }
+ return nil
+}
+
+func (t *BoundedTree) HasVisited(p util.FullPath) bool {
+
+ if t.root == nil {
+ return true
+ }
+
+ components := p.Split()
+ // fmt.Printf("components %v %d\n", components, len(components))
+ return t.hasVisited(t.root, util.FullPath("/"), components, 0)
+}
+
+func (t *BoundedTree) hasVisited(n *Node, currentPath util.FullPath, components []string, i int) bool {
+
+ if n == nil {
+ return true
+ }
+
+ if !n.isVisited() {
+ return false
+ }
+
+ // fmt.Printf(" hasVisited child %v %+v %d\n", currentPath, components, i)
+
+ if i >= len(components) {
+ return true
+ }
+
+ toVisitNode, found := n.Children[components[i]]
+ if !found {
+ return true
+ }
+
+ return t.hasVisited(toVisitNode, currentPath.Child(components[i]), components, i+1)
+
+}
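
A minimal usage sketch of the BoundedTree API above; the stub visitFn and the
paths are illustrative, not taken from the repository:

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/chrislusf/seaweedfs/weed/util/bounded_tree"
)

func main() {
	tree := bounded_tree.NewBoundedTree()

	// visitFn lists the immediate child directories of a path;
	// this stub reports every directory as empty.
	visitFn := func(path util.FullPath) ([]string, error) {
		return nil, nil
	}

	tree.EnsureVisited(util.FullPath("/a/b"), visitFn)
	fmt.Println(tree.HasVisited(util.FullPath("/a/b"))) // true once ensured
}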
diff --git a/weed/util/bounded_tree/bounded_tree_test.go b/weed/util/bounded_tree/bounded_tree_test.go
new file mode 100644
index 000000000..2328f0497
--- /dev/null
+++ b/weed/util/bounded_tree/bounded_tree_test.go
@@ -0,0 +1,131 @@
+package bounded_tree
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+
+var (
+
+ visitFn = func(path util.FullPath) (childDirectories []string, err error) {
+ fmt.Printf(" visit %v ...\n", path)
+ switch path {
+ case "/":
+ return []string{"a", "g", "h"}, nil
+ case "/a":
+ return []string{"b", "f"}, nil
+ case "/a/b":
+ return []string{"c", "e"}, nil
+ case "/a/b/c":
+ return []string{"d"}, nil
+ case "/a/b/c/d":
+ return []string{"i", "j"}, nil
+ case "/a/b/c/d/i":
+ return []string{}, nil
+ case "/a/b/c/d/j":
+ return []string{}, nil
+ case "/a/b/e":
+ return []string{}, nil
+ case "/a/f":
+ return []string{}, nil
+ }
+ return nil, nil
+ }
+
+
+ printMap = func(m map[string]*Node) {
+ for k := range m {
+ println(" >", k)
+ }
+ }
+
+
+)
+
+func TestBoundedTree(t *testing.T) {
+
+ // a/b/c/d/i
+ // a/b/c/d/j
+ // a/b/c/d
+ // a/b/e
+ // a/f
+ // g
+ // h
+
+ tree := NewBoundedTree()
+
+ tree.EnsureVisited(util.FullPath("/a/b/c"), visitFn)
+
+ assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b")))
+ assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b/c")))
+ assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/c/d")))
+ assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/e")))
+ assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/f")))
+ assert.Equal(t, false, tree.HasVisited(util.FullPath("/g")))
+ assert.Equal(t, false, tree.HasVisited(util.FullPath("/h")))
+ assert.Equal(t, true, tree.HasVisited(util.FullPath("/")))
+ assert.Equal(t, true, tree.HasVisited(util.FullPath("/x")))
+ assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/e/x")))
+
+ printMap(tree.root.Children)
+
+ a := tree.root.getChild("a")
+
+ b := a.getChild("b")
+ if !b.isVisited() {
+ t.Errorf("expect visited /a/b")
+ }
+ c := b.getChild("c")
+ if !c.isVisited() {
+ t.Errorf("expect visited /a/b/c")
+ }
+
+ d := c.getChild("d")
+ if d.isVisited() {
+ t.Errorf("expect unvisited /a/b/c/d")
+ }
+
+ tree.EnsureVisited(util.FullPath("/a/b/c/d"), visitFn)
+ tree.EnsureVisited(util.FullPath("/a/b/c/d/i"), visitFn)
+ tree.EnsureVisited(util.FullPath("/a/b/c/d/j"), visitFn)
+ tree.EnsureVisited(util.FullPath("/a/b/e"), visitFn)
+ tree.EnsureVisited(util.FullPath("/a/f"), visitFn)
+
+ printMap(tree.root.Children)
+
+}
+
+func TestEmptyBoundedTree(t *testing.T) {
+
+ // g
+ // h
+
+ tree := NewBoundedTree()
+
+ visitFn := func(path util.FullPath) (childDirectories []string, err error) {
+ fmt.Printf(" visit %v ...\n", path)
+ switch path {
+ case "/":
+ return []string{"g", "h"}, nil
+ }
+ t.Fatalf("expected visit %s", path)
+ return nil, nil
+ }
+
+ tree.EnsureVisited(util.FullPath("/a/b"), visitFn)
+
+ tree.EnsureVisited(util.FullPath("/a/b"), visitFn)
+
+ printMap(tree.root.Children)
+
+ assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b")))
+ assert.Equal(t, true, tree.HasVisited(util.FullPath("/a")))
+ assert.Equal(t, false, tree.HasVisited(util.FullPath("/g")))
+ assert.Equal(t, false, tree.HasVisited(util.FullPath("/g/x")))
+
+}
diff --git a/weed/util/bytes.go b/weed/util/bytes.go
index dfa4ae665..0650919c0 100644
--- a/weed/util/bytes.go
+++ b/weed/util/bytes.go
@@ -1,5 +1,27 @@
package util
+import (
+ "crypto/md5"
+ "fmt"
+ "io"
+)
+
+// BytesToHumanReadable returns a human-readable representation of the given byte count.
+func BytesToHumanReadable(b uint64) string {
+ const unit = 1024
+ if b < unit {
+ return fmt.Sprintf("%d B", b)
+ }
+
+ div, exp := uint64(unit), 0
+ for n := b / unit; n >= unit; n /= unit {
+ div *= unit
+ exp++
+ }
+
+ return fmt.Sprintf("%.2f %ciB", float64(b)/float64(div), "KMGTPE"[exp])
+}
+
// big endian
func BytesToUint64(b []byte) (v uint64) {
@@ -43,3 +65,52 @@ func Uint16toBytes(b []byte, v uint16) {
func Uint8toBytes(b []byte, v uint8) {
b[0] = byte(v)
}
+
+// HashStringToLong returns a 64-bit value built from the first 8 bytes of the string's md5 digest.
+func HashStringToLong(dir string) (v int64) {
+ h := md5.New()
+ io.WriteString(h, dir)
+
+ b := h.Sum(nil)
+
+ v += int64(b[0])
+ v <<= 8
+ v += int64(b[1])
+ v <<= 8
+ v += int64(b[2])
+ v <<= 8
+ v += int64(b[3])
+ v <<= 8
+ v += int64(b[4])
+ v <<= 8
+ v += int64(b[5])
+ v <<= 8
+ v += int64(b[6])
+ v <<= 8
+ v += int64(b[7])
+
+ return
+}
+
+func HashToInt32(data []byte) (v int32) {
+ h := md5.New()
+ h.Write(data)
+
+ b := h.Sum(nil)
+
+ v += int32(b[0])
+ v <<= 8
+ v += int32(b[1])
+ v <<= 8
+ v += int32(b[2])
+ v <<= 8
+ v += int32(b[3])
+
+ return
+}
+
+func Md5(data []byte) string {
+ hash := md5.New()
+ hash.Write(data)
+ return fmt.Sprintf("%x", hash.Sum(nil))
+}
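
A quick sketch of the helpers above in use; the printed values follow directly
from the code shown:

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	fmt.Println(util.BytesToHumanReadable(512))         // "512 B"
	fmt.Println(util.BytesToHumanReadable(1536 * 1024)) // "1.50 MiB"

	fmt.Println(util.HashStringToLong("/some/dir")) // stable 64-bit hash of the path
	fmt.Println(util.Md5([]byte("hello")))          // 32-character hex digest
}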
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
new file mode 100644
index 000000000..17b64fb6c
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -0,0 +1,128 @@
+package chunk_cache
+
+import (
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+const (
+ memCacheSizeLimit = 1024 * 1024
+ onDiskCacheSizeLimit0 = memCacheSizeLimit
+ onDiskCacheSizeLimit1 = 4 * memCacheSizeLimit
+)
+
+// a global cache for recently accessed file chunks
+type ChunkCache struct {
+ memCache *ChunkCacheInMemory
+ diskCaches []*OnDiskCacheLayer
+ sync.RWMutex
+}
+
+func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64) *ChunkCache {
+
+ c := &ChunkCache{
+ memCache: NewChunkCacheInMemory(maxEntries),
+ }
+ c.diskCaches = make([]*OnDiskCacheLayer, 3)
+ c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_1", diskSizeMB/4, 4)
+ c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_4", diskSizeMB/4, 4)
+ c.diskCaches[2] = NewOnDiskCacheLayer(dir, "cache", diskSizeMB/2, 4)
+
+ return c
+}
+
+func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) {
+ if c == nil {
+ return
+ }
+
+ c.RLock()
+ defer c.RUnlock()
+
+ return c.doGetChunk(fileId, chunkSize)
+}
+
+func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) {
+
+ if chunkSize < memCacheSizeLimit {
+ data = c.memCache.GetChunk(fileId)
+ if len(data) >= int(chunkSize) {
+ return data
+ }
+ }
+
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ glog.Errorf("failed to parse file id %s", fileId)
+ return nil
+ }
+
+ if chunkSize < onDiskCacheSizeLimit0 {
+ data = c.diskCaches[0].getChunk(fid.Key)
+ if len(data) >= int(chunkSize) {
+ return data
+ }
+ }
+ if chunkSize < onDiskCacheSizeLimit1 {
+ data = c.diskCaches[1].getChunk(fid.Key)
+ if len(data) >= int(chunkSize) {
+ return data
+ }
+ }
+ {
+ data = c.diskCaches[2].getChunk(fid.Key)
+ if len(data) >= int(chunkSize) {
+ return data
+ }
+ }
+
+ return nil
+
+}
+
+func (c *ChunkCache) SetChunk(fileId string, data []byte) {
+ if c == nil {
+ return
+ }
+ c.Lock()
+ defer c.Unlock()
+
+ glog.V(4).Infof("SetChunk %s size %d\n", fileId, len(data))
+
+ c.doSetChunk(fileId, data)
+}
+
+func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
+
+ if len(data) < memCacheSizeLimit {
+ c.memCache.SetChunk(fileId, data)
+ }
+
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ glog.Errorf("failed to parse file id %s", fileId)
+ return
+ }
+
+ if len(data) < onDiskCacheSizeLimit0 {
+ c.diskCaches[0].setChunk(fid.Key, data)
+ } else if len(data) < onDiskCacheSizeLimit1 {
+ c.diskCaches[1].setChunk(fid.Key, data)
+ } else {
+ c.diskCaches[2].setChunk(fid.Key, data)
+ }
+
+}
+
+func (c *ChunkCache) Shutdown() {
+ if c == nil {
+ return
+ }
+ c.Lock()
+ defer c.Unlock()
+ for _, diskCache := range c.diskCaches {
+ diskCache.shutdown()
+ }
+}
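
A hedged usage sketch of the tiered cache above; the directory, sizes, and
file id are illustrative (the id format mirrors the test further below):

package main

import "github.com/chrislusf/seaweedfs/weed/util/chunk_cache"

func main() {
	// 1000 in-memory entries plus on-disk layers totaling 128 MB under /tmp/wcache.
	cache := chunk_cache.NewChunkCache(1000, "/tmp/wcache", 128)
	defer cache.Shutdown()

	data := []byte("chunk bytes")
	cache.SetChunk("1,1aabbccdd", data) // fileId must parse as a needle file id
	_ = cache.GetChunk("1,1aabbccdd", uint64(len(data)))
}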
diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go
new file mode 100644
index 000000000..931e45e9a
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache_in_memory.go
@@ -0,0 +1,36 @@
+package chunk_cache
+
+import (
+ "time"
+
+ "github.com/karlseguin/ccache"
+)
+
+// ChunkCacheInMemory is an in-memory cache for recently accessed file chunks.
+type ChunkCacheInMemory struct {
+ cache *ccache.Cache
+}
+
+func NewChunkCacheInMemory(maxEntries int64) *ChunkCacheInMemory {
+ pruneCount := maxEntries >> 3
+ if pruneCount <= 0 {
+ pruneCount = 500
+ }
+ return &ChunkCacheInMemory{
+ cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))),
+ }
+}
+
+func (c *ChunkCacheInMemory) GetChunk(fileId string) []byte {
+ item := c.cache.Get(fileId)
+ if item == nil {
+ return nil
+ }
+ data := item.Value().([]byte)
+ item.Extend(time.Hour)
+ return data
+}
+
+func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) {
+ c.cache.Set(fileId, data, time.Hour)
+}
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go
new file mode 100644
index 000000000..d74f87b0c
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache_on_disk.go
@@ -0,0 +1,145 @@
+package chunk_cache
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/syndtr/goleveldb/leveldb/opt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// This implements an on-disk cache.
+// The entries form a FIFO with a size limit.
+
+type ChunkCacheVolume struct {
+ DataBackend backend.BackendStorageFile
+ nm storage.NeedleMapper
+ fileName string
+ smallBuffer []byte
+ sizeLimit int64
+ lastModTime time.Time
+ fileSize int64
+}
+
+func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCacheVolume, error) {
+
+ v := &ChunkCacheVolume{
+ smallBuffer: make([]byte, types.NeedlePaddingSize),
+ fileName: fileName,
+ sizeLimit: preallocate,
+ }
+
+ var err error
+
+ if exists, canRead, canWrite, modTime, fileSize := util.CheckFile(v.fileName + ".dat"); exists {
+ if !canRead {
+ return nil, fmt.Errorf("cannot read cache file %s.dat", v.fileName)
+ }
+ if !canWrite {
+ return nil, fmt.Errorf("cannot write cache file %s.dat", v.fileName)
+ }
+ if dataFile, err := os.OpenFile(v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err)
+ } else {
+ v.DataBackend = backend.NewDiskFile(dataFile)
+ v.lastModTime = modTime
+ v.fileSize = fileSize
+ }
+ } else {
+ if v.DataBackend, err = backend.CreateVolumeFile(v.fileName+".dat", preallocate, 0); err != nil {
+ return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err)
+ }
+ v.lastModTime = time.Now()
+ }
+
+ var indexFile *os.File
+ if indexFile, err = os.OpenFile(v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err)
+ }
+
+ glog.V(0).Infoln("loading leveldb", v.fileName+".ldb")
+ opts := &opt.Options{
+ BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10, // default value is 1
+ }
+ if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts); err != nil {
+ return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err)
+ }
+
+ return v, nil
+
+}
+
+func (v *ChunkCacheVolume) Shutdown() {
+ if v.DataBackend != nil {
+ v.DataBackend.Close()
+ v.DataBackend = nil
+ }
+ if v.nm != nil {
+ v.nm.Close()
+ v.nm = nil
+ }
+}
+
+func (v *ChunkCacheVolume) destroy() {
+ v.Shutdown()
+ os.Remove(v.fileName + ".dat")
+ os.Remove(v.fileName + ".idx")
+ os.RemoveAll(v.fileName + ".ldb")
+}
+
+func (v *ChunkCacheVolume) Reset() (*ChunkCacheVolume, error) {
+ v.destroy()
+ return LoadOrCreateChunkCacheVolume(v.fileName, v.sizeLimit)
+}
+
+func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) {
+
+ nv, ok := v.nm.Get(key)
+ if !ok {
+ return nil, storage.ErrorNotFound
+ }
+ data := make([]byte, nv.Size)
+ if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToAcutalOffset()); readErr != nil {
+ return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
+ v.fileName, nv.Offset.ToAcutalOffset(), nv.Offset.ToAcutalOffset()+int64(nv.Size), readErr)
+ } else {
+ if readSize != int(nv.Size) {
+ return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size)
+ }
+ }
+
+ return data, nil
+}
+
+func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error {
+
+ offset := v.fileSize
+
+ written, err := v.DataBackend.WriteAt(data, offset)
+ if err != nil {
+ return err
+ } else if written != len(data) {
+ return fmt.Errorf("partial written %d, expected %d", written, len(data))
+ }
+
+ v.fileSize += int64(written)
+ extraSize := written % types.NeedlePaddingSize
+ if extraSize != 0 {
+ v.DataBackend.WriteAt(v.smallBuffer[:types.NeedlePaddingSize-extraSize], offset+int64(written))
+ v.fileSize += int64(types.NeedlePaddingSize - extraSize)
+ }
+
+ if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil {
+ return err
+ }
+
+ return nil
+}
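
A small sketch of the padding arithmetic WriteNeedle relies on, assuming a
NeedlePaddingSize of 8 purely for illustration:

package main

import "fmt"

func main() {
	const paddingSize = 8 // assumed value of types.NeedlePaddingSize
	written := 100        // bytes of needle data just written
	extra := written % paddingSize
	pad := 0
	if extra != 0 {
		pad = paddingSize - extra // zero bytes appended from smallBuffer
	}
	fmt.Println(written + pad) // 104: the next write offset stays aligned
}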
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
new file mode 100644
index 000000000..f061f2ba2
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
@@ -0,0 +1,59 @@
+package chunk_cache
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "testing"
+)
+
+func TestOnDisk(t *testing.T) {
+
+ tmpDir, _ := ioutil.TempDir("", "c")
+ defer os.RemoveAll(tmpDir)
+
+ totalDiskSizeMb := int64(32)
+
+ cache := NewChunkCache(0, tmpDir, totalDiskSizeMb)
+
+ writeCount := 5
+ type test_data struct {
+ data []byte
+ fileId string
+ size uint64
+ }
+ testData := make([]*test_data, writeCount)
+ for i := 0; i < writeCount; i++ {
+ buff := make([]byte, 1024*1024)
+ rand.Read(buff)
+ testData[i] = &test_data{
+ data: buff,
+ fileId: fmt.Sprintf("1,%daabbccdd", i+1),
+ size: uint64(len(buff)),
+ }
+ cache.SetChunk(testData[i].fileId, testData[i].data)
+ }
+
+ for i := 0; i < writeCount; i++ {
+ data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ if bytes.Compare(data, testData[i].data) != 0 {
+ t.Errorf("failed to write to and read from cache: %d", i)
+ }
+ }
+
+ cache.Shutdown()
+
+ cache = NewChunkCache(0, tmpDir, totalDiskSizeMb)
+
+ for i := 0; i < writeCount; i++ {
+ data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ if bytes.Compare(data, testData[i].data) != 0 {
+ t.Errorf("failed to write to and read from cache: %d", i)
+ }
+ }
+
+ cache.Shutdown()
+
+}
diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go
new file mode 100644
index 000000000..c3192b548
--- /dev/null
+++ b/weed/util/chunk_cache/on_disk_cache_layer.go
@@ -0,0 +1,91 @@
+package chunk_cache
+
+import (
+ "fmt"
+ "path"
+ "sort"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+type OnDiskCacheLayer struct {
+ diskCaches []*ChunkCacheVolume
+}
+
+func NewOnDiskCacheLayer(dir, namePrefix string, diskSizeMB int64, segmentCount int) *OnDiskCacheLayer {
+
+ volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000)
+ if volumeCount < segmentCount {
+ volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount)
+ }
+
+ c := &OnDiskCacheLayer{}
+ for i := 0; i < volumeCount; i++ {
+ fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i))
+ diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024)
+ if err != nil {
+ glog.Errorf("failed to add cache %s : %v", fileName, err)
+ } else {
+ c.diskCaches = append(c.diskCaches, diskCache)
+ }
+ }
+
+ // keep newest cache to the front
+ sort.Slice(c.diskCaches, func(i, j int) bool {
+ return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime)
+ })
+
+ return c
+}
+
+func (c *OnDiskCacheLayer) setChunk(needleId types.NeedleId, data []byte) {
+
+ if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit {
+ t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset()
+ if resetErr != nil {
+ glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName)
+ return
+ }
+ for i := len(c.diskCaches) - 1; i > 0; i-- {
+ c.diskCaches[i] = c.diskCaches[i-1]
+ }
+ c.diskCaches[0] = t
+ }
+
+ if err := c.diskCaches[0].WriteNeedle(needleId, data); err != nil {
+ glog.V(0).Infof("cache write %v size %d: %v", needleId, len(data), err)
+ }
+
+}
+
+func (c *OnDiskCacheLayer) getChunk(needleId types.NeedleId) (data []byte) {
+
+ var err error
+
+ for _, diskCache := range c.diskCaches {
+ data, err = diskCache.GetNeedle(needleId)
+ if err == storage.ErrorNotFound {
+ continue
+ }
+ if err != nil {
+ glog.Errorf("failed to read cache file %s id %d", diskCache.fileName, needleId)
+ continue
+ }
+ if len(data) != 0 {
+ return
+ }
+ }
+
+ return nil
+
+}
+
+func (c *OnDiskCacheLayer) shutdown() {
+
+ for _, diskCache := range c.diskCaches {
+ diskCache.Shutdown()
+ }
+
+}
diff --git a/weed/util/cipher.go b/weed/util/cipher.go
new file mode 100644
index 000000000..f044c2ca3
--- /dev/null
+++ b/weed/util/cipher.go
@@ -0,0 +1,60 @@
+package util
+
+import (
+ "crypto/aes"
+ "crypto/cipher"
+ "crypto/rand"
+ "errors"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+type CipherKey []byte
+
+func GenCipherKey() CipherKey {
+ key := make([]byte, 32)
+ if _, err := io.ReadFull(rand.Reader, key); err != nil {
+ glog.Fatalf("random key gen: %v", err)
+ }
+ return CipherKey(key)
+}
+
+func Encrypt(plaintext []byte, key CipherKey) ([]byte, error) {
+ c, err := aes.NewCipher(key)
+ if err != nil {
+ return nil, err
+ }
+
+ gcm, err := cipher.NewGCM(c)
+ if err != nil {
+ return nil, err
+ }
+
+ nonce := make([]byte, gcm.NonceSize())
+ if _, err = io.ReadFull(rand.Reader, nonce); err != nil {
+ return nil, err
+ }
+
+ return gcm.Seal(nonce, nonce, plaintext, nil), nil
+}
+
+func Decrypt(ciphertext []byte, key CipherKey) ([]byte, error) {
+ c, err := aes.NewCipher(key)
+ if err != nil {
+ return nil, err
+ }
+
+ gcm, err := cipher.NewGCM(c)
+ if err != nil {
+ return nil, err
+ }
+
+ nonceSize := gcm.NonceSize()
+ if len(ciphertext) < nonceSize {
+ return nil, errors.New("ciphertext too short")
+ }
+
+ nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:]
+ return gcm.Open(nil, nonce, ciphertext, nil)
+}
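
An encrypt/decrypt round trip with the helpers above; GCM stores the random
nonce as a prefix of the sealed bytes, which Decrypt strips off:

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	key := util.GenCipherKey() // 32 random bytes -> AES-256-GCM
	sealed, err := util.Encrypt([]byte("secret"), key)
	if err != nil {
		panic(err)
	}
	plain, err := util.Decrypt(sealed, key)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(plain)) // "secret"
}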
diff --git a/weed/util/cipher_test.go b/weed/util/cipher_test.go
new file mode 100644
index 000000000..026c96ea3
--- /dev/null
+++ b/weed/util/cipher_test.go
@@ -0,0 +1,17 @@
+package util
+
+import (
+ "encoding/base64"
+ "testing"
+)
+
+func TestSameAsJavaImplementation(t *testing.T) {
+ str := "QVVhmqg112NMT7F+G/7QPynqSln3xPIhKdFGmTVKZD6IS0noyr2Z5kXFF6fPjZ/7Hq8kRhlmLeeqZUccxyaZHezOdgkjS6d4NTdHf5IjXzk7"
+ cipherText, _ := base64.StdEncoding.DecodeString(str)
+ secretKey := []byte("256-bit key for AES 256 GCM encr")
+ plaintext, err := Decrypt(cipherText, CipherKey(secretKey))
+ if err != nil {
+ t.Error(err.Error())
+ }
+ println(string(plaintext))
+}
diff --git a/weed/util/compression.go b/weed/util/compression.go
new file mode 100644
index 000000000..b526f47c9
--- /dev/null
+++ b/weed/util/compression.go
@@ -0,0 +1,126 @@
+package util
+
+import (
+ "bytes"
+ "compress/flate"
+ "compress/gzip"
+ "fmt"
+ "io/ioutil"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/klauspost/compress/zstd"
+)
+
+func GzipData(input []byte) ([]byte, error) {
+ buf := new(bytes.Buffer)
+ w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed)
+ if _, err := w.Write(input); err != nil {
+ glog.V(2).Infoln("error compressing data:", err)
+ return nil, err
+ }
+ if err := w.Close(); err != nil {
+ glog.V(2).Infoln("error closing compressed data:", err)
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
+
+var zstdEncoder, _ = zstd.NewWriter(nil)
+
+func ZstdData(input []byte) ([]byte, error) {
+ return zstdEncoder.EncodeAll(input, nil), nil
+}
+
+func DecompressData(input []byte) ([]byte, error) {
+ if IsGzippedContent(input) {
+ return ungzipData(input)
+ }
+ if IsZstdContent(input) {
+ return unzstdData(input)
+ }
+ return nil, fmt.Errorf("unsupported compression")
+}
+
+func ungzipData(input []byte) ([]byte, error) {
+ buf := bytes.NewBuffer(input)
+ r, _ := gzip.NewReader(buf)
+ defer r.Close()
+ output, err := ioutil.ReadAll(r)
+ if err != nil {
+ glog.V(2).Infoln("error uncompressing data:", err)
+ }
+ return output, err
+}
+
+var decoder, _ = zstd.NewReader(nil)
+
+func unzstdData(input []byte) ([]byte, error) {
+ return decoder.DecodeAll(input, nil)
+}
+
+func IsGzippedContent(data []byte) bool {
+ if len(data) < 2 {
+ return false
+ }
+ return data[0] == 31 && data[1] == 139
+}
+
+func IsZstdContent(data []byte) bool {
+ if len(data) < 4 {
+ return false
+ }
+ return data[3] == 0xFD && data[2] == 0x2F && data[1] == 0xB5 && data[0] == 0x28
+}
+
+// Default to not compressing, since compression can be done on the client side.
+func IsCompressableFileType(ext, mtype string) (shouldBeCompressed, iAmSure bool) {
+
+ // text
+ if strings.HasPrefix(mtype, "text/") {
+ return true, true
+ }
+
+ // images
+ switch ext {
+ case ".svg", ".bmp", ".wav":
+ return true, true
+ }
+ if strings.HasPrefix(mtype, "image/") {
+ return false, true
+ }
+
+ // by file name extension
+ switch ext {
+ case ".zip", ".rar", ".gz", ".bz2", ".xz", ".zst":
+ return false, true
+ case ".pdf", ".txt", ".html", ".htm", ".css", ".js", ".json":
+ return true, true
+ case ".php", ".java", ".go", ".rb", ".c", ".cpp", ".h", ".hpp":
+ return true, true
+ case ".png", ".jpg", ".jpeg":
+ return false, true
+ }
+
+ // by mime type
+ if strings.HasPrefix(mtype, "application/") {
+ if strings.HasSuffix(mtype, "zstd") {
+ return false, true
+ }
+ if strings.HasSuffix(mtype, "xml") {
+ return true, true
+ }
+ if strings.HasSuffix(mtype, "script") {
+ return true, true
+ }
+ }
+
+ if strings.HasPrefix(mtype, "audio/") {
+ switch strings.TrimPrefix(mtype, "audio/") {
+ case "wave", "wav", "x-wav", "x-pn-wav":
+ return true, true
+ }
+ }
+
+ return false, false
+}
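
A compression round trip with the helpers above; DecompressData sniffs the
gzip or zstd magic bytes to pick the decoder:

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	compressed, _ := util.GzipData([]byte("hello hello hello"))
	fmt.Println(util.IsGzippedContent(compressed)) // true: 0x1f 0x8b magic

	restored, _ := util.DecompressData(compressed)
	fmt.Println(string(restored)) // "hello hello hello"
}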
diff --git a/weed/util/compression_test.go b/weed/util/compression_test.go
new file mode 100644
index 000000000..b515e8988
--- /dev/null
+++ b/weed/util/compression_test.go
@@ -0,0 +1,21 @@
+package util
+
+import (
+ "testing"
+
+ "golang.org/x/tools/godoc/util"
+)
+
+func TestIsGzippable(t *testing.T) {
+ buf := make([]byte, 1024)
+
+ isText := util.IsText(buf)
+
+ if isText {
+ t.Error("buf with zeros are not text")
+ }
+
+ compressed, _ := GzipData(buf)
+
+ t.Logf("compressed size %d\n", len(compressed))
+}
diff --git a/weed/util/config.go b/weed/util/config.go
index 77cab3019..7b6e92f08 100644
--- a/weed/util/config.go
+++ b/weed/util/config.go
@@ -1,10 +1,51 @@
package util
+import (
+ "strings"
+
+ "github.com/spf13/viper"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
type Configuration interface {
GetString(key string) string
GetBool(key string) bool
GetInt(key string) int
- GetInt64(key string) int64
- GetFloat64(key string) float64
GetStringSlice(key string) []string
+ SetDefault(key string, value interface{})
+}
+
+func LoadConfiguration(configFileName string, required bool) (loaded bool) {
+
+ // find a filer store
+ viper.SetConfigName(configFileName) // name of config file (without extension)
+ viper.AddConfigPath(".") // optionally look for config in the working directory
+ viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
+ viper.AddConfigPath("/etc/seaweedfs/") // path to look for the config file in
+
+ glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
+
+ if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
+ glog.V(1).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
+ if required {
+ glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
+ "\n\nPlease use this command to generate the default %s.toml file\n"+
+ " weed scaffold -config=%s -output=.\n\n\n",
+ configFileName, configFileName, configFileName)
+ } else {
+ return false
+ }
+ }
+
+ return true
+}
+
+func GetViper() *viper.Viper {
+ v := &viper.Viper{}
+ *v = *viper.GetViper()
+ v.AutomaticEnv()
+ v.SetEnvPrefix("weed")
+ v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
+ return v
}
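
A sketch of the environment override that GetViper enables: with the "weed"
prefix and "." mapped to "_", any config key can also be set from the
environment. The key name here is illustrative:

package main

import (
	"fmt"
	"os"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	os.Setenv("WEED_MYSQL_HOSTNAME", "db1") // overrides key "mysql.hostname"

	util.LoadConfiguration("filer", false) // optional filer.toml
	v := util.GetViper()
	fmt.Println(v.GetString("mysql.hostname")) // "db1"
}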
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 9ddf07261..3433c550b 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -1,5 +1,14 @@
package util
-const (
- VERSION = "1.23"
+import (
+ "fmt"
)
+
+var (
+ VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 84)
+ COMMIT = ""
+)
+
+func Version() string {
+ return VERSION + " " + COMMIT
+}
diff --git a/weed/util/constants_4bytes.go b/weed/util/constants_4bytes.go
new file mode 100644
index 000000000..a29d9d3b0
--- /dev/null
+++ b/weed/util/constants_4bytes.go
@@ -0,0 +1,8 @@
+// +build !5BytesOffset
+
+package util
+
+const (
+ sizeLimit = "30GB"
+ VolumeSizeLimitGB = 30
+)
diff --git a/weed/util/constants_5bytes.go b/weed/util/constants_5bytes.go
new file mode 100644
index 000000000..91ce4066f
--- /dev/null
+++ b/weed/util/constants_5bytes.go
@@ -0,0 +1,8 @@
+// +build 5BytesOffset
+
+package util
+
+const (
+ sizeLimit = "8000GB"
+ VolumeSizeLimitGB = 8000
+)
diff --git a/weed/util/file_util.go b/weed/util/file_util.go
index 8ff2978ba..ff725830b 100644
--- a/weed/util/file_util.go
+++ b/weed/util/file_util.go
@@ -3,6 +3,7 @@ package util
import (
"errors"
"os"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
)
@@ -30,3 +31,35 @@ func GetFileSize(file *os.File) (size int64, err error) {
}
return
}
+
+func FileExists(filename string) bool {
+
+ _, err := os.Stat(filename)
+ if os.IsNotExist(err) {
+ return false
+ }
+ return true
+
+}
+
+func CheckFile(filename string) (exists, canRead, canWrite bool, modTime time.Time, fileSize int64) {
+ exists = true
+ fi, err := os.Stat(filename)
+ if os.IsNotExist(err) {
+ exists = false
+ return
+ }
+ if err != nil {
+ glog.Errorf("check %s: %v", filename, err)
+ return
+ }
+ if fi.Mode()&0400 != 0 {
+ canRead = true
+ }
+ if fi.Mode()&0200 != 0 {
+ canWrite = true
+ }
+ modTime = fi.ModTime()
+ fileSize = fi.Size()
+ return
+}
diff --git a/weed/util/file_util_non_posix.go b/weed/util/file_util_non_posix.go
new file mode 100644
index 000000000..ffcfef6d5
--- /dev/null
+++ b/weed/util/file_util_non_posix.go
@@ -0,0 +1,12 @@
+// +build linux darwin freebsd netbsd openbsd plan9 solaris zos
+
+package util
+
+import (
+ "os"
+ "syscall"
+)
+
+func GetFileUidGid(fi os.FileInfo) (uid, gid uint32) {
+ return fi.Sys().(*syscall.Stat_t).Uid, fi.Sys().(*syscall.Stat_t).Gid
+}
diff --git a/weed/util/file_util_posix.go b/weed/util/file_util_posix.go
new file mode 100644
index 000000000..22ca60b3b
--- /dev/null
+++ b/weed/util/file_util_posix.go
@@ -0,0 +1,11 @@
+// +build windows
+
+package util
+
+import (
+ "os"
+)
+
+func GetFileUidGid(fi os.FileInfo) (uid, gid uint32) {
+ return 0, 0
+}
diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go
new file mode 100644
index 000000000..4ce8a2f90
--- /dev/null
+++ b/weed/util/fullpath.go
@@ -0,0 +1,56 @@
+package util
+
+import (
+ "path/filepath"
+ "strings"
+)
+
+type FullPath string
+
+func NewFullPath(dir, name string) FullPath {
+ return FullPath(dir).Child(name)
+}
+
+func (fp FullPath) DirAndName() (string, string) {
+ dir, name := filepath.Split(string(fp))
+ if dir == "/" {
+ return dir, name
+ }
+ if len(dir) < 1 {
+ return "/", ""
+ }
+ return dir[:len(dir)-1], name
+}
+
+func (fp FullPath) Name() string {
+ _, name := filepath.Split(string(fp))
+ return name
+}
+
+func (fp FullPath) Child(name string) FullPath {
+ dir := string(fp)
+ if strings.HasSuffix(dir, "/") {
+ return FullPath(dir + name)
+ }
+ return FullPath(dir + "/" + name)
+}
+
+func (fp FullPath) AsInode() uint64 {
+ return uint64(HashStringToLong(string(fp)))
+}
+
+// Split splits the path into its components, skipping the root.
+func (fp FullPath) Split() []string {
+ if fp == "" || fp == "/" {
+ return []string{}
+ }
+ return strings.Split(string(fp)[1:], "/")
+}
+
+func Join(names ...string) string {
+ return filepath.ToSlash(filepath.Join(names...))
+}
+
+func JoinPath(names ...string) FullPath {
+ return FullPath(Join(names...))
+}
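
A few FullPath operations from the code above, with their results:

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	fp := util.NewFullPath("/a/b", "c.txt") // "/a/b/c.txt"

	dir, name := fp.DirAndName()
	fmt.Println(dir, name) // "/a/b" "c.txt"

	fmt.Println(fp.Split()) // [a b c.txt], the root "/" is skipped
	fmt.Println(util.JoinPath("a", "b", "..", "c")) // "a/c"
}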
diff --git a/weed/util/pprof.go b/weed/util/grace/pprof.go
index a2621ceee..14686bfc8 100644
--- a/weed/util/pprof.go
+++ b/weed/util/grace/pprof.go
@@ -1,4 +1,4 @@
-package util
+package grace
import (
"os"
diff --git a/weed/util/signal_handling.go b/weed/util/grace/signal_handling.go
index 99447e8be..7cca46764 100644
--- a/weed/util/signal_handling.go
+++ b/weed/util/grace/signal_handling.go
@@ -1,6 +1,6 @@
// +build !plan9
-package util
+package grace
import (
"os"
diff --git a/weed/util/signal_handling_notsupported.go b/weed/util/grace/signal_handling_notsupported.go
index c389cfb7e..5335915a1 100644
--- a/weed/util/signal_handling_notsupported.go
+++ b/weed/util/grace/signal_handling_notsupported.go
@@ -1,6 +1,6 @@
// +build plan9
-package util
+package grace
func OnInterrupt(fn func()) {
}
diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go
deleted file mode 100644
index d029d21ae..000000000
--- a/weed/util/grpc_client_server.go
+++ /dev/null
@@ -1,87 +0,0 @@
-package util
-
-import (
- "fmt"
- "strconv"
- "strings"
- "sync"
- "time"
-
- "google.golang.org/grpc"
- "google.golang.org/grpc/keepalive"
-)
-
-var (
- // cache grpc connections
- grpcClients = make(map[string]*grpc.ClientConn)
- grpcClientsLock sync.Mutex
-)
-
-func NewGrpcServer() *grpc.Server {
- return grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{
- Time: 10 * time.Second, // wait time before ping if no activity
- Timeout: 20 * time.Second, // ping timeout
- }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
- MinTime: 60 * time.Second, // min time a client should wait before sending a ping
- }))
-}
-
-func GrpcDial(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
- // opts = append(opts, grpc.WithBlock())
- // opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second)))
- opts = append(opts, grpc.WithInsecure())
- opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
- Time: 30 * time.Second, // client ping server if no activity for this long
- Timeout: 20 * time.Second,
- }))
-
- return grpc.Dial(address, opts...)
-}
-
-func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
-
- grpcClientsLock.Lock()
-
- existingConnection, found := grpcClients[address]
- if found {
- grpcClientsLock.Unlock()
- return fn(existingConnection)
- }
-
- grpcConnection, err := GrpcDial(address, opts...)
- if err != nil {
- grpcClientsLock.Unlock()
- return fmt.Errorf("fail to dial %s: %v", address, err)
- }
-
- grpcClients[address] = grpcConnection
- grpcClientsLock.Unlock()
-
- err = fn(grpcConnection)
- if err != nil {
- grpcClientsLock.Lock()
- delete(grpcClients, address)
- grpcClientsLock.Unlock()
- }
-
- return err
-}
-
-func ParseServerToGrpcAddress(server string, optionalGrpcPort int) (serverGrpcAddress string, err error) {
- hostnameAndPort := strings.Split(server, ":")
- if len(hostnameAndPort) != 2 {
- return "", fmt.Errorf("The server should have hostname:port format: %v", hostnameAndPort)
- }
-
- filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
- if parseErr != nil {
- return "", fmt.Errorf("The server port parse error: %v", parseErr)
- }
-
- filerGrpcPort := int(filerPort) + 10000
- if optionalGrpcPort != 0 {
- filerGrpcPort = optionalGrpcPort
- }
-
- return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil
-}
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 21e0a678d..c67eb3276 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -11,9 +11,8 @@ import (
"net/http"
"net/url"
"strings"
- "time"
- "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/glog"
)
var (
@@ -27,23 +26,22 @@ func init() {
}
client = &http.Client{
Transport: Transport,
- Timeout: 5 * time.Second,
}
}
func PostBytes(url string, body []byte) ([]byte, error) {
- r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body))
+ r, err := client.Post(url, "", bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("Post to %s: %v", url, err)
}
defer r.Body.Close()
- if r.StatusCode >= 400 {
- return nil, fmt.Errorf("%s: %s", url, r.Status)
- }
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, fmt.Errorf("Read response body: %v", err)
}
+ if r.StatusCode >= 400 {
+ return nil, fmt.Errorf("%s: %s", url, r.Status)
+ }
return b, nil
}
@@ -90,14 +88,14 @@ func Head(url string) (http.Header, error) {
if err != nil {
return nil, err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, r.Status)
}
return r.Header, nil
}
-func Delete(url string, jwt security.EncodedJwt) error {
+func Delete(url string, jwt string) error {
req, err := http.NewRequest("DELETE", url, nil)
if jwt != "" {
req.Header.Set("Authorization", "BEARER "+string(jwt))
@@ -119,7 +117,7 @@ func Delete(url string, jwt security.EncodedJwt) error {
return nil
}
m := make(map[string]interface{})
- if e := json.Unmarshal(body, m); e == nil {
+ if e := json.Unmarshal(body, &m); e == nil {
if s, ok := m["error"].(string); ok {
return errors.New(s)
}
@@ -132,7 +130,7 @@ func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachB
if err != nil {
return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode != 200 {
return fmt.Errorf("%s: %s", url, r.Status)
}
@@ -155,7 +153,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e
if err != nil {
return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode != 200 {
return fmt.Errorf("%s: %s", url, r.Status)
}
@@ -191,11 +189,22 @@ func NormalizeUrl(url string) string {
return "http://" + url
}
-func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange bool) (n int64, e error) {
+func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
+
+ if cipherKey != nil {
+ var n int
+ err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
+ n = copy(buf, data)
+ })
+ return int64(n), err
+ }
- req, _ := http.NewRequest("GET", fileUrl, nil)
- if isReadRange {
- req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)))
+ req, err := http.NewRequest("GET", fileUrl, nil)
+ if err != nil {
+ return 0, err
+ }
+ if !isFullChunk {
+ req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
} else {
req.Header.Set("Accept-Encoding", "gzip")
}
@@ -211,7 +220,8 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo
}
var reader io.ReadCloser
- switch r.Header.Get("Content-Encoding") {
+ contentEncoding := r.Header.Get("Content-Encoding")
+ switch contentEncoding {
case "gzip":
reader, err = gzip.NewReader(r.Body)
defer reader.Close()
@@ -219,55 +229,125 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo
reader = r.Body
}
- var i, m int
+ var (
+ i, m int
+ n int64
+ )
+ // refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199
+ // commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318
for {
m, err = reader.Read(buf[i:])
- if m == 0 {
- return
- }
i += m
n += int64(m)
if err == io.EOF {
return n, nil
}
- if e != nil {
- return n, e
+ if err != nil {
+ return n, err
+ }
+ if n == int64(len(buf)) {
+ break
}
}
-
+ // drains the response body to avoid memory leak
+ data, _ := ioutil.ReadAll(reader)
+ if len(data) != 0 {
+ glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data))
+ }
+ return n, err
}
-func ReadUrlAsStream(fileUrl string, offset int64, size int, fn func(data []byte)) (n int64, e error) {
+func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
+
+ if cipherKey != nil {
+ return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
+ }
+
+ req, err := http.NewRequest("GET", fileUrl, nil)
+ if err != nil {
+ return err
+ }
- req, _ := http.NewRequest("GET", fileUrl, nil)
- req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)))
+ if !isFullChunk {
+ req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
+ }
r, err := client.Do(req)
if err != nil {
- return 0, err
+ return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode >= 400 {
- return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
+ return fmt.Errorf("%s: %s", fileUrl, r.Status)
}
- var m int
+ var (
+ m int
+ )
buf := make([]byte, 64*1024)
for {
m, err = r.Body.Read(buf)
- if m == 0 {
- return
- }
fn(buf[:m])
- n += int64(m)
if err == io.EOF {
- return n, nil
+ return nil
+ }
+ if err != nil {
+ return err
}
- if e != nil {
- return n, e
+ }
+
+}
+
+func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
+ encryptedData, err := Get(fileUrl)
+ if err != nil {
+ return fmt.Errorf("fetch %s: %v", fileUrl, err)
+ }
+ decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
+ if err != nil {
+ return fmt.Errorf("decrypt %s: %v", fileUrl, err)
+ }
+ if isContentCompressed {
+ decryptedData, err = DecompressData(decryptedData)
+ if err != nil {
+ return fmt.Errorf("unzip decrypt %s: %v", fileUrl, err)
}
}
+ if len(decryptedData) < int(offset)+size {
+ return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
+ }
+ if isFullChunk {
+ fn(decryptedData)
+ } else {
+ fn(decryptedData[int(offset) : int(offset)+size])
+ }
+ return nil
+}
+
+func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
+
+ req, err := http.NewRequest("GET", fileUrl, nil)
+ if err != nil {
+ return nil, err
+ }
+ if rangeHeader != "" {
+ req.Header.Add("Range", rangeHeader)
+ }
+
+ r, err := client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ if r.StatusCode >= 400 {
+ return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
+ }
+
+ return r.Body, nil
+}
+func CloseResponse(resp *http.Response) {
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
}
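
A hedged sketch of the streaming reader above; the URL and sizes are
illustrative. A nil cipherKey takes the plain HTTP path with a Range header:

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util"
)

func main() {
	total := 0
	err := util.ReadUrlAsStream("http://localhost:8080/1,1aabbccdd",
		nil, false, false, 0, 1024, func(data []byte) {
			total += len(data) // invoked once per 64KB read
		})
	if err != nil {
		fmt.Println("read:", err)
		return
	}
	fmt.Println("received", total, "bytes")
}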
diff --git a/weed/util/httpdown/http_down.go b/weed/util/httpdown/http_down.go
new file mode 100644
index 000000000..5cbd9611c
--- /dev/null
+++ b/weed/util/httpdown/http_down.go
@@ -0,0 +1,395 @@
+// Package httpdown provides http.ConnState enabled graceful termination of
+// http.Server.
+// It is based on github.com/facebookarchive/httpdown (MIT license),
+// with an added feature: support for serving HTTP over TLS.
+package httpdown
+
+import (
+ "crypto/tls"
+ "fmt"
+ "net"
+ "net/http"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/facebookgo/clock"
+ "github.com/facebookgo/stats"
+)
+
+const (
+ defaultStopTimeout = time.Minute
+ defaultKillTimeout = time.Minute
+)
+
+// A Server encapsulates the process of accepting new connections and
+// serving them, and of gracefully shutting down the listener without
+// dropping active connections.
+type Server interface {
+ // Wait waits for the serving loop to finish. This will happen when Stop is
+ // called, at which point it returns no error, or if there is an error in the
+ // serving loop. You must call Wait after calling Serve or ListenAndServe.
+ Wait() error
+
+ // Stop stops the listener. It will block until all connections have been
+ // closed.
+ Stop() error
+}
+
+// HTTP defines the configuration for serving a http.Server. Multiple calls to
+// Serve or ListenAndServe can be made on the same HTTP instance. The default
+// timeouts of 1 minute each result in a maximum of 2 minutes before a Stop()
+// returns.
+type HTTP struct {
+ // StopTimeout is the duration before we begin force closing connections.
+ // Defaults to 1 minute.
+ StopTimeout time.Duration
+
+ // KillTimeout is the duration before which we completely give up and abort
+ // even though we still have connected clients. This is useful when a large
+ // number of client connections exist and closing them can take a long time.
+ // Note, this is in addition to the StopTimeout. Defaults to 1 minute.
+ KillTimeout time.Duration
+
+ // Stats is optional. If provided, it will be used to record various metrics.
+ Stats stats.Client
+
+ // Clock allows for testing timing related functionality. Do not specify this
+ // in production code.
+ Clock clock.Clock
+
+ // When CertFile and KeyFile are set, httpdown will serve HTTP over TLS.
+ // Files containing a certificate and matching private key for the
+ // server must be provided if neither the Server's
+ // TLSConfig.Certificates nor TLSConfig.GetCertificate are populated.
+ // If the certificate is signed by a certificate authority, the
+ // certFile should be the concatenation of the server's certificate,
+ // any intermediates, and the CA's certificate.
+ CertFile, KeyFile string
+}
+
+// Serve provides the low-level API which is useful if you're creating your own
+// net.Listener.
+func (h HTTP) Serve(s *http.Server, l net.Listener) Server {
+ stopTimeout := h.StopTimeout
+ if stopTimeout == 0 {
+ stopTimeout = defaultStopTimeout
+ }
+ killTimeout := h.KillTimeout
+ if killTimeout == 0 {
+ killTimeout = defaultKillTimeout
+ }
+ klock := h.Clock
+ if klock == nil {
+ klock = clock.New()
+ }
+
+ ss := &server{
+ stopTimeout: stopTimeout,
+ killTimeout: killTimeout,
+ stats: h.Stats,
+ clock: klock,
+ oldConnState: s.ConnState,
+ listener: l,
+ server: s,
+ serveDone: make(chan struct{}),
+ serveErr: make(chan error, 1),
+ new: make(chan net.Conn),
+ active: make(chan net.Conn),
+ idle: make(chan net.Conn),
+ closed: make(chan net.Conn),
+ stop: make(chan chan struct{}),
+ kill: make(chan chan struct{}),
+ certFile: h.CertFile,
+ keyFile: h.KeyFile,
+ }
+ s.ConnState = ss.connState
+ go ss.manage()
+ go ss.serve()
+ return ss
+}
+
+// ListenAndServe returns a Server for the given http.Server. It is equivalent
+// to ListenAndServe from the standard library, but returns immediately.
+// Requests will be accepted in a background goroutine. If the http.Server has
+// a non-nil TLSConfig, a TLS enabled listener will be setup.
+func (h HTTP) ListenAndServe(s *http.Server) (Server, error) {
+ addr := s.Addr
+ if addr == "" {
+ if s.TLSConfig == nil {
+ addr = ":http"
+ } else {
+ addr = ":https"
+ }
+ }
+ l, err := net.Listen("tcp", addr)
+ if err != nil {
+ stats.BumpSum(h.Stats, "listen.error", 1)
+ return nil, err
+ }
+ if s.TLSConfig != nil {
+ l = tls.NewListener(l, s.TLSConfig)
+ }
+ return h.Serve(s, l), nil
+}
+
+// server manages the serving process and allows for gracefully stopping it.
+type server struct {
+ stopTimeout time.Duration
+ killTimeout time.Duration
+ stats stats.Client
+ clock clock.Clock
+
+ oldConnState func(net.Conn, http.ConnState)
+ server *http.Server
+ serveDone chan struct{}
+ serveErr chan error
+ listener net.Listener
+
+ new chan net.Conn
+ active chan net.Conn
+ idle chan net.Conn
+ closed chan net.Conn
+ stop chan chan struct{}
+ kill chan chan struct{}
+
+ stopOnce sync.Once
+ stopErr error
+
+ certFile, keyFile string
+}
+
+func (s *server) connState(c net.Conn, cs http.ConnState) {
+ if s.oldConnState != nil {
+ s.oldConnState(c, cs)
+ }
+
+ switch cs {
+ case http.StateNew:
+ s.new <- c
+ case http.StateActive:
+ s.active <- c
+ case http.StateIdle:
+ s.idle <- c
+ case http.StateHijacked, http.StateClosed:
+ s.closed <- c
+ }
+}
+
+func (s *server) manage() {
+ defer func() {
+ close(s.new)
+ close(s.active)
+ close(s.idle)
+ close(s.closed)
+ close(s.stop)
+ close(s.kill)
+ }()
+
+ var stopDone chan struct{}
+
+ conns := map[net.Conn]http.ConnState{}
+ var countNew, countActive, countIdle float64
+
+ // decConn decrements the count associated with the current state of the
+ // given connection.
+ decConn := func(c net.Conn) {
+ switch conns[c] {
+ default:
+ panic(fmt.Errorf("unknown existing connection: %s", c))
+ case http.StateNew:
+ countNew--
+ case http.StateActive:
+ countActive--
+ case http.StateIdle:
+ countIdle--
+ }
+ }
+
+ // setup a ticker to report various values every minute. if we don't have a
+ // Stats implementation provided, we Stop it so it never ticks.
+ statsTicker := s.clock.Ticker(time.Minute)
+ if s.stats == nil {
+ statsTicker.Stop()
+ }
+
+ for {
+ select {
+ case <-statsTicker.C:
+ // we'll only get here when s.stats is not nil
+ s.stats.BumpAvg("http-state.new", countNew)
+ s.stats.BumpAvg("http-state.active", countActive)
+ s.stats.BumpAvg("http-state.idle", countIdle)
+ s.stats.BumpAvg("http-state.total", countNew+countActive+countIdle)
+ case c := <-s.new:
+ conns[c] = http.StateNew
+ countNew++
+ case c := <-s.active:
+ decConn(c)
+ countActive++
+
+ conns[c] = http.StateActive
+ case c := <-s.idle:
+ decConn(c)
+ countIdle++
+
+ conns[c] = http.StateIdle
+
+ // if we're already stopping, close it
+ if stopDone != nil {
+ c.Close()
+ }
+ case c := <-s.closed:
+ stats.BumpSum(s.stats, "conn.closed", 1)
+ decConn(c)
+ delete(conns, c)
+
+ // if we're waiting to stop and are all empty, we just closed the last
+ // connection and we're done.
+ if stopDone != nil && len(conns) == 0 {
+ close(stopDone)
+ return
+ }
+ case stopDone = <-s.stop:
+ // if we're already all empty, we're already done
+ if len(conns) == 0 {
+ close(stopDone)
+ return
+ }
+
+ // close current idle connections right away
+ for c, cs := range conns {
+ if cs == http.StateIdle {
+ c.Close()
+ }
+ }
+
+ // continue the loop and wait for all the ConnState updates which will
+ // eventually close(stopDone) and return from this goroutine.
+
+ case killDone := <-s.kill:
+ // force close all connections
+ stats.BumpSum(s.stats, "kill.conn.count", float64(len(conns)))
+ for c := range conns {
+ c.Close()
+ }
+
+ // don't block the kill.
+ close(killDone)
+
+ // continue the loop and we wait for all the ConnState updates and will
+ // return from this goroutine when we're all done. otherwise we'll try to
+ // send those ConnState updates on closed channels.
+
+ }
+ }
+}
+
+func (s *server) serve() {
+ stats.BumpSum(s.stats, "serve", 1)
+ if s.certFile == "" && s.keyFile == "" {
+ s.serveErr <- s.server.Serve(s.listener)
+ } else {
+ s.serveErr <- s.server.ServeTLS(s.listener, s.certFile, s.keyFile)
+ }
+ close(s.serveDone)
+ close(s.serveErr)
+}
+
+func (s *server) Wait() error {
+ if err := <-s.serveErr; !isUseOfClosedError(err) {
+ return err
+ }
+ return nil
+}
+
+func (s *server) Stop() error {
+ s.stopOnce.Do(func() {
+ defer stats.BumpTime(s.stats, "stop.time").End()
+ stats.BumpSum(s.stats, "stop", 1)
+
+ // first disable keep-alive for new connections
+ s.server.SetKeepAlivesEnabled(false)
+
+ // then close the listener so new connections can't come thru
+ closeErr := s.listener.Close()
+ <-s.serveDone
+
+ // then trigger the background goroutine to stop and wait for it
+ stopDone := make(chan struct{})
+ s.stop <- stopDone
+
+ // wait for stop
+ select {
+ case <-stopDone:
+ case <-s.clock.After(s.stopTimeout):
+ defer stats.BumpTime(s.stats, "kill.time").End()
+ stats.BumpSum(s.stats, "kill", 1)
+
+ // stop timed out, wait for kill
+ killDone := make(chan struct{})
+ s.kill <- killDone
+ select {
+ case <-killDone:
+ case <-s.clock.After(s.killTimeout):
+ // kill timed out, give up
+ stats.BumpSum(s.stats, "kill.timeout", 1)
+ }
+ }
+
+ if closeErr != nil && !isUseOfClosedError(closeErr) {
+ stats.BumpSum(s.stats, "listener.close.error", 1)
+ s.stopErr = closeErr
+ }
+ })
+ return s.stopErr
+}
+
+func isUseOfClosedError(err error) bool {
+ if err == nil {
+ return false
+ }
+ if opErr, ok := err.(*net.OpError); ok {
+ err = opErr.Err
+ }
+ return err.Error() == "use of closed network connection"
+}
+
+// ListenAndServe is a convenience function to serve and wait for a SIGTERM
+// or SIGINT before shutting down.
+func ListenAndServe(s *http.Server, hd *HTTP) error {
+ if hd == nil {
+ hd = &HTTP{}
+ }
+ hs, err := hd.ListenAndServe(s)
+ if err != nil {
+ return err
+ }
+
+ waiterr := make(chan error, 1)
+ go func() {
+ defer close(waiterr)
+ waiterr <- hs.Wait()
+ }()
+
+ signals := make(chan os.Signal, 10)
+ signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
+
+ select {
+ case err := <-waiterr:
+ if err != nil {
+ return err
+ }
+ case <-signals:
+ signal.Stop(signals)
+ if err := hs.Stop(); err != nil {
+ return err
+ }
+ if err := <-waiterr; err != nil {
+ return err
+ }
+ }
+ return nil
+}
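
A minimal graceful-shutdown server using the package above; the address and
timeouts are illustrative:

package main

import (
	"net/http"
	"time"

	"github.com/chrislusf/seaweedfs/weed/util/httpdown"
)

func main() {
	s := &http.Server{Addr: ":8080", Handler: http.DefaultServeMux}
	hd := &httpdown.HTTP{
		StopTimeout: 10 * time.Second, // grace period before force-closing connections
		KillTimeout: 5 * time.Second,  // extra time before giving up entirely
	}
	// Serves until SIGTERM/SIGINT, then drains connections before returning.
	if err := httpdown.ListenAndServe(s, hd); err != nil {
		panic(err)
	}
}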
diff --git a/weed/util/inits.go b/weed/util/inits.go
new file mode 100644
index 000000000..378878012
--- /dev/null
+++ b/weed/util/inits.go
@@ -0,0 +1,52 @@
+package util
+
+import (
+ "fmt"
+ "sort"
+)
+
+// HumanReadableIntsMax joins a series of ints into a compact human-readable string like "1-3 5 ...", keeping at most max values before truncating.
+func HumanReadableIntsMax(max int, ids ...int) string {
+ if len(ids) <= max {
+ return HumanReadableInts(ids...)
+ }
+
+ return HumanReadableInts(ids[:max]...) + " ..."
+}
+
+// HumanReadableInts joins a series of ints into a compact human-readable string like "1-3 5 7-10".
+func HumanReadableInts(ids ...int) string {
+ sort.Ints(ids)
+
+ s := ""
+ start := 0
+ last := 0
+
+ for i, v := range ids {
+ if i == 0 {
+ start = v
+ last = v
+ s = fmt.Sprintf("%d", v)
+ continue
+ }
+
+ if last+1 == v {
+ last = v
+ continue
+ }
+
+ if last > start {
+ s += fmt.Sprintf("-%d", last)
+ }
+
+ s += fmt.Sprintf(" %d", v)
+ start = v
+ last = v
+ }
+
+ if last != start {
+ s += fmt.Sprintf("-%d", last)
+ }
+
+ return s
+}
diff --git a/weed/util/inits_test.go b/weed/util/inits_test.go
new file mode 100644
index 000000000..f2c9b701f
--- /dev/null
+++ b/weed/util/inits_test.go
@@ -0,0 +1,19 @@
+package util
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestHumanReadableIntsMax(t *testing.T) {
+ assert.Equal(t, "1-2 ...", HumanReadableIntsMax(2, 1, 2, 3))
+ assert.Equal(t, "1 3 ...", HumanReadableIntsMax(2, 1, 3, 5))
+}
+
+func TestHumanReadableInts(t *testing.T) {
+ assert.Equal(t, "1-3", HumanReadableInts(1, 2, 3))
+ assert.Equal(t, "1 3", HumanReadableInts(1, 3))
+ assert.Equal(t, "1 3 5", HumanReadableInts(5, 1, 3))
+ assert.Equal(t, "1-3 5", HumanReadableInts(1, 2, 3, 5))
+ assert.Equal(t, "1-3 5 7-9", HumanReadableInts(7, 9, 8, 1, 2, 3, 5))
+}
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
new file mode 100644
index 000000000..b02c45b52
--- /dev/null
+++ b/weed/util/log_buffer/log_buffer.go
@@ -0,0 +1,278 @@
+package log_buffer
+
+import (
+ "bytes"
+ "sync"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const BufferSize = 4 * 1024 * 1024
+const PreviousBufferCount = 3
+
+type dataToFlush struct {
+ startTime time.Time
+ stopTime time.Time
+ data *bytes.Buffer
+}
+
+type LogBuffer struct {
+ prevBuffers *SealedBuffers
+ buf []byte
+ idx []int
+ pos int
+ startTime time.Time
+ stopTime time.Time
+ sizeBuf []byte
+ flushInterval time.Duration
+ flushFn func(startTime, stopTime time.Time, buf []byte)
+ notifyFn func()
+ isStopping bool
+ flushChan chan *dataToFlush
+ lastTsNs int64
+ sync.RWMutex
+}
+
+func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer {
+ lb := &LogBuffer{
+ prevBuffers: newSealedBuffers(PreviousBufferCount),
+ buf: make([]byte, BufferSize),
+ sizeBuf: make([]byte, 4),
+ flushInterval: flushInterval,
+ flushFn: flushFn,
+ notifyFn: notifyFn,
+ flushChan: make(chan *dataToFlush, 256),
+ }
+ go lb.loopFlush()
+ go lb.loopInterval()
+ return lb
+}
+
+func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
+
+ m.Lock()
+ defer func() {
+ m.Unlock()
+ if m.notifyFn != nil {
+ m.notifyFn()
+ }
+ }()
+
+ // need to put the timestamp inside the lock
+ ts := time.Now()
+ tsNs := ts.UnixNano()
+ if m.lastTsNs >= tsNs {
+ // this is unlikely to happen, but just in case
+ tsNs = m.lastTsNs + 1
+ ts = time.Unix(0, tsNs)
+ }
+ m.lastTsNs = tsNs
+ logEntry := &filer_pb.LogEntry{
+ TsNs: tsNs,
+ PartitionKeyHash: util.HashToInt32(partitionKey),
+ Data: data,
+ }
+
+ logEntryData, _ := proto.Marshal(logEntry)
+
+ size := len(logEntryData)
+
+ if m.pos == 0 {
+ m.startTime = ts
+ }
+
+ if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
+ m.flushChan <- m.copyToFlush()
+ m.startTime = ts
+ if len(m.buf) < size+4 {
+ m.buf = make([]byte, 2*size+4)
+ }
+ }
+ m.stopTime = ts
+
+ m.idx = append(m.idx, m.pos)
+ util.Uint32toBytes(m.sizeBuf, uint32(size))
+ copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
+ copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
+ m.pos += size + 4
+
+ // fmt.Printf("entry size %d total %d count %d, buffer:%p\n", size, m.pos, len(m.idx), m)
+
+}
+
+func (m *LogBuffer) Shutdown() {
+ m.Lock()
+ defer m.Unlock()
+
+ if m.isStopping {
+ return
+ }
+ m.isStopping = true
+ toFlush := m.copyToFlush()
+ m.flushChan <- toFlush
+ close(m.flushChan)
+}
+
+func (m *LogBuffer) loopFlush() {
+ for d := range m.flushChan {
+ if d != nil {
+ // fmt.Printf("flush [%v, %v] size %d\n", d.startTime, d.stopTime, len(d.data.Bytes()))
+ m.flushFn(d.startTime, d.stopTime, d.data.Bytes())
+ d.releaseMemory()
+ }
+ }
+}
+
+func (m *LogBuffer) loopInterval() {
+ for !m.isStopping {
+ time.Sleep(m.flushInterval)
+ m.Lock()
+ if m.isStopping {
+ m.Unlock()
+ return
+ }
+ // println("loop interval")
+ toFlush := m.copyToFlush()
+ m.flushChan <- toFlush
+ m.Unlock()
+ }
+}
+
+func (m *LogBuffer) copyToFlush() *dataToFlush {
+
+ if m.flushFn != nil && m.pos > 0 {
+ // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
+ d := &dataToFlush{
+ startTime: m.startTime,
+ stopTime: m.stopTime,
+ data: copiedBytes(m.buf[:m.pos]),
+ }
+ // fmt.Printf("flusing [0,%d) with %d entries\n", m.pos, len(m.idx))
+ m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos)
+ m.pos = 0
+ m.idx = m.idx[:0]
+ return d
+ }
+ return nil
+}
+
+func (d *dataToFlush) releaseMemory() {
+ d.data.Reset()
+ bufferPool.Put(d.data)
+}
+
+func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) {
+ m.RLock()
+ defer m.RUnlock()
+
+ /*
+ fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers))
+ for i, prevBuf := range m.prevBuffers.buffers {
+ fmt.Printf(" prev %d : %s\n", i, prevBuf.String())
+ }
+ */
+
+ if lastReadTime.Equal(m.stopTime) {
+ return nil
+ }
+ if lastReadTime.After(m.stopTime) {
+ // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime)
+ return nil
+ }
+ if lastReadTime.Before(m.startTime) {
+ // println("checking ", lastReadTime.UnixNano())
+ for i, buf := range m.prevBuffers.buffers {
+ if buf.startTime.After(lastReadTime) {
+ if i == 0 {
+ // println("return the earliest in memory", buf.startTime.UnixNano())
+ return copiedBytes(buf.buf[:buf.size])
+ }
+ // println("return the", i, "th in memory", buf.startTime.UnixNano())
+ return copiedBytes(buf.buf[:buf.size])
+ }
+ if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) {
+ pos := buf.locateByTs(lastReadTime)
+ // fmt.Printf("locate buffer[%d] pos %d\n", i, pos)
+ return copiedBytes(buf.buf[pos:buf.size])
+ }
+ }
+ // println("return the current buf", lastReadTime.UnixNano())
+ return copiedBytes(m.buf[:m.pos])
+ }
+
+ lastTs := lastReadTime.UnixNano()
+ l, h := 0, len(m.idx)-1
+
+ /*
+ for i, pos := range m.idx {
+ logEntry, ts := readTs(m.buf, pos)
+ event := &filer_pb.SubscribeMetadataResponse{}
+ proto.Unmarshal(logEntry.Data, event)
+ entry := event.EventNotification.OldEntry
+ if entry == nil {
+ entry = event.EventNotification.NewEntry
+ }
+ fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name)
+ }
+ fmt.Printf("l=%d, h=%d\n", l, h)
+ */
+
+ for l <= h {
+ mid := (l + h) / 2
+ pos := m.idx[mid]
+ _, t := readTs(m.buf, pos)
+ if t <= lastTs {
+ l = mid + 1
+ } else if lastTs < t {
+ var prevT int64
+ if mid > 0 {
+ _, prevT = readTs(m.buf, m.idx[mid-1])
+ }
+ if prevT <= lastTs {
+ // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos)
+ return copiedBytes(m.buf[pos:m.pos])
+ }
+ h = mid
+ }
+ // fmt.Printf("l=%d, h=%d\n", l, h)
+ }
+
+ // FIXME: this could be that the buffer has been flushed already
+ return nil
+
+}
+func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) {
+ bufferPool.Put(b)
+}
+
+var bufferPool = sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+}
+
+func copiedBytes(buf []byte) (copied *bytes.Buffer) {
+ copied = bufferPool.Get().(*bytes.Buffer)
+ copied.Reset()
+ copied.Write(buf)
+ return
+}
+
+func readTs(buf []byte, pos int) (size int, ts int64) {
+
+ size = int(util.BytesToUint32(buf[pos : pos+4]))
+ entryData := buf[pos+4 : pos+4+size]
+ logEntry := &filer_pb.LogEntry{}
+
+ err := proto.Unmarshal(entryData, logEntry)
+ if err != nil {
+ glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err)
+ }
+ return size, logEntry.TsNs
+
+}
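A hedged sketch of wiring up a LogBuffer, assuming the usual time/fmt/ioutil/glog imports; the flush interval and the flush target (dumping each sealed time range to a local segment file) are illustrative assumptions, not how the filer actually persists its log:

    lb := log_buffer.NewLogBuffer(10*time.Second,
        func(startTime, stopTime time.Time, buf []byte) {
            // flushFn: persist the sealed [startTime, stopTime] range
            name := fmt.Sprintf("segment-%d.log", startTime.UnixNano())
            if err := ioutil.WriteFile(name, buf, 0644); err != nil {
                glog.Errorf("flush %s: %v", name, err)
            }
        },
        func() {
            // notifyFn: wake up any subscriber blocked in its waitForDataFn
        })
    lb.AddToBuffer([]byte("some-partition-key"), []byte("payload"))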
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
new file mode 100644
index 000000000..f9ccc95c2
--- /dev/null
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -0,0 +1,42 @@
+package log_buffer
+
+import (
+ "fmt"
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func TestNewLogBufferFirstBuffer(t *testing.T) {
+ lb := NewLogBuffer(time.Minute, func(startTime, stopTime time.Time, buf []byte) {
+
+ }, func() {
+
+ })
+
+ startTime := time.Now()
+
+ messageSize := 1024
+ messageCount := 5000
+ var buf = make([]byte, messageSize)
+ for i := 0; i < messageCount; i++ {
+ rand.Read(buf)
+ lb.AddToBuffer(nil, buf)
+ }
+
+ receivedMessageCount := 0
+ lb.LoopProcessLogData(startTime, func() bool {
+ // stop if no more messages
+ return false
+ }, func(logEntry *filer_pb.LogEntry) error {
+ receivedMessageCount++
+ return nil
+ })
+
+ if receivedMessageCount != messageCount {
+ fmt.Printf("sent %d received %d\n", messageCount, receivedMessageCount)
+ }
+
+}
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
new file mode 100644
index 000000000..2b73a8064
--- /dev/null
+++ b/weed/util/log_buffer/log_read.go
@@ -0,0 +1,77 @@
+package log_buffer
+
+import (
+ "bytes"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (logBuffer *LogBuffer) LoopProcessLogData(
+ startReadTime time.Time,
+ waitForDataFn func() bool,
+ eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) {
+ // loop through all messages
+ var bytesBuf *bytes.Buffer
+ lastReadTime := startReadTime
+ defer func() {
+ if bytesBuf != nil {
+ logBuffer.ReleaseMeory(bytesBuf)
+ }
+ }()
+
+ for {
+
+ if bytesBuf != nil {
+ logBuffer.ReleaseMeory(bytesBuf)
+ }
+ bytesBuf = logBuffer.ReadFromBuffer(lastReadTime)
+ // fmt.Printf("ReadFromBuffer by %v\n", lastReadTime)
+ if bytesBuf == nil {
+ if waitForDataFn() {
+ continue
+ } else {
+ return
+ }
+ }
+
+ buf := bytesBuf.Bytes()
+ // fmt.Printf("ReadFromBuffer by %v size %d\n", lastReadTime, len(buf))
+
+ batchSize := 0
+ var startReadTime time.Time
+
+ for pos := 0; pos+4 < len(buf); {
+
+ size := util.BytesToUint32(buf[pos : pos+4])
+ entryData := buf[pos+4 : pos+4+int(size)]
+
+ // fmt.Printf("read buffer read %d [%d,%d) from [0,%d)\n", batchSize, pos, pos+int(size)+4, len(buf))
+
+ logEntry := &filer_pb.LogEntry{}
+ if err = proto.Unmarshal(entryData, logEntry); err != nil {
+ glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
+ pos += 4 + int(size)
+ continue
+ }
+ lastReadTime = time.Unix(0, logEntry.TsNs)
+ if batchStartTime.IsZero() {
+ batchStartTime = lastReadTime
+ }
+
+ if err = eachLogDataFn(logEntry); err != nil {
+ return
+ }
+
+ pos += 4 + int(size)
+ batchSize++
+ }
+
+ // fmt.Printf("sent message ts[%d,%d] size %d\n", startReadTime.UnixNano(), lastReadTime.UnixNano(), batchSize)
+ }
+
+}
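A matching read-side sketch, assuming a LogBuffer lb populated as in the earlier sketch; the one-hour look-back and the handler body are illustrative:

    err := lb.LoopProcessLogData(time.Now().Add(-time.Hour),
        func() bool {
            // waitForDataFn: return true to keep polling, false to stop once drained
            return false
        },
        func(logEntry *filer_pb.LogEntry) error {
            fmt.Printf("ts=%v key=%d bytes=%d\n",
                time.Unix(0, logEntry.TsNs), logEntry.PartitionKeyHash, len(logEntry.Data))
            return nil
        })
    if err != nil {
        glog.Errorf("process log data: %v", err)
    }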
diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go
new file mode 100644
index 000000000..d133cf8d3
--- /dev/null
+++ b/weed/util/log_buffer/sealed_buffer.go
@@ -0,0 +1,62 @@
+package log_buffer
+
+import (
+ "fmt"
+ "time"
+)
+
+type MemBuffer struct {
+ buf []byte
+ size int
+ startTime time.Time
+ stopTime time.Time
+}
+
+type SealedBuffers struct {
+ buffers []*MemBuffer
+}
+
+func newSealedBuffers(size int) *SealedBuffers {
+ sbs := &SealedBuffers{}
+
+ sbs.buffers = make([]*MemBuffer, size)
+ for i := 0; i < size; i++ {
+ sbs.buffers[i] = &MemBuffer{
+ buf: make([]byte, BufferSize),
+ }
+ }
+
+ return sbs
+}
+
+func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int) (newBuf []byte) {
+ oldMemBuffer := sbs.buffers[0]
+ size := len(sbs.buffers)
+ for i := 0; i < size-1; i++ {
+ sbs.buffers[i].buf = sbs.buffers[i+1].buf
+ sbs.buffers[i].size = sbs.buffers[i+1].size
+ sbs.buffers[i].startTime = sbs.buffers[i+1].startTime
+ sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime
+ }
+ sbs.buffers[size-1].buf = buf
+ sbs.buffers[size-1].size = pos
+ sbs.buffers[size-1].startTime = startTime
+ sbs.buffers[size-1].stopTime = stopTime
+ return oldMemBuffer.buf
+}
+
+func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) {
+ lastReadTs := lastReadTime.UnixNano()
+ for pos < len(mb.buf) {
+ size, t := readTs(mb.buf, pos)
+ if t > lastReadTs {
+ return
+ }
+ pos += size + 4
+ }
+ return len(mb.buf)
+}
+
+func (mb *MemBuffer) String() string {
+ return fmt.Sprintf("[%v,%v] bytes:%d", mb.startTime, mb.stopTime, mb.size)
+}
diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go
index b8068e67f..f057a8f5b 100644
--- a/weed/util/net_timeout.go
+++ b/weed/util/net_timeout.go
@@ -35,6 +35,7 @@ type Conn struct {
net.Conn
ReadTimeout time.Duration
WriteTimeout time.Duration
+ isClosed bool
}
func (c *Conn) Read(b []byte) (count int, e error) {
@@ -68,7 +69,10 @@ func (c *Conn) Write(b []byte) (count int, e error) {
func (c *Conn) Close() error {
err := c.Conn.Close()
if err == nil {
- stats.ConnectionClose()
+ if !c.isClosed {
+ stats.ConnectionClose()
+ c.isClosed = true
+ }
}
return err
}
diff --git a/weed/util/network.go b/weed/util/network.go
new file mode 100644
index 000000000..7108cfea6
--- /dev/null
+++ b/weed/util/network.go
@@ -0,0 +1,25 @@
+package util
+
+import (
+ "net"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func DetectedHostAddress() string {
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ glog.V(0).Infof("failed to detect ip address: %v", err)
+ return ""
+ }
+
+ for _, a := range addrs {
+ if ipnet, ok := a.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+ if ipnet.IP.To4() != nil {
+ return ipnet.IP.String()
+ }
+ }
+ }
+
+ return "localhost"
+}
diff --git a/weed/util/parse.go b/weed/util/parse.go
index 0a8317c19..0955db682 100644
--- a/weed/util/parse.go
+++ b/weed/util/parse.go
@@ -1,7 +1,10 @@
package util
import (
+ "fmt"
+ "net/url"
"strconv"
+ "strings"
)
func ParseInt(text string, defaultValue int) int {
@@ -24,3 +27,37 @@ func ParseUint64(text string, defaultValue uint64) uint64 {
}
return count
}
+
+func ParseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
+ if !strings.HasPrefix(entryPath, "http://") && !strings.HasPrefix(entryPath, "https://") {
+ entryPath = "http://" + entryPath
+ }
+
+ var u *url.URL
+ u, err = url.Parse(entryPath)
+ if err != nil {
+ return
+ }
+ filerServer = u.Hostname()
+ portString := u.Port()
+ if portString != "" {
+ filerPort, err = strconv.ParseInt(portString, 10, 32)
+ }
+ path = u.Path
+ return
+}
+
+func ParseHostPort(hostPort string) (filerServer string, filerPort int64, err error) {
+ parts := strings.Split(hostPort, ":")
+ if len(parts) != 2 {
+ err = fmt.Errorf("failed to parse %s", hostPort)
+ return
+ }
+
+ filerPort, err = strconv.ParseInt(parts[1], 10, 64)
+ if err == nil {
+ filerServer = parts[0]
+ }
+
+ return
+}
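Quick usage sketches for the two parsers above; the inputs are examples only:

    filerServer, filerPort, path, err := util.ParseFilerUrl("localhost:8888/path/to/entry")
    // filerServer == "localhost", filerPort == 8888, path == "/path/to/entry", err == nil

    host, port, err := util.ParseHostPort("10.0.0.1:9333")
    // host == "10.0.0.1", port == 9333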
diff --git a/weed/util/queue.go b/weed/util/queue.go
new file mode 100644
index 000000000..1e6211e0d
--- /dev/null
+++ b/weed/util/queue.go
@@ -0,0 +1,61 @@
+package util
+
+import "sync"
+
+type node struct {
+ data interface{}
+ next *node
+}
+
+type Queue struct {
+ head *node
+ tail *node
+ count int
+ sync.RWMutex
+}
+
+func NewQueue() *Queue {
+ q := &Queue{}
+ return q
+}
+
+func (q *Queue) Len() int {
+ q.RLock()
+ defer q.RUnlock()
+ return q.count
+}
+
+func (q *Queue) Enqueue(item interface{}) {
+ q.Lock()
+ defer q.Unlock()
+
+ n := &node{data: item}
+
+ if q.tail == nil {
+ q.tail = n
+ q.head = n
+ } else {
+ q.tail.next = n
+ q.tail = n
+ }
+ q.count++
+}
+
+func (q *Queue) Dequeue() interface{} {
+ q.Lock()
+ defer q.Unlock()
+
+ if q.head == nil {
+ return nil
+ }
+
+ n := q.head
+ q.head = n.next
+
+ if q.head == nil {
+ q.tail = nil
+ }
+ q.count--
+
+ return n.data
+}
diff --git a/weed/util/queue_unbounded.go b/weed/util/queue_unbounded.go
new file mode 100644
index 000000000..496b9f844
--- /dev/null
+++ b/weed/util/queue_unbounded.go
@@ -0,0 +1,45 @@
+package util
+
+import "sync"
+
+type UnboundedQueue struct {
+ outbound []string
+ outboundLock sync.RWMutex
+ inbound []string
+ inboundLock sync.RWMutex
+}
+
+func NewUnboundedQueue() *UnboundedQueue {
+ q := &UnboundedQueue{}
+ return q
+}
+
+func (q *UnboundedQueue) EnQueue(items ...string) {
+ q.inboundLock.Lock()
+ defer q.inboundLock.Unlock()
+
+ q.inbound = append(q.inbound, items...)
+
+}
+
+func (q *UnboundedQueue) Consume(fn func([]string)) {
+ q.outboundLock.Lock()
+ defer q.outboundLock.Unlock()
+
+ if len(q.outbound) == 0 {
+ q.inboundLock.Lock()
+ inboundLen := len(q.inbound)
+ if inboundLen > 0 {
+ t := q.outbound
+ q.outbound = q.inbound
+ q.inbound = t
+ }
+ q.inboundLock.Unlock()
+ }
+
+ if len(q.outbound) > 0 {
+ fn(q.outbound)
+ q.outbound = q.outbound[:0]
+ }
+
+}
diff --git a/weed/util/queue_unbounded_test.go b/weed/util/queue_unbounded_test.go
new file mode 100644
index 000000000..2d02032cb
--- /dev/null
+++ b/weed/util/queue_unbounded_test.go
@@ -0,0 +1,25 @@
+package util
+
+import "testing"
+
+func TestEnqueueAndConsume(t *testing.T) {
+
+ q := NewUnboundedQueue()
+
+ q.EnQueue("1", "2", "3")
+
+ f := func(items []string) {
+ for _, t := range items {
+ println(t)
+ }
+ println("-----------------------")
+ }
+ q.Consume(f)
+
+ q.Consume(f)
+
+ q.EnQueue("4", "5")
+ q.EnQueue("6", "7")
+ q.Consume(f)
+
+}
diff --git a/weed/util/throttler.go b/weed/util/throttler.go
new file mode 100644
index 000000000..873161e37
--- /dev/null
+++ b/weed/util/throttler.go
@@ -0,0 +1,34 @@
+package util
+
+import "time"
+
+type WriteThrottler struct {
+ compactionBytePerSecond int64
+ lastSizeCounter int64
+ lastSizeCheckTime time.Time
+}
+
+func NewWriteThrottler(bytesPerSecond int64) *WriteThrottler {
+ return &WriteThrottler{
+ compactionBytePerSecond: bytesPerSecond,
+ lastSizeCheckTime: time.Now(),
+ }
+}
+
+func (wt *WriteThrottler) MaybeSlowdown(delta int64) {
+ if wt.compactionBytePerSecond > 0 {
+ wt.lastSizeCounter += delta
+ now := time.Now()
+ elapsedDuration := now.Sub(wt.lastSizeCheckTime)
+ if elapsedDuration > 100*time.Millisecond {
+ overLimitBytes := wt.lastSizeCounter - wt.compactionBytePerSecond/10
+ if overLimitBytes > 0 {
+ overRatio := float64(overLimitBytes) / float64(wt.compactionBytePerSecond)
+ sleepTime := time.Duration(overRatio*1000) * time.Millisecond
+ // glog.V(0).Infof("currently %d bytes, limit to %d bytes, over by %d bytes, sleeping %v over %.4f", wt.lastSizeCounter, wt.compactionBytePerSecond/10, overLimitBytes, sleepTime, overRatio)
+ time.Sleep(sleepTime)
+ }
+ wt.lastSizeCounter, wt.lastSizeCheckTime = 0, time.Now()
+ }
+ }
+}
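A sketch of how the throttler would typically wrap a copy loop; the 2 MB/s limit and the src/dst reader and writer are assumptions for illustration:

    wt := util.NewWriteThrottler(2 * 1024 * 1024) // cap at roughly 2 MB/s
    buf := make([]byte, 64*1024)
    for {
        n, err := src.Read(buf)
        if n > 0 {
            dst.Write(buf[:n])
            // sleeps when more than 1/10 of the per-second budget has been
            // written within the current 100ms window
            wt.MaybeSlowdown(int64(n))
        }
        if err != nil {
            break
        }
    }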