Diffstat (limited to 'weed/util')
39 files changed, 2511 insertions, 133 deletions
diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go
new file mode 100644
index 000000000..40b9c4e47
--- /dev/null
+++ b/weed/util/bounded_tree/bounded_tree.go
@@ -0,0 +1,166 @@
+package bounded_tree
+
+import (
+	"sync"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type Node struct {
+	Parent   *Node
+	Name     string
+	Children map[string]*Node
+}
+
+type BoundedTree struct {
+	root *Node
+	sync.Mutex
+}
+
+func NewBoundedTree() *BoundedTree {
+	return &BoundedTree{
+		root: &Node{
+			Name: "/",
+		},
+	}
+}
+
+type VisitNodeFunc func(path util.FullPath) (childDirectories []string, err error)
+
+// EnsureVisited calls visitFn for each directory level along the path that has
+// not been visited yet. It is a no-op if the directory has already been
+// visited or does not exist.
+// A leaf node, which has no children, represents a directory that has not been visited.
+// A non-leaf node, or a node that does not exist, represents a directory that has
+// already been visited or that does not need to be visited.
+func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) {
+	t.Lock()
+	defer t.Unlock()
+
+	if t.root == nil {
+		return
+	}
+	components := p.Split()
+	// fmt.Printf("components %v %d\n", components, len(components))
+	if canDelete := t.ensureVisited(t.root, util.FullPath("/"), components, 0, visitFn); canDelete {
+		t.root = nil
+	}
+}
+
+func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool) {
+
+	// println("ensureVisited", currentPath, i)
+
+	if n == nil {
+		// fmt.Printf("%s null\n", currentPath)
+		return
+	}
+
+	if n.isVisited() {
+		// fmt.Printf("%s visited %v\n", currentPath, n.Name)
+	} else {
+		// fmt.Printf("ensure %v\n", currentPath)
+
+		children, err := visitFn(currentPath)
+		if err != nil {
+			glog.V(0).Infof("failed to visit %s: %v", currentPath, err)
+			return
+		}
+
+		if len(children) == 0 {
+			// fmt.Printf("  canDelete %v without children\n", currentPath)
+			return true
+		}
+
+		n.Children = make(map[string]*Node)
+		for _, child := range children {
+			// fmt.Printf("  add child %v %v\n", currentPath, child)
+			n.Children[child] = &Node{
+				Name: child,
+			}
+		}
+	}
+
+	if i >= len(components) {
+		return
+	}
+
+	// fmt.Printf("  check child %v %v\n", currentPath, components[i])
+
+	toVisitNode, found := n.Children[components[i]]
+	if !found {
+		// fmt.Printf("  did not find child %v %v\n", currentPath, components[i])
+		return
+	}
+
+	// fmt.Printf("  ensureVisited %v %v\n", currentPath, toVisitNode.Name)
+
+	if canDelete := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn); canDelete {
+
+		// fmt.Printf("  delete %v %v\n", currentPath, components[i])
+		delete(n.Children, components[i])
+
+		if len(n.Children) == 0 {
+			// fmt.Printf("  canDelete %v\n", currentPath)
+			return true
+		}
+	}
+
+	return false
+
+}
+
+func (n *Node) isVisited() bool {
+	if n == nil {
+		return true
+	}
+	if len(n.Children) > 0 {
+		return true
+	}
+	return false
+}
+
+func (n *Node) getChild(childName string) *Node {
+	if n == nil {
+		return nil
+	}
+	if len(n.Children) > 0 {
+		return n.Children[childName]
+	}
+	return nil
+}
+
+func (t *BoundedTree) HasVisited(p util.FullPath) bool {
+
+	if t.root == nil {
+		return true
+	}
+
+	components := p.Split()
+	// fmt.Printf("components %v %d\n", components, len(components))
+	return t.hasVisited(t.root, util.FullPath("/"), components, 0)
+}
+
+func (t *BoundedTree) hasVisited(n *Node, currentPath util.FullPath, components
[]string, i int) bool { + + if n == nil { + return true + } + + if !n.isVisited() { + return false + } + + // fmt.Printf(" hasVisited child %v %+v %d\n", currentPath, components, i) + + if i >= len(components) { + return true + } + + toVisitNode, found := n.Children[components[i]] + if !found { + return true + } + + return t.hasVisited(toVisitNode, currentPath.Child(components[i]), components, i+1) + +} diff --git a/weed/util/bounded_tree/bounded_tree_test.go b/weed/util/bounded_tree/bounded_tree_test.go new file mode 100644 index 000000000..2328f0497 --- /dev/null +++ b/weed/util/bounded_tree/bounded_tree_test.go @@ -0,0 +1,131 @@ +package bounded_tree + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/chrislusf/seaweedfs/weed/util" +) + + +var ( + + visitFn = func(path util.FullPath) (childDirectories []string, err error) { + fmt.Printf(" visit %v ...\n", path) + switch path { + case "/": + return []string{"a", "g", "h"}, nil + case "/a": + return []string{"b", "f"}, nil + case "/a/b": + return []string{"c", "e"}, nil + case "/a/b/c": + return []string{"d"}, nil + case "/a/b/c/d": + return []string{"i", "j"}, nil + case "/a/b/c/d/i": + return []string{}, nil + case "/a/b/c/d/j": + return []string{}, nil + case "/a/b/e": + return []string{}, nil + case "/a/f": + return []string{}, nil + } + return nil, nil + } + + + printMap = func(m map[string]*Node) { + for k := range m { + println(" >", k) + } + } + + +) + +func TestBoundedTree(t *testing.T) { + + // a/b/c/d/i + // a/b/c/d/j + // a/b/c/d + // a/b/e + // a/f + // g + // h + + tree := NewBoundedTree() + + tree.EnsureVisited(util.FullPath("/a/b/c"), visitFn) + + assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b"))) + assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b/c"))) + assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/c/d"))) + assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/e"))) + assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/f"))) + assert.Equal(t, false, tree.HasVisited(util.FullPath("/g"))) + assert.Equal(t, false, tree.HasVisited(util.FullPath("/h"))) + assert.Equal(t, true, tree.HasVisited(util.FullPath("/"))) + assert.Equal(t, true, tree.HasVisited(util.FullPath("/x"))) + assert.Equal(t, false, tree.HasVisited(util.FullPath("/a/b/e/x"))) + + printMap(tree.root.Children) + + a := tree.root.getChild("a") + + b := a.getChild("b") + if !b.isVisited() { + t.Errorf("expect visited /a/b") + } + c := b.getChild("c") + if !c.isVisited() { + t.Errorf("expect visited /a/b/c") + } + + d := c.getChild("d") + if d.isVisited() { + t.Errorf("expect unvisited /a/b/c/d") + } + + tree.EnsureVisited(util.FullPath("/a/b/c/d"), visitFn) + tree.EnsureVisited(util.FullPath("/a/b/c/d/i"), visitFn) + tree.EnsureVisited(util.FullPath("/a/b/c/d/j"), visitFn) + tree.EnsureVisited(util.FullPath("/a/b/e"), visitFn) + tree.EnsureVisited(util.FullPath("/a/f"), visitFn) + + printMap(tree.root.Children) + +} + +func TestEmptyBoundedTree(t *testing.T) { + + // g + // h + + tree := NewBoundedTree() + + visitFn := func(path util.FullPath) (childDirectories []string, err error) { + fmt.Printf(" visit %v ...\n", path) + switch path { + case "/": + return []string{"g", "h"}, nil + } + t.Fatalf("expected visit %s", path) + return nil, nil + } + + tree.EnsureVisited(util.FullPath("/a/b"), visitFn) + + tree.EnsureVisited(util.FullPath("/a/b"), visitFn) + + printMap(tree.root.Children) + + assert.Equal(t, true, tree.HasVisited(util.FullPath("/a/b"))) + assert.Equal(t, true, 
tree.HasVisited(util.FullPath("/a"))) + assert.Equal(t, false, tree.HasVisited(util.FullPath("/g"))) + assert.Equal(t, false, tree.HasVisited(util.FullPath("/g/x"))) + +} diff --git a/weed/util/bytes.go b/weed/util/bytes.go index dfa4ae665..0650919c0 100644 --- a/weed/util/bytes.go +++ b/weed/util/bytes.go @@ -1,5 +1,27 @@ package util +import ( + "crypto/md5" + "fmt" + "io" +) + +// BytesToHumanReadable returns the converted human readable representation of the bytes. +func BytesToHumanReadable(b uint64) string { + const unit = 1024 + if b < unit { + return fmt.Sprintf("%d B", b) + } + + div, exp := uint64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + + return fmt.Sprintf("%.2f %ciB", float64(b)/float64(div), "KMGTPE"[exp]) +} + // big endian func BytesToUint64(b []byte) (v uint64) { @@ -43,3 +65,52 @@ func Uint16toBytes(b []byte, v uint16) { func Uint8toBytes(b []byte, v uint8) { b[0] = byte(v) } + +// returns a 64 bit big int +func HashStringToLong(dir string) (v int64) { + h := md5.New() + io.WriteString(h, dir) + + b := h.Sum(nil) + + v += int64(b[0]) + v <<= 8 + v += int64(b[1]) + v <<= 8 + v += int64(b[2]) + v <<= 8 + v += int64(b[3]) + v <<= 8 + v += int64(b[4]) + v <<= 8 + v += int64(b[5]) + v <<= 8 + v += int64(b[6]) + v <<= 8 + v += int64(b[7]) + + return +} + +func HashToInt32(data []byte) (v int32) { + h := md5.New() + h.Write(data) + + b := h.Sum(nil) + + v += int32(b[0]) + v <<= 8 + v += int32(b[1]) + v <<= 8 + v += int32(b[2]) + v <<= 8 + v += int32(b[3]) + + return +} + +func Md5(data []byte) string { + hash := md5.New() + hash.Write(data) + return fmt.Sprintf("%x", hash.Sum(nil)) +} diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go new file mode 100644 index 000000000..17b64fb6c --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache.go @@ -0,0 +1,128 @@ +package chunk_cache + +import ( + "sync" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +const ( + memCacheSizeLimit = 1024 * 1024 + onDiskCacheSizeLimit0 = memCacheSizeLimit + onDiskCacheSizeLimit1 = 4 * memCacheSizeLimit +) + +// a global cache for recently accessed file chunks +type ChunkCache struct { + memCache *ChunkCacheInMemory + diskCaches []*OnDiskCacheLayer + sync.RWMutex +} + +func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64) *ChunkCache { + + c := &ChunkCache{ + memCache: NewChunkCacheInMemory(maxEntries), + } + c.diskCaches = make([]*OnDiskCacheLayer, 3) + c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_1", diskSizeMB/4, 4) + c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_4", diskSizeMB/4, 4) + c.diskCaches[2] = NewOnDiskCacheLayer(dir, "cache", diskSizeMB/2, 4) + + return c +} + +func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) { + if c == nil { + return + } + + c.RLock() + defer c.RUnlock() + + return c.doGetChunk(fileId, chunkSize) +} + +func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) { + + if chunkSize < memCacheSizeLimit { + data = c.memCache.GetChunk(fileId) + if len(data) >= int(chunkSize) { + return data + } + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) + return nil + } + + if chunkSize < onDiskCacheSizeLimit0 { + data = c.diskCaches[0].getChunk(fid.Key) + if len(data) >= int(chunkSize) { + return data + } + } + if chunkSize < onDiskCacheSizeLimit1 { + data = c.diskCaches[1].getChunk(fid.Key) + if len(data) >= 
int(chunkSize) { + return data + } + } + { + data = c.diskCaches[2].getChunk(fid.Key) + if len(data) >= int(chunkSize) { + return data + } + } + + return nil + +} + +func (c *ChunkCache) SetChunk(fileId string, data []byte) { + if c == nil { + return + } + c.Lock() + defer c.Unlock() + + glog.V(4).Infof("SetChunk %s size %d\n", fileId, len(data)) + + c.doSetChunk(fileId, data) +} + +func (c *ChunkCache) doSetChunk(fileId string, data []byte) { + + if len(data) < memCacheSizeLimit { + c.memCache.SetChunk(fileId, data) + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) + return + } + + if len(data) < onDiskCacheSizeLimit0 { + c.diskCaches[0].setChunk(fid.Key, data) + } else if len(data) < onDiskCacheSizeLimit1 { + c.diskCaches[1].setChunk(fid.Key, data) + } else { + c.diskCaches[2].setChunk(fid.Key, data) + } + +} + +func (c *ChunkCache) Shutdown() { + if c == nil { + return + } + c.Lock() + defer c.Unlock() + for _, diskCache := range c.diskCaches { + diskCache.shutdown() + } +} diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go new file mode 100644 index 000000000..931e45e9a --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_in_memory.go @@ -0,0 +1,36 @@ +package chunk_cache + +import ( + "time" + + "github.com/karlseguin/ccache" +) + +// a global cache for recently accessed file chunks +type ChunkCacheInMemory struct { + cache *ccache.Cache +} + +func NewChunkCacheInMemory(maxEntries int64) *ChunkCacheInMemory { + pruneCount := maxEntries >> 3 + if pruneCount <= 0 { + pruneCount = 500 + } + return &ChunkCacheInMemory{ + cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))), + } +} + +func (c *ChunkCacheInMemory) GetChunk(fileId string) []byte { + item := c.cache.Get(fileId) + if item == nil { + return nil + } + data := item.Value().([]byte) + item.Extend(time.Hour) + return data +} + +func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) { + c.cache.Set(fileId, data, time.Hour) +} diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go new file mode 100644 index 000000000..d74f87b0c --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -0,0 +1,145 @@ +package chunk_cache + +import ( + "fmt" + "os" + "time" + + "github.com/syndtr/goleveldb/leveldb/opt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" +) + +// This implements an on disk cache +// The entries are an FIFO with a size limit + +type ChunkCacheVolume struct { + DataBackend backend.BackendStorageFile + nm storage.NeedleMapper + fileName string + smallBuffer []byte + sizeLimit int64 + lastModTime time.Time + fileSize int64 +} + +func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCacheVolume, error) { + + v := &ChunkCacheVolume{ + smallBuffer: make([]byte, types.NeedlePaddingSize), + fileName: fileName, + sizeLimit: preallocate, + } + + var err error + + if exists, canRead, canWrite, modTime, fileSize := util.CheckFile(v.fileName + ".dat"); exists { + if !canRead { + return nil, fmt.Errorf("cannot read cache file %s.dat", v.fileName) + } + if !canWrite { + return nil, fmt.Errorf("cannot write cache file %s.dat", v.fileName) + } + if dataFile, err := 
os.OpenFile(v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err) + } else { + v.DataBackend = backend.NewDiskFile(dataFile) + v.lastModTime = modTime + v.fileSize = fileSize + } + } else { + if v.DataBackend, err = backend.CreateVolumeFile(v.fileName+".dat", preallocate, 0); err != nil { + return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err) + } + v.lastModTime = time.Now() + } + + var indexFile *os.File + if indexFile, err = os.OpenFile(v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err) + } + + glog.V(0).Infoln("loading leveldb", v.fileName+".ldb") + opts := &opt.Options{ + BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 10, // default value is 1 + } + if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts); err != nil { + return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err) + } + + return v, nil + +} + +func (v *ChunkCacheVolume) Shutdown() { + if v.DataBackend != nil { + v.DataBackend.Close() + v.DataBackend = nil + } + if v.nm != nil { + v.nm.Close() + v.nm = nil + } +} + +func (v *ChunkCacheVolume) destroy() { + v.Shutdown() + os.Remove(v.fileName + ".dat") + os.Remove(v.fileName + ".idx") + os.RemoveAll(v.fileName + ".ldb") +} + +func (v *ChunkCacheVolume) Reset() (*ChunkCacheVolume, error) { + v.destroy() + return LoadOrCreateChunkCacheVolume(v.fileName, v.sizeLimit) +} + +func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) { + + nv, ok := v.nm.Get(key) + if !ok { + return nil, storage.ErrorNotFound + } + data := make([]byte, nv.Size) + if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToAcutalOffset()); readErr != nil { + return nil, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToAcutalOffset(), nv.Offset.ToAcutalOffset()+int64(nv.Size), readErr) + } else { + if readSize != int(nv.Size) { + return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size) + } + } + + return data, nil +} + +func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error { + + offset := v.fileSize + + written, err := v.DataBackend.WriteAt(data, offset) + if err != nil { + return err + } else if written != len(data) { + return fmt.Errorf("partial written %d, expected %d", written, len(data)) + } + + v.fileSize += int64(written) + extraSize := written % types.NeedlePaddingSize + if extraSize != 0 { + v.DataBackend.WriteAt(v.smallBuffer[:types.NeedlePaddingSize-extraSize], offset+int64(written)) + v.fileSize += int64(types.NeedlePaddingSize - extraSize) + } + + if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil { + return err + } + + return nil +} diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go new file mode 100644 index 000000000..f061f2ba2 --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go @@ -0,0 +1,59 @@ +package chunk_cache + +import ( + "bytes" + "fmt" + "io/ioutil" + "math/rand" + "os" + "testing" +) + +func TestOnDisk(t *testing.T) { + + tmpDir, _ := ioutil.TempDir("", "c") + defer os.RemoveAll(tmpDir) + + totalDiskSizeMb := int64(32) + + cache := NewChunkCache(0, tmpDir, totalDiskSizeMb) + + writeCount := 5 + type test_data struct { + data []byte + fileId string + 
size uint64 + } + testData := make([]*test_data, writeCount) + for i := 0; i < writeCount; i++ { + buff := make([]byte, 1024*1024) + rand.Read(buff) + testData[i] = &test_data{ + data: buff, + fileId: fmt.Sprintf("1,%daabbccdd", i+1), + size: uint64(len(buff)), + } + cache.SetChunk(testData[i].fileId, testData[i].data) + } + + for i := 0; i < writeCount; i++ { + data := cache.GetChunk(testData[i].fileId, testData[i].size) + if bytes.Compare(data, testData[i].data) != 0 { + t.Errorf("failed to write to and read from cache: %d", i) + } + } + + cache.Shutdown() + + cache = NewChunkCache(0, tmpDir, totalDiskSizeMb) + + for i := 0; i < writeCount; i++ { + data := cache.GetChunk(testData[i].fileId, testData[i].size) + if bytes.Compare(data, testData[i].data) != 0 { + t.Errorf("failed to write to and read from cache: %d", i) + } + } + + cache.Shutdown() + +} diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go new file mode 100644 index 000000000..c3192b548 --- /dev/null +++ b/weed/util/chunk_cache/on_disk_cache_layer.go @@ -0,0 +1,91 @@ +package chunk_cache + +import ( + "fmt" + "path" + "sort" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +type OnDiskCacheLayer struct { + diskCaches []*ChunkCacheVolume +} + +func NewOnDiskCacheLayer(dir, namePrefix string, diskSizeMB int64, segmentCount int) *OnDiskCacheLayer { + + volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000) + if volumeCount < segmentCount { + volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount) + } + + c := &OnDiskCacheLayer{} + for i := 0; i < volumeCount; i++ { + fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i)) + diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024) + if err != nil { + glog.Errorf("failed to add cache %s : %v", fileName, err) + } else { + c.diskCaches = append(c.diskCaches, diskCache) + } + } + + // keep newest cache to the front + sort.Slice(c.diskCaches, func(i, j int) bool { + return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime) + }) + + return c +} + +func (c *OnDiskCacheLayer) setChunk(needleId types.NeedleId, data []byte) { + + if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit { + t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset() + if resetErr != nil { + glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName) + return + } + for i := len(c.diskCaches) - 1; i > 0; i-- { + c.diskCaches[i] = c.diskCaches[i-1] + } + c.diskCaches[0] = t + } + + if err := c.diskCaches[0].WriteNeedle(needleId, data); err != nil { + glog.V(0).Infof("cache write %v size %d: %v", needleId, len(data), err) + } + +} + +func (c *OnDiskCacheLayer) getChunk(needleId types.NeedleId) (data []byte) { + + var err error + + for _, diskCache := range c.diskCaches { + data, err = diskCache.GetNeedle(needleId) + if err == storage.ErrorNotFound { + continue + } + if err != nil { + glog.Errorf("failed to read cache file %s id %d", diskCache.fileName, needleId) + continue + } + if len(data) != 0 { + return + } + } + + return nil + +} + +func (c *OnDiskCacheLayer) shutdown() { + + for _, diskCache := range c.diskCaches { + diskCache.Shutdown() + } + +} diff --git a/weed/util/cipher.go b/weed/util/cipher.go new file mode 100644 index 000000000..f044c2ca3 --- /dev/null +++ b/weed/util/cipher.go @@ -0,0 +1,60 @@ +package util + +import ( + "crypto/aes" + 
"crypto/cipher" + "crypto/rand" + "errors" + "io" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type CipherKey []byte + +func GenCipherKey() CipherKey { + key := make([]byte, 32) + if _, err := io.ReadFull(rand.Reader, key); err != nil { + glog.Fatalf("random key gen: %v", err) + } + return CipherKey(key) +} + +func Encrypt(plaintext []byte, key CipherKey) ([]byte, error) { + c, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + + gcm, err := cipher.NewGCM(c) + if err != nil { + return nil, err + } + + nonce := make([]byte, gcm.NonceSize()) + if _, err = io.ReadFull(rand.Reader, nonce); err != nil { + return nil, err + } + + return gcm.Seal(nonce, nonce, plaintext, nil), nil +} + +func Decrypt(ciphertext []byte, key CipherKey) ([]byte, error) { + c, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + + gcm, err := cipher.NewGCM(c) + if err != nil { + return nil, err + } + + nonceSize := gcm.NonceSize() + if len(ciphertext) < nonceSize { + return nil, errors.New("ciphertext too short") + } + + nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:] + return gcm.Open(nil, nonce, ciphertext, nil) +} diff --git a/weed/util/cipher_test.go b/weed/util/cipher_test.go new file mode 100644 index 000000000..026c96ea3 --- /dev/null +++ b/weed/util/cipher_test.go @@ -0,0 +1,17 @@ +package util + +import ( + "encoding/base64" + "testing" +) + +func TestSameAsJavaImplementation(t *testing.T) { + str := "QVVhmqg112NMT7F+G/7QPynqSln3xPIhKdFGmTVKZD6IS0noyr2Z5kXFF6fPjZ/7Hq8kRhlmLeeqZUccxyaZHezOdgkjS6d4NTdHf5IjXzk7" + cipherText, _ := base64.StdEncoding.DecodeString(str) + secretKey := []byte("256-bit key for AES 256 GCM encr") + plantext, err := Decrypt(cipherText, CipherKey(secretKey)) + if err != nil { + println(err.Error()) + } + println(string(plantext)) +} diff --git a/weed/util/compression.go b/weed/util/compression.go new file mode 100644 index 000000000..b526f47c9 --- /dev/null +++ b/weed/util/compression.go @@ -0,0 +1,126 @@ +package util + +import ( + "bytes" + "compress/flate" + "compress/gzip" + "fmt" + "io/ioutil" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/klauspost/compress/zstd" +) + +func GzipData(input []byte) ([]byte, error) { + buf := new(bytes.Buffer) + w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed) + if _, err := w.Write(input); err != nil { + glog.V(2).Infoln("error compressing data:", err) + return nil, err + } + if err := w.Close(); err != nil { + glog.V(2).Infoln("error closing compressed data:", err) + return nil, err + } + return buf.Bytes(), nil +} + +var zstdEncoder, _ = zstd.NewWriter(nil) + +func ZstdData(input []byte) ([]byte, error) { + return zstdEncoder.EncodeAll(input, nil), nil +} + +func DecompressData(input []byte) ([]byte, error) { + if IsGzippedContent(input) { + return ungzipData(input) + } + if IsZstdContent(input) { + return unzstdData(input) + } + return nil, fmt.Errorf("unsupported compression") +} + +func ungzipData(input []byte) ([]byte, error) { + buf := bytes.NewBuffer(input) + r, _ := gzip.NewReader(buf) + defer r.Close() + output, err := ioutil.ReadAll(r) + if err != nil { + glog.V(2).Infoln("error uncompressing data:", err) + } + return output, err +} + +var decoder, _ = zstd.NewReader(nil) +func unzstdData(input []byte) ([]byte, error) { + return decoder.DecodeAll(input, nil) +} + +func IsGzippedContent(data []byte) bool { + if len(data) < 2 { + return false + } + return data[0] == 31 && data[1] == 139 +} + +func IsZstdContent(data []byte) bool { + if len(data) < 4 { + 
return false
+	}
+	return data[3] == 0xFD && data[2] == 0x2F && data[1] == 0xB5 && data[0] == 0x28
+}
+
+// The default is not to compress, since compression can be done on the client side.
+func IsCompressableFileType(ext, mtype string) (shouldBeCompressed, iAmSure bool) {
+
+	// text
+	if strings.HasPrefix(mtype, "text/") {
+		return true, true
+	}
+
+	// images
+	switch ext {
+	case ".svg", ".bmp", ".wav":
+		return true, true
+	}
+	if strings.HasPrefix(mtype, "image/") {
+		return false, true
+	}
+
+	// by file name extension
+	switch ext {
+	case ".zip", ".rar", ".gz", ".bz2", ".xz", ".zst":
+		return false, true
+	case ".pdf", ".txt", ".html", ".htm", ".css", ".js", ".json":
+		return true, true
+	case ".php", ".java", ".go", ".rb", ".c", ".cpp", ".h", ".hpp":
+		return true, true
+	case ".png", ".jpg", ".jpeg":
+		return false, true
+	}
+
+	// by mime type
+	if strings.HasPrefix(mtype, "application/") {
+		if strings.HasSuffix(mtype, "zstd") {
+			return false, true
+		}
+		if strings.HasSuffix(mtype, "xml") {
+			return true, true
+		}
+		if strings.HasSuffix(mtype, "script") {
+			return true, true
+		}
+	}
+
+	if strings.HasPrefix(mtype, "audio/") {
+		switch strings.TrimPrefix(mtype, "audio/") {
+		case "wave", "wav", "x-wav", "x-pn-wav":
+			return true, true
+		}
+	}
+
+	return false, false
+}
diff --git a/weed/util/compression_test.go b/weed/util/compression_test.go
new file mode 100644
index 000000000..b515e8988
--- /dev/null
+++ b/weed/util/compression_test.go
@@ -0,0 +1,21 @@
+package util
+
+import (
+	"testing"
+
+	"golang.org/x/tools/godoc/util"
+)
+
+func TestIsGzippable(t *testing.T) {
+	buf := make([]byte, 1024)
+
+	isText := util.IsText(buf)
+
+	if isText {
+		t.Error("buf with zeros is not text")
+	}
+
+	compressed, _ := GzipData(buf)
+
+	t.Logf("compressed size %d\n", len(compressed))
+}
diff --git a/weed/util/config.go b/weed/util/config.go
index 77cab3019..7b6e92f08 100644
--- a/weed/util/config.go
+++ b/weed/util/config.go
@@ -1,10 +1,51 @@
 package util
 
+import (
+	"strings"
+
+	"github.com/spf13/viper"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+)
+
 type Configuration interface {
 	GetString(key string) string
 	GetBool(key string) bool
 	GetInt(key string) int
-	GetInt64(key string) int64
-	GetFloat64(key string) float64
 	GetStringSlice(key string) []string
+	SetDefault(key string, value interface{})
+}
+
+func LoadConfiguration(configFileName string, required bool) (loaded bool) {
+
+	// find a filer store
+	viper.SetConfigName(configFileName)     // name of config file (without extension)
+	viper.AddConfigPath(".")                // optionally look for config in the working directory
+	viper.AddConfigPath("$HOME/.seaweedfs") // call multiple times to add many search paths
+	viper.AddConfigPath("/etc/seaweedfs/")  // path to look for the config file in
+
+	glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
+
+	if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
+		glog.V(1).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
+		if required {
+			glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
+				"\n\nPlease use this command to generate the default %s.toml file\n"+
+				"    weed scaffold -config=%s -output=.\n\n\n",
+				configFileName, configFileName, configFileName)
+		} else {
+			return false
+		}
+	}
+
+	return true
+}
+
+func GetViper() *viper.Viper {
+	v := &viper.Viper{}
+	*v = *viper.GetViper()
+	v.AutomaticEnv()
+	v.SetEnvPrefix("weed")
+	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
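+	// with the "weed" prefix and the "." -> "_" replacer above, a config
+	// key such as "leveldb2.enabled" (a hypothetical example) can be
+	// overridden by setting the environment variable WEED_LEVELDB2_ENABLED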
return v } diff --git a/weed/util/constants.go b/weed/util/constants.go index 9ddf07261..3433c550b 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -1,5 +1,14 @@ package util -const ( - VERSION = "1.23" +import ( + "fmt" ) + +var ( + VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 84) + COMMIT = "" +) + +func Version() string { + return VERSION + " " + COMMIT +} diff --git a/weed/util/constants_4bytes.go b/weed/util/constants_4bytes.go new file mode 100644 index 000000000..a29d9d3b0 --- /dev/null +++ b/weed/util/constants_4bytes.go @@ -0,0 +1,8 @@ +// +build !5BytesOffset + +package util + +const ( + sizeLimit = "30GB" + VolumeSizeLimitGB = 30 +) diff --git a/weed/util/constants_5bytes.go b/weed/util/constants_5bytes.go new file mode 100644 index 000000000..91ce4066f --- /dev/null +++ b/weed/util/constants_5bytes.go @@ -0,0 +1,8 @@ +// +build 5BytesOffset + +package util + +const ( + sizeLimit = "8000GB" + VolumeSizeLimitGB = 8000 +) diff --git a/weed/util/file_util.go b/weed/util/file_util.go index 8ff2978ba..ff725830b 100644 --- a/weed/util/file_util.go +++ b/weed/util/file_util.go @@ -3,6 +3,7 @@ package util import ( "errors" "os" + "time" "github.com/chrislusf/seaweedfs/weed/glog" ) @@ -30,3 +31,35 @@ func GetFileSize(file *os.File) (size int64, err error) { } return } + +func FileExists(filename string) bool { + + _, err := os.Stat(filename) + if os.IsNotExist(err) { + return false + } + return true + +} + +func CheckFile(filename string) (exists, canRead, canWrite bool, modTime time.Time, fileSize int64) { + exists = true + fi, err := os.Stat(filename) + if os.IsNotExist(err) { + exists = false + return + } + if err != nil { + glog.Errorf("check %s: %v", filename, err) + return + } + if fi.Mode()&0400 != 0 { + canRead = true + } + if fi.Mode()&0200 != 0 { + canWrite = true + } + modTime = fi.ModTime() + fileSize = fi.Size() + return +} diff --git a/weed/util/file_util_non_posix.go b/weed/util/file_util_non_posix.go new file mode 100644 index 000000000..ffcfef6d5 --- /dev/null +++ b/weed/util/file_util_non_posix.go @@ -0,0 +1,12 @@ +// +build linux darwin freebsd netbsd openbsd plan9 solaris zos + +package util + +import ( + "os" + "syscall" +) + +func GetFileUidGid(fi os.FileInfo) (uid, gid uint32) { + return fi.Sys().(*syscall.Stat_t).Uid, fi.Sys().(*syscall.Stat_t).Gid +} diff --git a/weed/util/file_util_posix.go b/weed/util/file_util_posix.go new file mode 100644 index 000000000..22ca60b3b --- /dev/null +++ b/weed/util/file_util_posix.go @@ -0,0 +1,11 @@ +// +build windows + +package util + +import ( + "os" +) + +func GetFileUidGid(fi os.FileInfo) (uid, gid uint32) { + return 0, 0 +} diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go new file mode 100644 index 000000000..4ce8a2f90 --- /dev/null +++ b/weed/util/fullpath.go @@ -0,0 +1,56 @@ +package util + +import ( + "path/filepath" + "strings" +) + +type FullPath string + +func NewFullPath(dir, name string) FullPath { + return FullPath(dir).Child(name) +} + +func (fp FullPath) DirAndName() (string, string) { + dir, name := filepath.Split(string(fp)) + if dir == "/" { + return dir, name + } + if len(dir) < 1 { + return "/", "" + } + return dir[:len(dir)-1], name +} + +func (fp FullPath) Name() string { + _, name := filepath.Split(string(fp)) + return name +} + +func (fp FullPath) Child(name string) FullPath { + dir := string(fp) + if strings.HasSuffix(dir, "/") { + return FullPath(dir + name) + } + return FullPath(dir + "/" + name) +} + +func (fp FullPath) AsInode() uint64 { + return 
uint64(HashStringToLong(string(fp))) +} + +// split, but skipping the root +func (fp FullPath) Split() []string { + if fp == "" || fp == "/" { + return []string{} + } + return strings.Split(string(fp)[1:], "/") +} + +func Join(names ...string) string { + return filepath.ToSlash(filepath.Join(names...)) +} + +func JoinPath(names ...string) FullPath { + return FullPath(Join(names...)) +} diff --git a/weed/util/pprof.go b/weed/util/grace/pprof.go index a2621ceee..14686bfc8 100644 --- a/weed/util/pprof.go +++ b/weed/util/grace/pprof.go @@ -1,4 +1,4 @@ -package util +package grace import ( "os" diff --git a/weed/util/signal_handling.go b/weed/util/grace/signal_handling.go index 99447e8be..7cca46764 100644 --- a/weed/util/signal_handling.go +++ b/weed/util/grace/signal_handling.go @@ -1,6 +1,6 @@ // +build !plan9 -package util +package grace import ( "os" diff --git a/weed/util/signal_handling_notsupported.go b/weed/util/grace/signal_handling_notsupported.go index c389cfb7e..5335915a1 100644 --- a/weed/util/signal_handling_notsupported.go +++ b/weed/util/grace/signal_handling_notsupported.go @@ -1,6 +1,6 @@ // +build plan9 -package util +package grace func OnInterrupt(fn func()) { } diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go deleted file mode 100644 index d029d21ae..000000000 --- a/weed/util/grpc_client_server.go +++ /dev/null @@ -1,87 +0,0 @@ -package util - -import ( - "fmt" - "strconv" - "strings" - "sync" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" -) - -var ( - // cache grpc connections - grpcClients = make(map[string]*grpc.ClientConn) - grpcClientsLock sync.Mutex -) - -func NewGrpcServer() *grpc.Server { - return grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{ - Time: 10 * time.Second, // wait time before ping if no activity - Timeout: 20 * time.Second, // ping timeout - }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: 60 * time.Second, // min time a client should wait before sending a ping - })) -} - -func GrpcDial(address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - // opts = append(opts, grpc.WithBlock()) - // opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second))) - opts = append(opts, grpc.WithInsecure()) - opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 30 * time.Second, // client ping server if no activity for this long - Timeout: 20 * time.Second, - })) - - return grpc.Dial(address, opts...) -} - -func WithCachedGrpcClient(fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { - - grpcClientsLock.Lock() - - existingConnection, found := grpcClients[address] - if found { - grpcClientsLock.Unlock() - return fn(existingConnection) - } - - grpcConnection, err := GrpcDial(address, opts...) 
- if err != nil { - grpcClientsLock.Unlock() - return fmt.Errorf("fail to dial %s: %v", address, err) - } - - grpcClients[address] = grpcConnection - grpcClientsLock.Unlock() - - err = fn(grpcConnection) - if err != nil { - grpcClientsLock.Lock() - delete(grpcClients, address) - grpcClientsLock.Unlock() - } - - return err -} - -func ParseServerToGrpcAddress(server string, optionalGrpcPort int) (serverGrpcAddress string, err error) { - hostnameAndPort := strings.Split(server, ":") - if len(hostnameAndPort) != 2 { - return "", fmt.Errorf("The server should have hostname:port format: %v", hostnameAndPort) - } - - filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) - if parseErr != nil { - return "", fmt.Errorf("The server port parse error: %v", parseErr) - } - - filerGrpcPort := int(filerPort) + 10000 - if optionalGrpcPort != 0 { - filerGrpcPort = optionalGrpcPort - } - - return fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort), nil -} diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 21e0a678d..c67eb3276 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -11,9 +11,8 @@ import ( "net/http" "net/url" "strings" - "time" - "github.com/chrislusf/seaweedfs/weed/security" + "github.com/chrislusf/seaweedfs/weed/glog" ) var ( @@ -27,23 +26,22 @@ func init() { } client = &http.Client{ Transport: Transport, - Timeout: 5 * time.Second, } } func PostBytes(url string, body []byte) ([]byte, error) { - r, err := client.Post(url, "application/octet-stream", bytes.NewReader(body)) + r, err := client.Post(url, "", bytes.NewReader(body)) if err != nil { return nil, fmt.Errorf("Post to %s: %v", url, err) } defer r.Body.Close() - if r.StatusCode >= 400 { - return nil, fmt.Errorf("%s: %s", url, r.Status) - } b, err := ioutil.ReadAll(r.Body) if err != nil { return nil, fmt.Errorf("Read response body: %v", err) } + if r.StatusCode >= 400 { + return nil, fmt.Errorf("%s: %s", url, r.Status) + } return b, nil } @@ -90,14 +88,14 @@ func Head(url string) (http.Header, error) { if err != nil { return nil, err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode >= 400 { return nil, fmt.Errorf("%s: %s", url, r.Status) } return r.Header, nil } -func Delete(url string, jwt security.EncodedJwt) error { +func Delete(url string, jwt string) error { req, err := http.NewRequest("DELETE", url, nil) if jwt != "" { req.Header.Set("Authorization", "BEARER "+string(jwt)) @@ -119,7 +117,7 @@ func Delete(url string, jwt security.EncodedJwt) error { return nil } m := make(map[string]interface{}) - if e := json.Unmarshal(body, m); e == nil { + if e := json.Unmarshal(body, &m); e == nil { if s, ok := m["error"].(string); ok { return errors.New(s) } @@ -132,7 +130,7 @@ func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachB if err != nil { return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode != 200 { return fmt.Errorf("%s: %s", url, r.Status) } @@ -155,7 +153,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e if err != nil { return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode != 200 { return fmt.Errorf("%s: %s", url, r.Status) } @@ -191,11 +189,22 @@ func NormalizeUrl(url string) string { return "http://" + url } -func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange bool) (n int64, e error) { +func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) { + + if 
cipherKey != nil { + var n int + err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { + n = copy(buf, data) + }) + return int64(n), err + } - req, _ := http.NewRequest("GET", fileUrl, nil) - if isReadRange { - req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size))) + req, err := http.NewRequest("GET", fileUrl, nil) + if err != nil { + return 0, err + } + if !isFullChunk { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) } else { req.Header.Set("Accept-Encoding", "gzip") } @@ -211,7 +220,8 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo } var reader io.ReadCloser - switch r.Header.Get("Content-Encoding") { + contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { case "gzip": reader, err = gzip.NewReader(r.Body) defer reader.Close() @@ -219,55 +229,125 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo reader = r.Body } - var i, m int + var ( + i, m int + n int64 + ) + // refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199 + // commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318 for { m, err = reader.Read(buf[i:]) - if m == 0 { - return - } i += m n += int64(m) if err == io.EOF { return n, nil } - if e != nil { - return n, e + if err != nil { + return n, err + } + if n == int64(len(buf)) { + break } } - + // drains the response body to avoid memory leak + data, _ := ioutil.ReadAll(reader) + if len(data) != 0 { + glog.V(1).Infof("%s reader has remaining %d bytes", contentEncoding, len(data)) + } + return n, err } -func ReadUrlAsStream(fileUrl string, offset int64, size int, fn func(data []byte)) (n int64, e error) { +func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error { + + if cipherKey != nil { + return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) + } + + req, err := http.NewRequest("GET", fileUrl, nil) + if err != nil { + return err + } - req, _ := http.NewRequest("GET", fileUrl, nil) - req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size))) + if !isFullChunk { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) + } r, err := client.Do(req) if err != nil { - return 0, err + return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode >= 400 { - return 0, fmt.Errorf("%s: %s", fileUrl, r.Status) + return fmt.Errorf("%s: %s", fileUrl, r.Status) } - var m int + var ( + m int + ) buf := make([]byte, 64*1024) for { m, err = r.Body.Read(buf) - if m == 0 { - return - } fn(buf[:m]) - n += int64(m) if err == io.EOF { - return n, nil + return nil + } + if err != nil { + return err } - if e != nil { - return n, e + } + +} + +func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error { + encryptedData, err := Get(fileUrl) + if err != nil { + return fmt.Errorf("fetch %s: %v", fileUrl, err) + } + decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey)) + if err != nil { + return fmt.Errorf("decrypt %s: %v", fileUrl, err) + } + if isContentCompressed { + decryptedData, err = DecompressData(decryptedData) + if err != nil { + return fmt.Errorf("unzip decrypt %s: %v", fileUrl, err) } } + if len(decryptedData) < int(offset)+size { + return fmt.Errorf("read decrypted %s size 
%d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
+	}
+	if isFullChunk {
+		fn(decryptedData)
+	} else {
+		fn(decryptedData[int(offset) : int(offset)+size])
+	}
+	return nil
+}
+
+func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
+
+	req, err := http.NewRequest("GET", fileUrl, nil)
+	if err != nil {
+		return nil, err
+	}
+	if rangeHeader != "" {
+		req.Header.Add("Range", rangeHeader)
+	}
+
+	r, err := client.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	if r.StatusCode >= 400 {
+		return nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
+	}
+
+	return r.Body, nil
+}
+func CloseResponse(resp *http.Response) {
+	io.Copy(ioutil.Discard, resp.Body)
+	resp.Body.Close()
+}
diff --git a/weed/util/httpdown/http_down.go b/weed/util/httpdown/http_down.go
new file mode 100644
index 000000000..5cbd9611c
--- /dev/null
+++ b/weed/util/httpdown/http_down.go
@@ -0,0 +1,395 @@
+// Package httpdown provides http.ConnState enabled graceful termination of
+// http.Server.
+// Based on github.com/facebookarchive/httpdown, whose licence is MIT;
+// we add support for serving HTTP over TLS.
+package httpdown
+
+import (
+	"crypto/tls"
+	"fmt"
+	"net"
+	"net/http"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/facebookgo/clock"
+	"github.com/facebookgo/stats"
+)
+
+const (
+	defaultStopTimeout = time.Minute
+	defaultKillTimeout = time.Minute
+)
+
+// A Server encapsulates the process of accepting new connections and
+// serving them, and gracefully shutting down the listener without dropping
+// active connections.
+type Server interface {
+	// Wait waits for the serving loop to finish. This will happen when Stop is
+	// called, at which point it returns no error, or if there is an error in the
+	// serving loop. You must call Wait after calling Serve or ListenAndServe.
+	Wait() error
+
+	// Stop stops the listener. It will block until all connections have been
+	// closed.
+	Stop() error
+}
+
+// HTTP defines the configuration for serving an http.Server. Multiple calls to
+// Serve or ListenAndServe can be made on the same HTTP instance. The default
+// timeouts of 1 minute each result in a maximum of 2 minutes before a Stop()
+// returns.
+type HTTP struct {
+	// StopTimeout is the duration before we begin force closing connections.
+	// Defaults to 1 minute.
+	StopTimeout time.Duration
+
+	// KillTimeout is the duration before which we completely give up and abort
+	// even though we still have connected clients. This is useful when a large
+	// number of client connections exist and closing them can take a long time.
+	// Note, this is in addition to the StopTimeout. Defaults to 1 minute.
+	KillTimeout time.Duration
+
+	// Stats is optional. If provided, it will be used to record various metrics.
+	Stats stats.Client
+
+	// Clock allows for testing timing related functionality. Do not specify this
+	// in production code.
+	Clock clock.Clock
+
+	// When CertFile and KeyFile are set, httpdown will serve HTTP over TLS.
+	// Files containing a certificate and matching private key for the
+	// server must be provided if neither the Server's
+	// TLSConfig.Certificates nor TLSConfig.GetCertificate are populated.
+	// If the certificate is signed by a certificate authority, the
+	// certFile should be the concatenation of the server's certificate,
+	// any intermediates, and the CA's certificate.
+	CertFile, KeyFile string
+}
+
+// Serve provides the low-level API which is useful if you're creating your own
+// net.Listener.
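+//
+// A minimal usage sketch (hypothetical handler variable; error handling
+// elided, and Wait must be called after Serve):
+//
+//	l, _ := net.Listen("tcp", ":8080")
+//	server := httpdown.HTTP{}.Serve(&http.Server{Handler: handler}, l)
+//	// ... when shutting down:
+//	_ = server.Stop()
+//	_ = server.Wait()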
+func (h HTTP) Serve(s *http.Server, l net.Listener) Server { + stopTimeout := h.StopTimeout + if stopTimeout == 0 { + stopTimeout = defaultStopTimeout + } + killTimeout := h.KillTimeout + if killTimeout == 0 { + killTimeout = defaultKillTimeout + } + klock := h.Clock + if klock == nil { + klock = clock.New() + } + + ss := &server{ + stopTimeout: stopTimeout, + killTimeout: killTimeout, + stats: h.Stats, + clock: klock, + oldConnState: s.ConnState, + listener: l, + server: s, + serveDone: make(chan struct{}), + serveErr: make(chan error, 1), + new: make(chan net.Conn), + active: make(chan net.Conn), + idle: make(chan net.Conn), + closed: make(chan net.Conn), + stop: make(chan chan struct{}), + kill: make(chan chan struct{}), + certFile: h.CertFile, + keyFile: h.KeyFile, + } + s.ConnState = ss.connState + go ss.manage() + go ss.serve() + return ss +} + +// ListenAndServe returns a Server for the given http.Server. It is equivalent +// to ListenAndServe from the standard library, but returns immediately. +// Requests will be accepted in a background goroutine. If the http.Server has +// a non-nil TLSConfig, a TLS enabled listener will be setup. +func (h HTTP) ListenAndServe(s *http.Server) (Server, error) { + addr := s.Addr + if addr == "" { + if s.TLSConfig == nil { + addr = ":http" + } else { + addr = ":https" + } + } + l, err := net.Listen("tcp", addr) + if err != nil { + stats.BumpSum(h.Stats, "listen.error", 1) + return nil, err + } + if s.TLSConfig != nil { + l = tls.NewListener(l, s.TLSConfig) + } + return h.Serve(s, l), nil +} + +// server manages the serving process and allows for gracefully stopping it. +type server struct { + stopTimeout time.Duration + killTimeout time.Duration + stats stats.Client + clock clock.Clock + + oldConnState func(net.Conn, http.ConnState) + server *http.Server + serveDone chan struct{} + serveErr chan error + listener net.Listener + + new chan net.Conn + active chan net.Conn + idle chan net.Conn + closed chan net.Conn + stop chan chan struct{} + kill chan chan struct{} + + stopOnce sync.Once + stopErr error + + certFile, keyFile string +} + +func (s *server) connState(c net.Conn, cs http.ConnState) { + if s.oldConnState != nil { + s.oldConnState(c, cs) + } + + switch cs { + case http.StateNew: + s.new <- c + case http.StateActive: + s.active <- c + case http.StateIdle: + s.idle <- c + case http.StateHijacked, http.StateClosed: + s.closed <- c + } +} + +func (s *server) manage() { + defer func() { + close(s.new) + close(s.active) + close(s.idle) + close(s.closed) + close(s.stop) + close(s.kill) + }() + + var stopDone chan struct{} + + conns := map[net.Conn]http.ConnState{} + var countNew, countActive, countIdle float64 + + // decConn decrements the count associated with the current state of the + // given connection. + decConn := func(c net.Conn) { + switch conns[c] { + default: + panic(fmt.Errorf("unknown existing connection: %s", c)) + case http.StateNew: + countNew-- + case http.StateActive: + countActive-- + case http.StateIdle: + countIdle-- + } + } + + // setup a ticker to report various values every minute. if we don't have a + // Stats implementation provided, we Stop it so it never ticks. 
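+	// (a stopped ticker's channel is never sent on again, so the stats
+	// case in the select below simply never fires)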
+ statsTicker := s.clock.Ticker(time.Minute) + if s.stats == nil { + statsTicker.Stop() + } + + for { + select { + case <-statsTicker.C: + // we'll only get here when s.stats is not nil + s.stats.BumpAvg("http-state.new", countNew) + s.stats.BumpAvg("http-state.active", countActive) + s.stats.BumpAvg("http-state.idle", countIdle) + s.stats.BumpAvg("http-state.total", countNew+countActive+countIdle) + case c := <-s.new: + conns[c] = http.StateNew + countNew++ + case c := <-s.active: + decConn(c) + countActive++ + + conns[c] = http.StateActive + case c := <-s.idle: + decConn(c) + countIdle++ + + conns[c] = http.StateIdle + + // if we're already stopping, close it + if stopDone != nil { + c.Close() + } + case c := <-s.closed: + stats.BumpSum(s.stats, "conn.closed", 1) + decConn(c) + delete(conns, c) + + // if we're waiting to stop and are all empty, we just closed the last + // connection and we're done. + if stopDone != nil && len(conns) == 0 { + close(stopDone) + return + } + case stopDone = <-s.stop: + // if we're already all empty, we're already done + if len(conns) == 0 { + close(stopDone) + return + } + + // close current idle connections right away + for c, cs := range conns { + if cs == http.StateIdle { + c.Close() + } + } + + // continue the loop and wait for all the ConnState updates which will + // eventually close(stopDone) and return from this goroutine. + + case killDone := <-s.kill: + // force close all connections + stats.BumpSum(s.stats, "kill.conn.count", float64(len(conns))) + for c := range conns { + c.Close() + } + + // don't block the kill. + close(killDone) + + // continue the loop and we wait for all the ConnState updates and will + // return from this goroutine when we're all done. otherwise we'll try to + // send those ConnState updates on closed channels. 
+
+		}
+	}
+}
+
+func (s *server) serve() {
+	stats.BumpSum(s.stats, "serve", 1)
+	if s.certFile == "" && s.keyFile == "" {
+		s.serveErr <- s.server.Serve(s.listener)
+	} else {
+		s.serveErr <- s.server.ServeTLS(s.listener, s.certFile, s.keyFile)
+	}
+	close(s.serveDone)
+	close(s.serveErr)
+}
+
+func (s *server) Wait() error {
+	if err := <-s.serveErr; !isUseOfClosedError(err) {
+		return err
+	}
+	return nil
+}
+
+func (s *server) Stop() error {
+	s.stopOnce.Do(func() {
+		defer stats.BumpTime(s.stats, "stop.time").End()
+		stats.BumpSum(s.stats, "stop", 1)
+
+		// first disable keep-alive for new connections
+		s.server.SetKeepAlivesEnabled(false)
+
+		// then close the listener so new connections can't come through
+		closeErr := s.listener.Close()
+		<-s.serveDone
+
+		// then trigger the background goroutine to stop and wait for it
+		stopDone := make(chan struct{})
+		s.stop <- stopDone
+
+		// wait for stop
+		select {
+		case <-stopDone:
+		case <-s.clock.After(s.stopTimeout):
+			defer stats.BumpTime(s.stats, "kill.time").End()
+			stats.BumpSum(s.stats, "kill", 1)
+
+			// stop timed out, wait for kill
+			killDone := make(chan struct{})
+			s.kill <- killDone
+			select {
+			case <-killDone:
+			case <-s.clock.After(s.killTimeout):
+				// kill timed out, give up
+				stats.BumpSum(s.stats, "kill.timeout", 1)
+			}
+		}
+
+		if closeErr != nil && !isUseOfClosedError(closeErr) {
+			stats.BumpSum(s.stats, "listener.close.error", 1)
+			s.stopErr = closeErr
+		}
+	})
+	return s.stopErr
+}
+
+func isUseOfClosedError(err error) bool {
+	if err == nil {
+		return false
+	}
+	if opErr, ok := err.(*net.OpError); ok {
+		err = opErr.Err
+	}
+	return err.Error() == "use of closed network connection"
+}
+
+// ListenAndServe is a convenience function to serve and wait for a SIGTERM
+// or SIGINT before shutting down.
+func ListenAndServe(s *http.Server, hd *HTTP) error {
+	if hd == nil {
+		hd = &HTTP{}
+	}
+	hs, err := hd.ListenAndServe(s)
+	if err != nil {
+		return err
+	}
+
+	waiterr := make(chan error, 1)
+	go func() {
+		defer close(waiterr)
+		waiterr <- hs.Wait()
+	}()
+
+	signals := make(chan os.Signal, 10)
+	signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
+
+	select {
+	case err := <-waiterr:
+		if err != nil {
+			return err
+		}
+	case <-signals:
+		signal.Stop(signals)
+		if err := hs.Stop(); err != nil {
+			return err
+		}
+		if err := <-waiterr; err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/weed/util/inits.go b/weed/util/inits.go
new file mode 100644
index 000000000..378878012
--- /dev/null
+++ b/weed/util/inits.go
@@ -0,0 +1,52 @@
+package util
+
+import (
+	"fmt"
+	"sort"
+)
+
+// HumanReadableIntsMax joins a series of ints into a compact human-readable
+// string like "1-3 5 ...", showing at most max values.
+func HumanReadableIntsMax(max int, ids ...int) string {
+	if len(ids) <= max {
+		return HumanReadableInts(ids...)
+	}
+
+	return HumanReadableInts(ids[:max]...) + " ..."
+}
+
+// HumanReadableInts joins a series of ints into a compact human-readable
+// string like "1-3 5 7-10".
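+//
+// For example (mirroring the cases in inits_test.go below):
+//
+//	HumanReadableInts(5, 1, 3)             // "1 3 5"
+//	HumanReadableInts(7, 9, 8, 1, 2, 3, 5) // "1-3 5 7-9"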
+func HumanReadableInts(ids ...int) string { + sort.Ints(ids) + + s := "" + start := 0 + last := 0 + + for i, v := range ids { + if i == 0 { + start = v + last = v + s = fmt.Sprintf("%d", v) + continue + } + + if last+1 == v { + last = v + continue + } + + if last > start { + s += fmt.Sprintf("-%d", last) + } + + s += fmt.Sprintf(" %d", v) + start = v + last = v + } + + if last != start { + s += fmt.Sprintf("-%d", last) + } + + return s +} diff --git a/weed/util/inits_test.go b/weed/util/inits_test.go new file mode 100644 index 000000000..f2c9b701f --- /dev/null +++ b/weed/util/inits_test.go @@ -0,0 +1,19 @@ +package util + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestHumanReadableIntsMax(t *testing.T) { + assert.Equal(t, "1-2 ...", HumanReadableIntsMax(2, 1, 2, 3)) + assert.Equal(t, "1 3 ...", HumanReadableIntsMax(2, 1, 3, 5)) +} + +func TestHumanReadableInts(t *testing.T) { + assert.Equal(t, "1-3", HumanReadableInts(1, 2, 3)) + assert.Equal(t, "1 3", HumanReadableInts(1, 3)) + assert.Equal(t, "1 3 5", HumanReadableInts(5, 1, 3)) + assert.Equal(t, "1-3 5", HumanReadableInts(1, 2, 3, 5)) + assert.Equal(t, "1-3 5 7-9", HumanReadableInts(7, 9, 8, 1, 2, 3, 5)) +} diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go new file mode 100644 index 000000000..b02c45b52 --- /dev/null +++ b/weed/util/log_buffer/log_buffer.go @@ -0,0 +1,278 @@ +package log_buffer + +import ( + "bytes" + "sync" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const BufferSize = 4 * 1024 * 1024 +const PreviousBufferCount = 3 + +type dataToFlush struct { + startTime time.Time + stopTime time.Time + data *bytes.Buffer +} + +type LogBuffer struct { + prevBuffers *SealedBuffers + buf []byte + idx []int + pos int + startTime time.Time + stopTime time.Time + sizeBuf []byte + flushInterval time.Duration + flushFn func(startTime, stopTime time.Time, buf []byte) + notifyFn func() + isStopping bool + flushChan chan *dataToFlush + lastTsNs int64 + sync.RWMutex +} + +func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer { + lb := &LogBuffer{ + prevBuffers: newSealedBuffers(PreviousBufferCount), + buf: make([]byte, BufferSize), + sizeBuf: make([]byte, 4), + flushInterval: flushInterval, + flushFn: flushFn, + notifyFn: notifyFn, + flushChan: make(chan *dataToFlush, 256), + } + go lb.loopFlush() + go lb.loopInterval() + return lb +} + +func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { + + m.Lock() + defer func() { + m.Unlock() + if m.notifyFn != nil { + m.notifyFn() + } + }() + + // need to put the timestamp inside the lock + ts := time.Now() + tsNs := ts.UnixNano() + if m.lastTsNs >= tsNs { + // this is unlikely to happen, but just in case + tsNs = m.lastTsNs + 1 + ts = time.Unix(0, tsNs) + } + m.lastTsNs = tsNs + logEntry := &filer_pb.LogEntry{ + TsNs: tsNs, + PartitionKeyHash: util.HashToInt32(partitionKey), + Data: data, + } + + logEntryData, _ := proto.Marshal(logEntry) + + size := len(logEntryData) + + if m.pos == 0 { + m.startTime = ts + } + + if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { + m.flushChan <- m.copyToFlush() + m.startTime = ts + if len(m.buf) < size+4 { + m.buf = make([]byte, 2*size+4) + } + } + m.stopTime = ts + + m.idx = append(m.idx, m.pos) + util.Uint32toBytes(m.sizeBuf, uint32(size)) 
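+	// each entry is framed on the buffer as a 4-byte big-endian length
+	// prefix (see Uint32toBytes in bytes.go) followed by the marshaled
+	// LogEntry bytes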
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
new file mode 100644
index 000000000..b02c45b52
--- /dev/null
+++ b/weed/util/log_buffer/log_buffer.go
@@ -0,0 +1,278 @@
+package log_buffer
+
+import (
+    "bytes"
+    "sync"
+    "time"
+
+    "github.com/golang/protobuf/proto"
+
+    "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+    "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const BufferSize = 4 * 1024 * 1024
+const PreviousBufferCount = 3
+
+type dataToFlush struct {
+    startTime time.Time
+    stopTime  time.Time
+    data      *bytes.Buffer
+}
+
+type LogBuffer struct {
+    prevBuffers   *SealedBuffers
+    buf           []byte
+    idx           []int
+    pos           int
+    startTime     time.Time
+    stopTime      time.Time
+    sizeBuf       []byte
+    flushInterval time.Duration
+    flushFn       func(startTime, stopTime time.Time, buf []byte)
+    notifyFn      func()
+    isStopping    bool
+    flushChan     chan *dataToFlush
+    lastTsNs      int64
+    sync.RWMutex
+}
+
+func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer {
+    lb := &LogBuffer{
+        prevBuffers:   newSealedBuffers(PreviousBufferCount),
+        buf:           make([]byte, BufferSize),
+        sizeBuf:       make([]byte, 4),
+        flushInterval: flushInterval,
+        flushFn:       flushFn,
+        notifyFn:      notifyFn,
+        flushChan:     make(chan *dataToFlush, 256),
+    }
+    go lb.loopFlush()
+    go lb.loopInterval()
+    return lb
+}
+
+func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
+
+    m.Lock()
+    defer func() {
+        m.Unlock()
+        if m.notifyFn != nil {
+            m.notifyFn()
+        }
+    }()
+
+    // need to put the timestamp inside the lock
+    ts := time.Now()
+    tsNs := ts.UnixNano()
+    if m.lastTsNs >= tsNs {
+        // this is unlikely to happen, but just in case
+        tsNs = m.lastTsNs + 1
+        ts = time.Unix(0, tsNs)
+    }
+    m.lastTsNs = tsNs
+    logEntry := &filer_pb.LogEntry{
+        TsNs:             tsNs,
+        PartitionKeyHash: util.HashToInt32(partitionKey),
+        Data:             data,
+    }
+
+    logEntryData, _ := proto.Marshal(logEntry)
+
+    size := len(logEntryData)
+
+    if m.pos == 0 {
+        m.startTime = ts
+    }
+
+    if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
+        m.flushChan <- m.copyToFlush()
+        m.startTime = ts
+        if len(m.buf) < size+4 {
+            m.buf = make([]byte, 2*size+4)
+        }
+    }
+    m.stopTime = ts
+
+    m.idx = append(m.idx, m.pos)
+    util.Uint32toBytes(m.sizeBuf, uint32(size))
+    copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
+    copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
+    m.pos += size + 4
+
+    // fmt.Printf("entry size %d total %d count %d, buffer:%p\n", size, m.pos, len(m.idx), m)
+
+}
+
+func (m *LogBuffer) Shutdown() {
+    m.Lock()
+    defer m.Unlock()
+
+    if m.isStopping {
+        return
+    }
+    m.isStopping = true
+    toFlush := m.copyToFlush()
+    m.flushChan <- toFlush
+    close(m.flushChan)
+}
+
+func (m *LogBuffer) loopFlush() {
+    for d := range m.flushChan {
+        if d != nil {
+            // fmt.Printf("flush [%v, %v] size %d\n", d.startTime, d.stopTime, len(d.data.Bytes()))
+            m.flushFn(d.startTime, d.stopTime, d.data.Bytes())
+            d.releaseMemory()
+        }
+    }
+}
+
+func (m *LogBuffer) loopInterval() {
+    for {
+        time.Sleep(m.flushInterval)
+        m.Lock()
+        // check isStopping under the lock, so we never send on the closed flushChan
+        if m.isStopping {
+            m.Unlock()
+            return
+        }
+        toFlush := m.copyToFlush()
+        m.flushChan <- toFlush
+        m.Unlock()
+    }
+}
+
+func (m *LogBuffer) copyToFlush() *dataToFlush {
+
+    if m.flushFn != nil && m.pos > 0 {
+        // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
+        d := &dataToFlush{
+            startTime: m.startTime,
+            stopTime:  m.stopTime,
+            data:      copiedBytes(m.buf[:m.pos]),
+        }
+        // fmt.Printf("flushing [0,%d) with %d entries\n", m.pos, len(m.idx))
+        m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos)
+        m.pos = 0
+        m.idx = m.idx[:0]
+        return d
+    }
+    return nil
+}
+
+func (d *dataToFlush) releaseMemory() {
+    d.data.Reset()
+    bufferPool.Put(d.data)
+}
+
+func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) {
+    m.RLock()
+    defer m.RUnlock()
+
+    /*
+        fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers))
+        for i, prevBuf := range m.prevBuffers.buffers {
+            fmt.Printf("  prev %d : %s\n", i, prevBuf.String())
+        }
+    */
+
+    if lastReadTime.Equal(m.stopTime) {
+        return nil
+    }
+    if lastReadTime.After(m.stopTime) {
+        // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime)
+        return nil
+    }
+    if lastReadTime.Before(m.startTime) {
+        // println("checking ", lastReadTime.UnixNano())
+        for i, buf := range m.prevBuffers.buffers {
+            if buf.startTime.After(lastReadTime) {
+                if i == 0 {
+                    // println("return the earliest in memory", buf.startTime.UnixNano())
+                    return copiedBytes(buf.buf[:buf.size])
+                }
+                // println("return the", i, "th in memory", buf.startTime.UnixNano())
+                return copiedBytes(buf.buf[:buf.size])
+            }
+            if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) {
+                pos := buf.locateByTs(lastReadTime)
+                // fmt.Printf("locate buffer[%d] pos %d\n", i, pos)
+                return copiedBytes(buf.buf[pos:buf.size])
+            }
+        }
+        // println("return the current buf", lastReadTime.UnixNano())
+        return copiedBytes(m.buf[:m.pos])
+    }
+
+    lastTs := lastReadTime.UnixNano()
+    l, h := 0, len(m.idx)-1
+
+    /*
+        for i, pos := range m.idx {
+            logEntry, ts := readTs(m.buf, pos)
+            event := &filer_pb.SubscribeMetadataResponse{}
+            proto.Unmarshal(logEntry.Data, event)
+            entry := event.EventNotification.OldEntry
+            if entry == nil {
+                entry = event.EventNotification.NewEntry
+            }
+            fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name)
+        }
+        fmt.Printf("l=%d, h=%d\n", l, h)
+    */
+
+    for l <= h {
+        mid := (l + h) / 2
+        pos := m.idx[mid]
+        _, t := readTs(m.buf, pos)
+        if t <= lastTs {
+            l = mid + 1
+        } else {
+            var prevT int64
+            if mid > 0 {
+                _, prevT = readTs(m.buf, m.idx[mid-1])
+            }
+            if prevT <= lastTs {
+                // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d)\n", l, mid-1, prevT, mid, t, h, pos, m.pos)
+                return copiedBytes(m.buf[pos:m.pos])
+            }
+            h = mid
+        }
+        // fmt.Printf("l=%d, h=%d\n", l, h)
+    }
+
+    // FIXME: this could be that the buffer has been flushed already
+    return nil
+
+}
+
+func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
+    bufferPool.Put(b)
+}
+
+var bufferPool = sync.Pool{
+    New: func() interface{} {
+        return new(bytes.Buffer)
+    },
+}
+
+func copiedBytes(buf []byte) (copied *bytes.Buffer) {
+    copied = bufferPool.Get().(*bytes.Buffer)
+    copied.Reset()
+    copied.Write(buf)
+    return
+}
+
+func readTs(buf []byte, pos int) (size int, ts int64) {
+
+    size = int(util.BytesToUint32(buf[pos : pos+4]))
+    entryData := buf[pos+4 : pos+4+size]
+    logEntry := &filer_pb.LogEntry{}
+
+    err := proto.Unmarshal(entryData, logEntry)
+    if err != nil {
+        glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err)
+    }
+    return size, logEntry.TsNs
+
+}
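One detail worth calling out in AddToBuffer: the timestamp is taken while holding the lock and forced to be strictly increasing, so no two entries can ever share a TsNs, even on a coarse clock. The same guard in isolation, as a standalone sketch (not part of the package):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // monotonicClock hands out strictly increasing nanosecond timestamps,
    // mirroring the lastTsNs guard inside LogBuffer.AddToBuffer.
    type monotonicClock struct {
        mu     sync.Mutex
        lastNs int64
    }

    func (c *monotonicClock) Next() int64 {
        c.mu.Lock()
        defer c.mu.Unlock()
        now := time.Now().UnixNano()
        if now <= c.lastNs {
            // the clock did not advance (or went backwards): force progress
            now = c.lastNs + 1
        }
        c.lastNs = now
        return now
    }

    func main() {
        var c monotonicClock
        a, b := c.Next(), c.Next()
        fmt.Println(a < b) // always true
    }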
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
new file mode 100644
index 000000000..f9ccc95c2
--- /dev/null
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -0,0 +1,42 @@
+package log_buffer
+
+import (
+    "math/rand"
+    "testing"
+    "time"
+
+    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func TestNewLogBufferFirstBuffer(t *testing.T) {
+    lb := NewLogBuffer(time.Minute, func(startTime, stopTime time.Time, buf []byte) {
+
+    }, func() {
+
+    })
+
+    startTime := time.Now()
+
+    messageSize := 1024
+    messageCount := 5000
+    var buf = make([]byte, messageSize)
+    for i := 0; i < messageCount; i++ {
+        rand.Read(buf)
+        lb.AddToBuffer(nil, buf)
+    }
+
+    receivedMessageCount := 0
+    lb.LoopProcessLogData(startTime, func() bool {
+        // stop if no more messages
+        return false
+    }, func(logEntry *filer_pb.LogEntry) error {
+        receivedMessageCount++
+        return nil
+    })
+
+    if receivedMessageCount != messageCount {
+        t.Errorf("sent %d received %d", messageCount, receivedMessageCount)
+    }
+
+}
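Entries are framed in the buffer as a 4-byte size header followed by a marshaled filer_pb.LogEntry. A hypothetical standalone decoder for one such framed slice, mirroring what readTs and the read loop below assume:

    package main

    import (
        "fmt"

        "github.com/golang/protobuf/proto"

        "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
        "github.com/chrislusf/seaweedfs/weed/util"
    )

    // decodeEntries walks a [4-byte size][LogEntry protobuf] framed buffer,
    // the same layout that AddToBuffer writes.
    func decodeEntries(buf []byte) ([]*filer_pb.LogEntry, error) {
        var entries []*filer_pb.LogEntry
        for pos := 0; pos+4 <= len(buf); {
            size := int(util.BytesToUint32(buf[pos : pos+4]))
            if pos+4+size > len(buf) {
                return entries, fmt.Errorf("truncated entry at %d", pos)
            }
            entry := &filer_pb.LogEntry{}
            if err := proto.Unmarshal(buf[pos+4:pos+4+size], entry); err != nil {
                return entries, err
            }
            entries = append(entries, entry)
            pos += 4 + size
        }
        return entries, nil
    }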
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
new file mode 100644
index 000000000..2b73a8064
--- /dev/null
+++ b/weed/util/log_buffer/log_read.go
@@ -0,0 +1,77 @@
+package log_buffer
+
+import (
+    "bytes"
+    "time"
+
+    "github.com/golang/protobuf/proto"
+
+    "github.com/chrislusf/seaweedfs/weed/glog"
+    "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+    "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (logBuffer *LogBuffer) LoopProcessLogData(
+    startReadTime time.Time,
+    waitForDataFn func() bool,
+    eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) {
+    // loop through all messages
+    var bytesBuf *bytes.Buffer
+    lastReadTime := startReadTime
+    defer func() {
+        if bytesBuf != nil {
+            logBuffer.ReleaseMemory(bytesBuf)
+        }
+    }()
+
+    for {
+
+        if bytesBuf != nil {
+            logBuffer.ReleaseMemory(bytesBuf)
+        }
+        bytesBuf = logBuffer.ReadFromBuffer(lastReadTime)
+        // fmt.Printf("ReadFromBuffer by %v\n", lastReadTime)
+        if bytesBuf == nil {
+            if waitForDataFn() {
+                continue
+            } else {
+                return
+            }
+        }
+
+        buf := bytesBuf.Bytes()
+        // fmt.Printf("ReadFromBuffer by %v size %d\n", lastReadTime, len(buf))
+
+        batchSize := 0
+        var batchStartTime time.Time
+
+        for pos := 0; pos+4 < len(buf); {
+
+            size := util.BytesToUint32(buf[pos : pos+4])
+            entryData := buf[pos+4 : pos+4+int(size)]
+
+            // fmt.Printf("read buffer read %d [%d,%d) from [0,%d)\n", batchSize, pos, pos+int(size)+4, len(buf))
+
+            logEntry := &filer_pb.LogEntry{}
+            if err = proto.Unmarshal(entryData, logEntry); err != nil {
+                glog.Errorf("unexpected unmarshal filer_pb.LogEntry: %v", err)
+                pos += 4 + int(size)
+                continue
+            }
+            lastReadTime = time.Unix(0, logEntry.TsNs)
+            if batchStartTime.IsZero() {
+                batchStartTime = lastReadTime
+            }
+
+            if err = eachLogDataFn(logEntry); err != nil {
+                return
+            }
+
+            pos += 4 + int(size)
+            batchSize++
+        }
+
+        // fmt.Printf("sent message ts[%d,%d] size %d\n", batchStartTime.UnixNano(), lastReadTime.UnixNano(), batchSize)
+    }
+
+}
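A sketch of how a caller drives LoopProcessLogData, modeled on the test above; the two callbacks are illustrative (the first decides whether to keep waiting for more data, the second handles each entry):

    package main

    import (
        "fmt"
        "time"

        "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
        "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
    )

    func main() {
        lb := log_buffer.NewLogBuffer(time.Minute,
            func(startTime, stopTime time.Time, buf []byte) { /* persist flushed bytes here */ },
            func() { /* wake up subscribers here */ })

        startTime := time.Now()
        lb.AddToBuffer([]byte("key"), []byte("hello"))
        lb.AddToBuffer([]byte("key"), []byte("world"))

        _ = lb.LoopProcessLogData(startTime,
            func() bool {
                return false // do not block: stop once the in-memory data is drained
            },
            func(entry *filer_pb.LogEntry) error {
                fmt.Printf("ts=%v data=%s\n", time.Unix(0, entry.TsNs), entry.Data)
                return nil
            })
    }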
diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go
new file mode 100644
index 000000000..d133cf8d3
--- /dev/null
+++ b/weed/util/log_buffer/sealed_buffer.go
@@ -0,0 +1,62 @@
+package log_buffer
+
+import (
+    "fmt"
+    "time"
+)
+
+type MemBuffer struct {
+    buf       []byte
+    size      int
+    startTime time.Time
+    stopTime  time.Time
+}
+
+type SealedBuffers struct {
+    buffers []*MemBuffer
+}
+
+func newSealedBuffers(size int) *SealedBuffers {
+    sbs := &SealedBuffers{}
+
+    sbs.buffers = make([]*MemBuffer, size)
+    for i := 0; i < size; i++ {
+        sbs.buffers[i] = &MemBuffer{
+            buf: make([]byte, BufferSize),
+        }
+    }
+
+    return sbs
+}
+
+func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int) (newBuf []byte) {
+    oldMemBuffer := sbs.buffers[0]
+    size := len(sbs.buffers)
+    // shift the sealed buffers left, dropping the oldest one
+    for i := 0; i < size-1; i++ {
+        sbs.buffers[i].buf = sbs.buffers[i+1].buf
+        sbs.buffers[i].size = sbs.buffers[i+1].size
+        sbs.buffers[i].startTime = sbs.buffers[i+1].startTime
+        sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime
+    }
+    sbs.buffers[size-1].buf = buf
+    sbs.buffers[size-1].size = pos
+    sbs.buffers[size-1].startTime = startTime
+    sbs.buffers[size-1].stopTime = stopTime
+    // recycle the oldest buffer's backing array as the new active buffer
+    return oldMemBuffer.buf
+}
+
+func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) {
+    lastReadTs := lastReadTime.UnixNano()
+    // scan only the written portion (mb.size), not the full capacity of mb.buf
+    for pos < mb.size {
+        size, t := readTs(mb.buf, pos)
+        if t > lastReadTs {
+            return
+        }
+        pos += size + 4
+    }
+    return mb.size
+}
+
+func (mb *MemBuffer) String() string {
+    return fmt.Sprintf("[%v,%v] bytes:%d", mb.startTime, mb.stopTime, mb.size)
+}
diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go
index b8068e67f..f057a8f5b 100644
--- a/weed/util/net_timeout.go
+++ b/weed/util/net_timeout.go
@@ -35,6 +35,7 @@ type Conn struct {
     net.Conn
     ReadTimeout  time.Duration
     WriteTimeout time.Duration
+    isClosed     bool
 }
 
 func (c *Conn) Read(b []byte) (count int, e error) {
@@ -68,7 +69,10 @@ func (c *Conn) Write(b []byte) (count int, e error) {
 func (c *Conn) Close() error {
     err := c.Conn.Close()
     if err == nil {
-        stats.ConnectionClose()
+        if !c.isClosed {
+            stats.ConnectionClose()
+            c.isClosed = true
+        }
     }
     return err
 }
diff --git a/weed/util/network.go b/weed/util/network.go
new file mode 100644
index 000000000..7108cfea6
--- /dev/null
+++ b/weed/util/network.go
@@ -0,0 +1,25 @@
+package util
+
+import (
+    "net"
+
+    "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func DetectedHostAddress() string {
+    addrs, err := net.InterfaceAddrs()
+    if err != nil {
+        glog.V(0).Infof("failed to detect ip address: %v", err)
+        return ""
+    }
+
+    for _, a := range addrs {
+        if ipnet, ok := a.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+            if ipnet.IP.To4() != nil {
+                return ipnet.IP.String()
+            }
+        }
+    }
+
+    return "localhost"
+}
diff --git a/weed/util/parse.go b/weed/util/parse.go
index 0a8317c19..0955db682 100644
--- a/weed/util/parse.go
+++ b/weed/util/parse.go
@@ -1,7 +1,10 @@
 package util
 
 import (
+    "fmt"
+    "net/url"
     "strconv"
+    "strings"
 )
 
 func ParseInt(text string, defaultValue int) int {
@@ -24,3 +27,37 @@ func ParseUint64(text string, defaultValue uint64) uint64 {
     }
     return count
 }
+
+func ParseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) {
+    if !strings.HasPrefix(entryPath, "http://") && !strings.HasPrefix(entryPath, "https://") {
+        entryPath = "http://" + entryPath
+    }
+
+    var u *url.URL
+    u, err = url.Parse(entryPath)
+    if err != nil {
+        return
+    }
+    filerServer = u.Hostname()
+    portString := u.Port()
+    if portString != "" {
+        filerPort, err = strconv.ParseInt(portString, 10, 32)
+    }
+    path = u.Path
+    return
+}
+
+func ParseHostPort(hostPort string) (filerServer string, filerPort int64, err error) {
+    parts := strings.Split(hostPort, ":")
+    if len(parts) != 2 {
+        err = fmt.Errorf("failed to parse %s", hostPort)
+        return
+    }
+
+    filerPort, err = strconv.ParseInt(parts[1], 10, 64)
+    if err == nil {
+        filerServer = parts[0]
+    }
+
+    return
+}
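Usage of the two new parse helpers, as a sketch (the hosts and paths are made up):

    package main

    import (
        "fmt"

        "github.com/chrislusf/seaweedfs/weed/util"
    )

    func main() {
        // the scheme is optional: "http://" is assumed when it is missing
        server, port, path, err := util.ParseFilerUrl("localhost:8888/some/dir")
        fmt.Println(server, port, path, err) // localhost 8888 /some/dir <nil>

        // ParseHostPort accepts only the exact host:port form
        host, hostPort, err := util.ParseHostPort("10.0.0.1:9333")
        fmt.Println(host, hostPort, err) // 10.0.0.1 9333 <nil>
    }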
diff --git a/weed/util/queue.go b/weed/util/queue.go
new file mode 100644
index 000000000..1e6211e0d
--- /dev/null
+++ b/weed/util/queue.go
@@ -0,0 +1,61 @@
+package util
+
+import "sync"
+
+type node struct {
+    data interface{}
+    next *node
+}
+
+type Queue struct {
+    head  *node
+    tail  *node
+    count int
+    sync.RWMutex
+}
+
+func NewQueue() *Queue {
+    q := &Queue{}
+    return q
+}
+
+func (q *Queue) Len() int {
+    q.RLock()
+    defer q.RUnlock()
+    return q.count
+}
+
+func (q *Queue) Enqueue(item interface{}) {
+    q.Lock()
+    defer q.Unlock()
+
+    n := &node{data: item}
+
+    if q.tail == nil {
+        q.tail = n
+        q.head = n
+    } else {
+        q.tail.next = n
+        q.tail = n
+    }
+    q.count++
+}
+
+func (q *Queue) Dequeue() interface{} {
+    q.Lock()
+    defer q.Unlock()
+
+    if q.head == nil {
+        return nil
+    }
+
+    n := q.head
+    q.head = n.next
+
+    if q.head == nil {
+        q.tail = nil
+    }
+    q.count--
+
+    return n.data
+}
diff --git a/weed/util/queue_unbounded.go b/weed/util/queue_unbounded.go
new file mode 100644
index 000000000..496b9f844
--- /dev/null
+++ b/weed/util/queue_unbounded.go
@@ -0,0 +1,45 @@
+package util
+
+import "sync"
+
+type UnboundedQueue struct {
+    outbound     []string
+    outboundLock sync.RWMutex
+    inbound      []string
+    inboundLock  sync.RWMutex
+}
+
+func NewUnboundedQueue() *UnboundedQueue {
+    q := &UnboundedQueue{}
+    return q
+}
+
+func (q *UnboundedQueue) EnQueue(items ...string) {
+    q.inboundLock.Lock()
+    defer q.inboundLock.Unlock()
+
+    q.inbound = append(q.inbound, items...)
+
+}
+
+func (q *UnboundedQueue) Consume(fn func([]string)) {
+    q.outboundLock.Lock()
+    defer q.outboundLock.Unlock()
+
+    if len(q.outbound) == 0 {
+        q.inboundLock.Lock()
+        inboundLen := len(q.inbound)
+        if inboundLen > 0 {
+            // swap the two slices so producers can keep appending to a fresh inbound buffer
+            q.inbound, q.outbound = q.outbound, q.inbound
+        }
+        q.inboundLock.Unlock()
+    }
+
+    if len(q.outbound) > 0 {
+        fn(q.outbound)
+        q.outbound = q.outbound[:0]
+    }
+
+}
diff --git a/weed/util/queue_unbounded_test.go b/weed/util/queue_unbounded_test.go
new file mode 100644
index 000000000..2d02032cb
--- /dev/null
+++ b/weed/util/queue_unbounded_test.go
@@ -0,0 +1,25 @@
+package util
+
+import "testing"
+
+func TestEnqueueAndConsume(t *testing.T) {
+
+    q := NewUnboundedQueue()
+
+    q.EnQueue("1", "2", "3")
+
+    f := func(items []string) {
+        for _, item := range items {
+            println(item)
+        }
+        println("-----------------------")
+    }
+    q.Consume(f)
+
+    q.Consume(f)
+
+    q.EnQueue("4", "5")
+    q.EnQueue("6", "7")
+    q.Consume(f)
+
+}
diff --git a/weed/util/throttler.go b/weed/util/throttler.go
new file mode 100644
index 000000000..873161e37
--- /dev/null
+++ b/weed/util/throttler.go
@@ -0,0 +1,34 @@
+package util
+
+import "time"
+
+type WriteThrottler struct {
+    compactionBytePerSecond int64
+    lastSizeCounter         int64
+    lastSizeCheckTime       time.Time
+}
+
+func NewWriteThrottler(bytesPerSecond int64) *WriteThrottler {
+    return &WriteThrottler{
+        compactionBytePerSecond: bytesPerSecond,
+        lastSizeCheckTime:       time.Now(),
+    }
+}
+
+func (wt *WriteThrottler) MaybeSlowdown(delta int64) {
+    if wt.compactionBytePerSecond > 0 {
+        wt.lastSizeCounter += delta
+        now := time.Now()
+        elapsedDuration := now.Sub(wt.lastSizeCheckTime)
+        if elapsedDuration > 100*time.Millisecond {
+            overLimitBytes := wt.lastSizeCounter - wt.compactionBytePerSecond/10
+            if overLimitBytes > 0 {
+                overRatio := float64(overLimitBytes) / float64(wt.compactionBytePerSecond)
+                sleepTime := time.Duration(overRatio*1000) * time.Millisecond
+                // glog.V(0).Infof("currently %d bytes, limit to %d bytes, over by %d bytes, sleeping %v over %.4f", wt.lastSizeCounter, wt.compactionBytePerSecond/10, overLimitBytes, sleepTime, overRatio)
+                time.Sleep(sleepTime)
+            }
+            wt.lastSizeCounter, wt.lastSizeCheckTime = 0, time.Now()
+        }
+    }
+}
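To drive the throttler, report each write's byte count to MaybeSlowdown: roughly every 100ms it compares the accumulated count against one tenth of the per-second budget and sleeps in proportion to any overshoot. A sketch with a hypothetical write sink:

    package main

    import "github.com/chrislusf/seaweedfs/weed/util"

    func main() {
        wt := util.NewWriteThrottler(1024 * 1024) // budget: roughly 1 MB/s

        chunk := make([]byte, 64*1024)
        for i := 0; i < 32; i++ {
            writeChunk(chunk) // hypothetical sink, e.g. a compaction write
            wt.MaybeSlowdown(int64(len(chunk)))
        }
    }

    // writeChunk is a stand-in for the actual destination of the throttled writes.
    func writeChunk(b []byte) {}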
