aboutsummaryrefslogtreecommitdiff
path: root/weed/util
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util')
-rw-r--r--weed/util/bytes.go40
-rw-r--r--weed/util/chunk_cache/chunk_cache.go113
-rw-r--r--weed/util/chunk_cache/chunk_cache_in_memory.go36
-rw-r--r--weed/util/chunk_cache/chunk_cache_on_disk.go145
-rw-r--r--weed/util/chunk_cache/chunk_cache_on_disk_test.go59
-rw-r--r--weed/util/chunk_cache/on_disk_cache_layer.go89
-rw-r--r--weed/util/cipher.go60
-rw-r--r--weed/util/cipher_test.go17
-rw-r--r--weed/util/compression.go6
-rw-r--r--weed/util/compression_test.go21
-rw-r--r--weed/util/config.go5
-rw-r--r--weed/util/constants.go7
-rw-r--r--weed/util/fullpath.go56
-rw-r--r--weed/util/grace/pprof.go (renamed from weed/util/pprof.go)2
-rw-r--r--weed/util/grace/signal_handling.go (renamed from weed/util/signal_handling.go)2
-rw-r--r--weed/util/grace/signal_handling_notsupported.go (renamed from weed/util/signal_handling_notsupported.go)2
-rw-r--r--weed/util/grpc_client_server.go127
-rw-r--r--weed/util/http_util.go82
-rw-r--r--weed/util/inits.go52
-rw-r--r--weed/util/inits_test.go19
-rw-r--r--weed/util/log_buffer/log_buffer.go278
-rw-r--r--weed/util/log_buffer/log_buffer_test.go42
-rw-r--r--weed/util/log_buffer/log_read.go77
-rw-r--r--weed/util/log_buffer/sealed_buffer.go62
-rw-r--r--weed/util/net_timeout.go6
-rw-r--r--weed/util/network.go25
-rw-r--r--weed/util/parse.go16
-rw-r--r--weed/util/queue_unbounded.go45
-rw-r--r--weed/util/queue_unbounded_test.go25
29 files changed, 1361 insertions, 155 deletions
diff --git a/weed/util/bytes.go b/weed/util/bytes.go
index 9c7e5e2cb..0650919c0 100644
--- a/weed/util/bytes.go
+++ b/weed/util/bytes.go
@@ -2,9 +2,26 @@ package util
import (
"crypto/md5"
+ "fmt"
"io"
)
+// BytesToHumanReadable returns the converted human readable representation of the bytes.
+func BytesToHumanReadable(b uint64) string {
+ const unit = 1024
+ if b < unit {
+ return fmt.Sprintf("%d B", b)
+ }
+
+ div, exp := uint64(unit), 0
+ for n := b / unit; n >= unit; n /= unit {
+ div *= unit
+ exp++
+ }
+
+ return fmt.Sprintf("%.2f %ciB", float64(b)/float64(div), "KMGTPE"[exp])
+}
+
// big endian
func BytesToUint64(b []byte) (v uint64) {
@@ -74,3 +91,26 @@ func HashStringToLong(dir string) (v int64) {
return
}
+
+func HashToInt32(data []byte) (v int32) {
+ h := md5.New()
+ h.Write(data)
+
+ b := h.Sum(nil)
+
+ v += int32(b[0])
+ v <<= 8
+ v += int32(b[1])
+ v <<= 8
+ v += int32(b[2])
+ v <<= 8
+ v += int32(b[3])
+
+ return
+}
+
+func Md5(data []byte) string {
+ hash := md5.New()
+ hash.Write(data)
+ return fmt.Sprintf("%x", hash.Sum(nil))
+}
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
new file mode 100644
index 000000000..e1d4b639f
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -0,0 +1,113 @@
+package chunk_cache
+
+import (
+ "sync"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+const (
+ memCacheSizeLimit = 1024 * 1024
+ onDiskCacheSizeLimit0 = memCacheSizeLimit
+ onDiskCacheSizeLimit1 = 4 * memCacheSizeLimit
+)
+
+// a global cache for recently accessed file chunks
+type ChunkCache struct {
+ memCache *ChunkCacheInMemory
+ diskCaches []*OnDiskCacheLayer
+ sync.RWMutex
+}
+
+func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64) *ChunkCache {
+
+ c := &ChunkCache{
+ memCache: NewChunkCacheInMemory(maxEntries),
+ }
+ c.diskCaches = make([]*OnDiskCacheLayer, 3)
+ c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_1", diskSizeMB/4, 4)
+ c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_4", diskSizeMB/4, 4)
+ c.diskCaches[2] = NewOnDiskCacheLayer(dir, "cache", diskSizeMB/2, 4)
+
+ return c
+}
+
+func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) {
+ if c == nil {
+ return
+ }
+
+ c.RLock()
+ defer c.RUnlock()
+
+ return c.doGetChunk(fileId, chunkSize)
+}
+
+func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) {
+
+ if chunkSize < memCacheSizeLimit {
+ if data = c.memCache.GetChunk(fileId); data != nil {
+ return data
+ }
+ }
+
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ glog.Errorf("failed to parse file id %s", fileId)
+ return nil
+ }
+
+ for _, diskCache := range c.diskCaches {
+ data := diskCache.getChunk(fid.Key)
+ if len(data) != 0 {
+ return data
+ }
+ }
+
+ return nil
+
+}
+
+func (c *ChunkCache) SetChunk(fileId string, data []byte) {
+ if c == nil {
+ return
+ }
+ c.Lock()
+ defer c.Unlock()
+
+ c.doSetChunk(fileId, data)
+}
+
+func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
+
+ if len(data) < memCacheSizeLimit {
+ c.memCache.SetChunk(fileId, data)
+ }
+
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ glog.Errorf("failed to parse file id %s", fileId)
+ return
+ }
+
+ if len(data) < onDiskCacheSizeLimit0 {
+ c.diskCaches[0].setChunk(fid.Key, data)
+ } else if len(data) < onDiskCacheSizeLimit1 {
+ c.diskCaches[1].setChunk(fid.Key, data)
+ } else {
+ c.diskCaches[2].setChunk(fid.Key, data)
+ }
+
+}
+
+func (c *ChunkCache) Shutdown() {
+ if c == nil {
+ return
+ }
+ c.Lock()
+ defer c.Unlock()
+ for _, diskCache := range c.diskCaches {
+ diskCache.shutdown()
+ }
+}
diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go
new file mode 100644
index 000000000..931e45e9a
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache_in_memory.go
@@ -0,0 +1,36 @@
+package chunk_cache
+
+import (
+ "time"
+
+ "github.com/karlseguin/ccache"
+)
+
+// a global cache for recently accessed file chunks
+type ChunkCacheInMemory struct {
+ cache *ccache.Cache
+}
+
+func NewChunkCacheInMemory(maxEntries int64) *ChunkCacheInMemory {
+ pruneCount := maxEntries >> 3
+ if pruneCount <= 0 {
+ pruneCount = 500
+ }
+ return &ChunkCacheInMemory{
+ cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))),
+ }
+}
+
+func (c *ChunkCacheInMemory) GetChunk(fileId string) []byte {
+ item := c.cache.Get(fileId)
+ if item == nil {
+ return nil
+ }
+ data := item.Value().([]byte)
+ item.Extend(time.Hour)
+ return data
+}
+
+func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) {
+ c.cache.Set(fileId, data, time.Hour)
+}
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go
new file mode 100644
index 000000000..2c7ef8d39
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache_on_disk.go
@@ -0,0 +1,145 @@
+package chunk_cache
+
+import (
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/syndtr/goleveldb/leveldb/opt"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// This implements an on disk cache
+// The entries are an FIFO with a size limit
+
+type ChunkCacheVolume struct {
+ DataBackend backend.BackendStorageFile
+ nm storage.NeedleMapper
+ fileName string
+ smallBuffer []byte
+ sizeLimit int64
+ lastModTime time.Time
+ fileSize int64
+}
+
+func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCacheVolume, error) {
+
+ v := &ChunkCacheVolume{
+ smallBuffer: make([]byte, types.NeedlePaddingSize),
+ fileName: fileName,
+ sizeLimit: preallocate,
+ }
+
+ var err error
+
+ if exists, canRead, canWrite, modTime, fileSize := util.CheckFile(v.fileName + ".dat"); exists {
+ if !canRead {
+ return nil, fmt.Errorf("cannot read cache file %s.dat", v.fileName)
+ }
+ if !canWrite {
+ return nil, fmt.Errorf("cannot write cache file %s.dat", v.fileName)
+ }
+ if dataFile, err := os.OpenFile(v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err)
+ } else {
+ v.DataBackend = backend.NewDiskFile(dataFile)
+ v.lastModTime = modTime
+ v.fileSize = fileSize
+ }
+ } else {
+ if v.DataBackend, err = backend.CreateVolumeFile(v.fileName+".dat", preallocate, 0); err != nil {
+ return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err)
+ }
+ v.lastModTime = time.Now()
+ }
+
+ var indexFile *os.File
+ if indexFile, err = os.OpenFile(v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err)
+ }
+
+ glog.V(0).Infoln("loading leveldb", v.fileName+".ldb")
+ opts := &opt.Options{
+ BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10, // default value is 1
+ }
+ if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts); err != nil {
+ return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err)
+ }
+
+ return v, nil
+
+}
+
+func (v *ChunkCacheVolume) Shutdown() {
+ if v.DataBackend != nil {
+ v.DataBackend.Close()
+ v.DataBackend = nil
+ }
+ if v.nm != nil {
+ v.nm.Close()
+ v.nm = nil
+ }
+}
+
+func (v *ChunkCacheVolume) destroy() {
+ v.Shutdown()
+ os.Remove(v.fileName + ".dat")
+ os.Remove(v.fileName + ".idx")
+ os.RemoveAll(v.fileName + ".ldb")
+}
+
+func (v *ChunkCacheVolume) Reset() (*ChunkCacheVolume, error) {
+ v.destroy()
+ return LoadOrCreateChunkCacheVolume(v.fileName, v.sizeLimit)
+}
+
+func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) {
+
+ nv, ok := v.nm.Get(key)
+ if !ok {
+ return nil, storage.ErrorNotFound
+ }
+ data := make([]byte, nv.Size)
+ if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToAcutalOffset()); readErr != nil {
+ return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
+ v.fileName, nv.Offset.ToAcutalOffset(), nv.Offset.ToAcutalOffset()+int64(nv.Size), readErr)
+ } else {
+ if readSize != int(nv.Size) {
+ return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size)
+ }
+ }
+
+ return data, nil
+}
+
+func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error {
+
+ offset := v.fileSize
+
+ written, err := v.DataBackend.WriteAt(data, offset)
+ if err != nil {
+ return err
+ } else if written != len(data) {
+ return fmt.Errorf("partial written %d, expected %d", written, len(data))
+ }
+
+ v.fileSize += int64(written)
+ extraSize := written % types.NeedlePaddingSize
+ if extraSize != 0 {
+ v.DataBackend.WriteAt(v.smallBuffer[:types.NeedlePaddingSize-extraSize], offset+int64(written))
+ v.fileSize += int64(types.NeedlePaddingSize - extraSize)
+ }
+
+ if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil {
+ glog.V(4).Infof("failed to save in needle map %d: %v", key, err)
+ }
+
+ return nil
+}
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
new file mode 100644
index 000000000..f061f2ba2
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
@@ -0,0 +1,59 @@
+package chunk_cache
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "testing"
+)
+
+func TestOnDisk(t *testing.T) {
+
+ tmpDir, _ := ioutil.TempDir("", "c")
+ defer os.RemoveAll(tmpDir)
+
+ totalDiskSizeMb := int64(32)
+
+ cache := NewChunkCache(0, tmpDir, totalDiskSizeMb)
+
+ writeCount := 5
+ type test_data struct {
+ data []byte
+ fileId string
+ size uint64
+ }
+ testData := make([]*test_data, writeCount)
+ for i := 0; i < writeCount; i++ {
+ buff := make([]byte, 1024*1024)
+ rand.Read(buff)
+ testData[i] = &test_data{
+ data: buff,
+ fileId: fmt.Sprintf("1,%daabbccdd", i+1),
+ size: uint64(len(buff)),
+ }
+ cache.SetChunk(testData[i].fileId, testData[i].data)
+ }
+
+ for i := 0; i < writeCount; i++ {
+ data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ if bytes.Compare(data, testData[i].data) != 0 {
+ t.Errorf("failed to write to and read from cache: %d", i)
+ }
+ }
+
+ cache.Shutdown()
+
+ cache = NewChunkCache(0, tmpDir, totalDiskSizeMb)
+
+ for i := 0; i < writeCount; i++ {
+ data := cache.GetChunk(testData[i].fileId, testData[i].size)
+ if bytes.Compare(data, testData[i].data) != 0 {
+ t.Errorf("failed to write to and read from cache: %d", i)
+ }
+ }
+
+ cache.Shutdown()
+
+}
diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go
new file mode 100644
index 000000000..9cf8e3ab2
--- /dev/null
+++ b/weed/util/chunk_cache/on_disk_cache_layer.go
@@ -0,0 +1,89 @@
+package chunk_cache
+
+import (
+ "fmt"
+ "path"
+ "sort"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+type OnDiskCacheLayer struct {
+ diskCaches []*ChunkCacheVolume
+}
+
+func NewOnDiskCacheLayer(dir, namePrefix string, diskSizeMB int64, segmentCount int) *OnDiskCacheLayer {
+
+ volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000)
+ if volumeCount < segmentCount {
+ volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount)
+ }
+
+ c := &OnDiskCacheLayer{}
+ for i := 0; i < volumeCount; i++ {
+ fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i))
+ diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024)
+ if err != nil {
+ glog.Errorf("failed to add cache %s : %v", fileName, err)
+ } else {
+ c.diskCaches = append(c.diskCaches, diskCache)
+ }
+ }
+
+ // keep newest cache to the front
+ sort.Slice(c.diskCaches, func(i, j int) bool {
+ return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime)
+ })
+
+ return c
+}
+
+func (c *OnDiskCacheLayer) setChunk(needleId types.NeedleId, data []byte) {
+
+ if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit {
+ t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset()
+ if resetErr != nil {
+ glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName)
+ return
+ }
+ for i := len(c.diskCaches) - 1; i > 0; i-- {
+ c.diskCaches[i] = c.diskCaches[i-1]
+ }
+ c.diskCaches[0] = t
+ }
+
+ c.diskCaches[0].WriteNeedle(needleId, data)
+
+}
+
+func (c *OnDiskCacheLayer) getChunk(needleId types.NeedleId) (data []byte) {
+
+ var err error
+
+ for _, diskCache := range c.diskCaches {
+ data, err = diskCache.GetNeedle(needleId)
+ if err == storage.ErrorNotFound {
+ continue
+ }
+ if err != nil {
+ glog.Errorf("failed to read cache file %s id %d", diskCache.fileName, needleId)
+ continue
+ }
+ if len(data) != 0 {
+ return
+ }
+ }
+
+ return nil
+
+}
+
+func (c *OnDiskCacheLayer) shutdown() {
+
+ for _, diskCache := range c.diskCaches {
+ diskCache.Shutdown()
+ }
+
+}
diff --git a/weed/util/cipher.go b/weed/util/cipher.go
new file mode 100644
index 000000000..f044c2ca3
--- /dev/null
+++ b/weed/util/cipher.go
@@ -0,0 +1,60 @@
+package util
+
+import (
+ "crypto/aes"
+ "crypto/cipher"
+ "crypto/rand"
+ "errors"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+type CipherKey []byte
+
+func GenCipherKey() CipherKey {
+ key := make([]byte, 32)
+ if _, err := io.ReadFull(rand.Reader, key); err != nil {
+ glog.Fatalf("random key gen: %v", err)
+ }
+ return CipherKey(key)
+}
+
+func Encrypt(plaintext []byte, key CipherKey) ([]byte, error) {
+ c, err := aes.NewCipher(key)
+ if err != nil {
+ return nil, err
+ }
+
+ gcm, err := cipher.NewGCM(c)
+ if err != nil {
+ return nil, err
+ }
+
+ nonce := make([]byte, gcm.NonceSize())
+ if _, err = io.ReadFull(rand.Reader, nonce); err != nil {
+ return nil, err
+ }
+
+ return gcm.Seal(nonce, nonce, plaintext, nil), nil
+}
+
+func Decrypt(ciphertext []byte, key CipherKey) ([]byte, error) {
+ c, err := aes.NewCipher(key)
+ if err != nil {
+ return nil, err
+ }
+
+ gcm, err := cipher.NewGCM(c)
+ if err != nil {
+ return nil, err
+ }
+
+ nonceSize := gcm.NonceSize()
+ if len(ciphertext) < nonceSize {
+ return nil, errors.New("ciphertext too short")
+ }
+
+ nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:]
+ return gcm.Open(nil, nonce, ciphertext, nil)
+}
diff --git a/weed/util/cipher_test.go b/weed/util/cipher_test.go
new file mode 100644
index 000000000..026c96ea3
--- /dev/null
+++ b/weed/util/cipher_test.go
@@ -0,0 +1,17 @@
+package util
+
+import (
+ "encoding/base64"
+ "testing"
+)
+
+func TestSameAsJavaImplementation(t *testing.T) {
+ str := "QVVhmqg112NMT7F+G/7QPynqSln3xPIhKdFGmTVKZD6IS0noyr2Z5kXFF6fPjZ/7Hq8kRhlmLeeqZUccxyaZHezOdgkjS6d4NTdHf5IjXzk7"
+ cipherText, _ := base64.StdEncoding.DecodeString(str)
+ secretKey := []byte("256-bit key for AES 256 GCM encr")
+ plantext, err := Decrypt(cipherText, CipherKey(secretKey))
+ if err != nil {
+ println(err.Error())
+ }
+ println(string(plantext))
+}
diff --git a/weed/util/compression.go b/weed/util/compression.go
index 6072df632..1f778b5d5 100644
--- a/weed/util/compression.go
+++ b/weed/util/compression.go
@@ -7,8 +7,9 @@ import (
"io/ioutil"
"strings"
- "github.com/chrislusf/seaweedfs/weed/glog"
"golang.org/x/tools/godoc/util"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
)
func GzipData(input []byte) ([]byte, error) {
@@ -37,7 +38,8 @@ func UnGzipData(input []byte) ([]byte, error) {
/*
* Default more not to gzip since gzip can be done on client side.
- */func IsGzippable(ext, mtype string, data []byte) bool {
+ */
+func IsGzippable(ext, mtype string, data []byte) bool {
shouldBeZipped, iAmSure := IsGzippableFileType(ext, mtype)
if iAmSure {
diff --git a/weed/util/compression_test.go b/weed/util/compression_test.go
new file mode 100644
index 000000000..b515e8988
--- /dev/null
+++ b/weed/util/compression_test.go
@@ -0,0 +1,21 @@
+package util
+
+import (
+ "testing"
+
+ "golang.org/x/tools/godoc/util"
+)
+
+func TestIsGzippable(t *testing.T) {
+ buf := make([]byte, 1024)
+
+ isText := util.IsText(buf)
+
+ if isText {
+ t.Error("buf with zeros are not text")
+ }
+
+ compressed, _ := GzipData(buf)
+
+ t.Logf("compressed size %d\n", len(compressed))
+}
diff --git a/weed/util/config.go b/weed/util/config.go
index dfbfdbd82..7b6e92f08 100644
--- a/weed/util/config.go
+++ b/weed/util/config.go
@@ -27,7 +27,7 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed())
if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file
- glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
+ glog.V(1).Infof("Reading %s: %v", viper.ConfigFileUsed(), err)
if required {
glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+
"\n\nPlease use this command to generate the default %s.toml file\n"+
@@ -42,7 +42,8 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
}
func GetViper() *viper.Viper {
- v := viper.GetViper()
+ v := &viper.Viper{}
+ *v = *viper.GetViper()
v.AutomaticEnv()
v.SetEnvPrefix("weed")
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 3d61b2006..7c3927a66 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,5 +5,10 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 53)
+ VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 79)
+ COMMIT = ""
)
+
+func Version() string {
+ return VERSION + " " + COMMIT
+} \ No newline at end of file
diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go
new file mode 100644
index 000000000..4ce8a2f90
--- /dev/null
+++ b/weed/util/fullpath.go
@@ -0,0 +1,56 @@
+package util
+
+import (
+ "path/filepath"
+ "strings"
+)
+
+type FullPath string
+
+func NewFullPath(dir, name string) FullPath {
+ return FullPath(dir).Child(name)
+}
+
+func (fp FullPath) DirAndName() (string, string) {
+ dir, name := filepath.Split(string(fp))
+ if dir == "/" {
+ return dir, name
+ }
+ if len(dir) < 1 {
+ return "/", ""
+ }
+ return dir[:len(dir)-1], name
+}
+
+func (fp FullPath) Name() string {
+ _, name := filepath.Split(string(fp))
+ return name
+}
+
+func (fp FullPath) Child(name string) FullPath {
+ dir := string(fp)
+ if strings.HasSuffix(dir, "/") {
+ return FullPath(dir + name)
+ }
+ return FullPath(dir + "/" + name)
+}
+
+func (fp FullPath) AsInode() uint64 {
+ return uint64(HashStringToLong(string(fp)))
+}
+
+// split, but skipping the root
+func (fp FullPath) Split() []string {
+ if fp == "" || fp == "/" {
+ return []string{}
+ }
+ return strings.Split(string(fp)[1:], "/")
+}
+
+func Join(names ...string) string {
+ return filepath.ToSlash(filepath.Join(names...))
+}
+
+func JoinPath(names ...string) FullPath {
+ return FullPath(Join(names...))
+}
diff --git a/weed/util/pprof.go b/weed/util/grace/pprof.go
index a2621ceee..14686bfc8 100644
--- a/weed/util/pprof.go
+++ b/weed/util/grace/pprof.go
@@ -1,4 +1,4 @@
-package util
+package grace
import (
"os"
diff --git a/weed/util/signal_handling.go b/weed/util/grace/signal_handling.go
index 99447e8be..7cca46764 100644
--- a/weed/util/signal_handling.go
+++ b/weed/util/grace/signal_handling.go
@@ -1,6 +1,6 @@
// +build !plan9
-package util
+package grace
import (
"os"
diff --git a/weed/util/signal_handling_notsupported.go b/weed/util/grace/signal_handling_notsupported.go
index c389cfb7e..5335915a1 100644
--- a/weed/util/signal_handling_notsupported.go
+++ b/weed/util/grace/signal_handling_notsupported.go
@@ -1,6 +1,6 @@
// +build plan9
-package util
+package grace
func OnInterrupt(fn func()) {
}
diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go
deleted file mode 100644
index 7e396342b..000000000
--- a/weed/util/grpc_client_server.go
+++ /dev/null
@@ -1,127 +0,0 @@
-package util
-
-import (
- "context"
- "fmt"
- "net/http"
- "strconv"
- "strings"
- "sync"
- "time"
-
- "google.golang.org/grpc"
- "google.golang.org/grpc/keepalive"
-)
-
-var (
- // cache grpc connections
- grpcClients = make(map[string]*grpc.ClientConn)
- grpcClientsLock sync.Mutex
-)
-
-func init() {
- http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 1024
-}
-
-func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server {
- var options []grpc.ServerOption
- options = append(options, grpc.KeepaliveParams(keepalive.ServerParameters{
- Time: 10 * time.Second, // wait time before ping if no activity
- Timeout: 20 * time.Second, // ping timeout
- }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
- MinTime: 60 * time.Second, // min time a client should wait before sending a ping
- }))
- for _, opt := range opts {
- if opt != nil {
- options = append(options, opt)
- }
- }
- return grpc.NewServer(options...)
-}
-
-func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
- // opts = append(opts, grpc.WithBlock())
- // opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second)))
- var options []grpc.DialOption
- options = append(options,
- // grpc.WithInsecure(),
- grpc.WithKeepaliveParams(keepalive.ClientParameters{
- Time: 30 * time.Second, // client ping server if no activity for this long
- Timeout: 20 * time.Second,
- }))
- for _, opt := range opts {
- if opt != nil {
- options = append(options, opt)
- }
- }
- return grpc.DialContext(ctx, address, options...)
-}
-
-func WithCachedGrpcClient(ctx context.Context, fn func(context.Context, *grpc.ClientConn) error, address string, opts ...grpc.DialOption) error {
-
- grpcClientsLock.Lock()
-
- existingConnection, found := grpcClients[address]
- if found {
- grpcClientsLock.Unlock()
- err := fn(ctx, existingConnection)
- if err != nil {
- grpcClientsLock.Lock()
- delete(grpcClients, address)
- grpcClientsLock.Unlock()
- existingConnection.Close()
- }
- return err
- }
-
- grpcConnection, err := GrpcDial(ctx, address, opts...)
- if err != nil {
- grpcClientsLock.Unlock()
- return fmt.Errorf("fail to dial %s: %v", address, err)
- }
-
- grpcClients[address] = grpcConnection
- grpcClientsLock.Unlock()
-
- err = fn(ctx, grpcConnection)
- if err != nil {
- grpcClientsLock.Lock()
- delete(grpcClients, address)
- grpcClientsLock.Unlock()
- grpcConnection.Close()
- }
-
- return err
-}
-
-func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) {
- colonIndex := strings.LastIndex(server, ":")
- if colonIndex < 0 {
- return "", fmt.Errorf("server should have hostname:port format: %v", server)
- }
-
- port, parseErr := strconv.ParseUint(server[colonIndex+1:], 10, 64)
- if parseErr != nil {
- return "", fmt.Errorf("server port parse error: %v", parseErr)
- }
-
- grpcPort := int(port) + 10000
-
- return fmt.Sprintf("%s:%d", server[:colonIndex], grpcPort), nil
-}
-
-func ServerToGrpcAddress(server string) (serverGrpcAddress string) {
- hostnameAndPort := strings.Split(server, ":")
- if len(hostnameAndPort) != 2 {
- return fmt.Sprintf("unexpected server address: %s", server)
- }
-
- port, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64)
- if parseErr != nil {
- return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1])
- }
-
- grpcPort := int(port) + 10000
-
- return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort)
-}
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 08007a038..5df79a7be 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -35,13 +35,13 @@ func PostBytes(url string, body []byte) ([]byte, error) {
return nil, fmt.Errorf("Post to %s: %v", url, err)
}
defer r.Body.Close()
- if r.StatusCode >= 400 {
- return nil, fmt.Errorf("%s: %s", url, r.Status)
- }
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, fmt.Errorf("Read response body: %v", err)
}
+ if r.StatusCode >= 400 {
+ return nil, fmt.Errorf("%s: %s", url, r.Status)
+ }
return b, nil
}
@@ -88,7 +88,7 @@ func Head(url string) (http.Header, error) {
if err != nil {
return nil, err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode >= 400 {
return nil, fmt.Errorf("%s: %s", url, r.Status)
}
@@ -117,7 +117,7 @@ func Delete(url string, jwt string) error {
return nil
}
m := make(map[string]interface{})
- if e := json.Unmarshal(body, m); e == nil {
+ if e := json.Unmarshal(body, &m); e == nil {
if s, ok := m["error"].(string); ok {
return errors.New(s)
}
@@ -130,7 +130,7 @@ func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachB
if err != nil {
return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode != 200 {
return fmt.Errorf("%s: %s", url, r.Status)
}
@@ -153,7 +153,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e
if err != nil {
return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode != 200 {
return fmt.Errorf("%s: %s", url, r.Status)
}
@@ -189,13 +189,21 @@ func NormalizeUrl(url string) string {
return "http://" + url
}
-func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange bool) (int64, error) {
+func ReadUrl(fileUrl string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
+
+ if cipherKey != nil {
+ var n int
+ err := readEncryptedUrl(fileUrl, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) {
+ n = copy(buf, data)
+ })
+ return int64(n), err
+ }
req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil {
return 0, err
}
- if isReadRange {
+ if !isFullChunk {
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
} else {
req.Header.Set("Accept-Encoding", "gzip")
@@ -250,43 +258,74 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo
return n, err
}
-func ReadUrlAsStream(fileUrl string, offset int64, size int, fn func(data []byte)) (int64, error) {
+func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
+
+ if cipherKey != nil {
+ return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
+ }
req, err := http.NewRequest("GET", fileUrl, nil)
if err != nil {
- return 0, err
+ return err
+ }
+
+ if !isFullChunk {
+ req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
}
- req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
r, err := client.Do(req)
if err != nil {
- return 0, err
+ return err
}
- defer r.Body.Close()
+ defer CloseResponse(r)
if r.StatusCode >= 400 {
- return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
+ return fmt.Errorf("%s: %s", fileUrl, r.Status)
}
var (
m int
- n int64
)
buf := make([]byte, 64*1024)
for {
m, err = r.Body.Read(buf)
fn(buf[:m])
- n += int64(m)
if err == io.EOF {
- return n, nil
+ return nil
}
if err != nil {
- return n, err
+ return err
}
}
}
+func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error {
+ encryptedData, err := Get(fileUrl)
+ if err != nil {
+ return fmt.Errorf("fetch %s: %v", fileUrl, err)
+ }
+ decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey))
+ if err != nil {
+ return fmt.Errorf("decrypt %s: %v", fileUrl, err)
+ }
+ if isContentGzipped {
+ decryptedData, err = UnGzipData(decryptedData)
+ if err != nil {
+ return fmt.Errorf("unzip decrypt %s: %v", fileUrl, err)
+ }
+ }
+ if len(decryptedData) < int(offset)+size {
+ return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
+ }
+ if isFullChunk {
+ fn(decryptedData)
+ } else {
+ fn(decryptedData[int(offset) : int(offset)+size])
+ }
+ return nil
+}
+
func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) {
req, err := http.NewRequest("GET", fileUrl, nil)
@@ -307,3 +346,8 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e
return r.Body, nil
}
+
+func CloseResponse(resp *http.Response) {
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+}
diff --git a/weed/util/inits.go b/weed/util/inits.go
new file mode 100644
index 000000000..378878012
--- /dev/null
+++ b/weed/util/inits.go
@@ -0,0 +1,52 @@
+package util
+
+import (
+ "fmt"
+ "sort"
+)
+
+// HumanReadableIntsMax joins a serials of inits into a smart one like 1-3 5 ... for human readable.
+func HumanReadableIntsMax(max int, ids ...int) string {
+ if len(ids) <= max {
+ return HumanReadableInts(ids...)
+ }
+
+ return HumanReadableInts(ids[:max]...) + " ..."
+}
+
+// HumanReadableInts joins a serials of inits into a smart one like 1-3 5 7-10 for human readable.
+func HumanReadableInts(ids ...int) string {
+ sort.Ints(ids)
+
+ s := ""
+ start := 0
+ last := 0
+
+ for i, v := range ids {
+ if i == 0 {
+ start = v
+ last = v
+ s = fmt.Sprintf("%d", v)
+ continue
+ }
+
+ if last+1 == v {
+ last = v
+ continue
+ }
+
+ if last > start {
+ s += fmt.Sprintf("-%d", last)
+ }
+
+ s += fmt.Sprintf(" %d", v)
+ start = v
+ last = v
+ }
+
+ if last != start {
+ s += fmt.Sprintf("-%d", last)
+ }
+
+ return s
+}
diff --git a/weed/util/inits_test.go b/weed/util/inits_test.go
new file mode 100644
index 000000000..f2c9b701f
--- /dev/null
+++ b/weed/util/inits_test.go
@@ -0,0 +1,19 @@
+package util
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestHumanReadableIntsMax(t *testing.T) {
+ assert.Equal(t, "1-2 ...", HumanReadableIntsMax(2, 1, 2, 3))
+ assert.Equal(t, "1 3 ...", HumanReadableIntsMax(2, 1, 3, 5))
+}
+
+func TestHumanReadableInts(t *testing.T) {
+ assert.Equal(t, "1-3", HumanReadableInts(1, 2, 3))
+ assert.Equal(t, "1 3", HumanReadableInts(1, 3))
+ assert.Equal(t, "1 3 5", HumanReadableInts(5, 1, 3))
+ assert.Equal(t, "1-3 5", HumanReadableInts(1, 2, 3, 5))
+ assert.Equal(t, "1-3 5 7-9", HumanReadableInts(7, 9, 8, 1, 2, 3, 5))
+}
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
new file mode 100644
index 000000000..b02c45b52
--- /dev/null
+++ b/weed/util/log_buffer/log_buffer.go
@@ -0,0 +1,278 @@
+package log_buffer
+
+import (
+ "bytes"
+ "sync"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const BufferSize = 4 * 1024 * 1024
+const PreviousBufferCount = 3
+
+type dataToFlush struct {
+ startTime time.Time
+ stopTime time.Time
+ data *bytes.Buffer
+}
+
+type LogBuffer struct {
+ prevBuffers *SealedBuffers
+ buf []byte
+ idx []int
+ pos int
+ startTime time.Time
+ stopTime time.Time
+ sizeBuf []byte
+ flushInterval time.Duration
+ flushFn func(startTime, stopTime time.Time, buf []byte)
+ notifyFn func()
+ isStopping bool
+ flushChan chan *dataToFlush
+ lastTsNs int64
+ sync.RWMutex
+}
+
+func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer {
+ lb := &LogBuffer{
+ prevBuffers: newSealedBuffers(PreviousBufferCount),
+ buf: make([]byte, BufferSize),
+ sizeBuf: make([]byte, 4),
+ flushInterval: flushInterval,
+ flushFn: flushFn,
+ notifyFn: notifyFn,
+ flushChan: make(chan *dataToFlush, 256),
+ }
+ go lb.loopFlush()
+ go lb.loopInterval()
+ return lb
+}
+
+func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
+
+ m.Lock()
+ defer func() {
+ m.Unlock()
+ if m.notifyFn != nil {
+ m.notifyFn()
+ }
+ }()
+
+ // need to put the timestamp inside the lock
+ ts := time.Now()
+ tsNs := ts.UnixNano()
+ if m.lastTsNs >= tsNs {
+ // this is unlikely to happen, but just in case
+ tsNs = m.lastTsNs + 1
+ ts = time.Unix(0, tsNs)
+ }
+ m.lastTsNs = tsNs
+ logEntry := &filer_pb.LogEntry{
+ TsNs: tsNs,
+ PartitionKeyHash: util.HashToInt32(partitionKey),
+ Data: data,
+ }
+
+ logEntryData, _ := proto.Marshal(logEntry)
+
+ size := len(logEntryData)
+
+ if m.pos == 0 {
+ m.startTime = ts
+ }
+
+ if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
+ m.flushChan <- m.copyToFlush()
+ m.startTime = ts
+ if len(m.buf) < size+4 {
+ m.buf = make([]byte, 2*size+4)
+ }
+ }
+ m.stopTime = ts
+
+ m.idx = append(m.idx, m.pos)
+ util.Uint32toBytes(m.sizeBuf, uint32(size))
+ copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
+ copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
+ m.pos += size + 4
+
+ // fmt.Printf("entry size %d total %d count %d, buffer:%p\n", size, m.pos, len(m.idx), m)
+
+}
+
+func (m *LogBuffer) Shutdown() {
+ m.Lock()
+ defer m.Unlock()
+
+ if m.isStopping {
+ return
+ }
+ m.isStopping = true
+ toFlush := m.copyToFlush()
+ m.flushChan <- toFlush
+ close(m.flushChan)
+}
+
+func (m *LogBuffer) loopFlush() {
+ for d := range m.flushChan {
+ if d != nil {
+ // fmt.Printf("flush [%v, %v] size %d\n", d.startTime, d.stopTime, len(d.data.Bytes()))
+ m.flushFn(d.startTime, d.stopTime, d.data.Bytes())
+ d.releaseMemory()
+ }
+ }
+}
+
+func (m *LogBuffer) loopInterval() {
+ for !m.isStopping {
+ time.Sleep(m.flushInterval)
+ m.Lock()
+ if m.isStopping {
+ m.Unlock()
+ return
+ }
+ // println("loop interval")
+ toFlush := m.copyToFlush()
+ m.flushChan <- toFlush
+ m.Unlock()
+ }
+}
+
+func (m *LogBuffer) copyToFlush() *dataToFlush {
+
+ if m.flushFn != nil && m.pos > 0 {
+ // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
+ d := &dataToFlush{
+ startTime: m.startTime,
+ stopTime: m.stopTime,
+ data: copiedBytes(m.buf[:m.pos]),
+ }
+ // fmt.Printf("flusing [0,%d) with %d entries\n", m.pos, len(m.idx))
+ m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos)
+ m.pos = 0
+ m.idx = m.idx[:0]
+ return d
+ }
+ return nil
+}
+
+func (d *dataToFlush) releaseMemory() {
+ d.data.Reset()
+ bufferPool.Put(d.data)
+}
+
+func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) {
+ m.RLock()
+ defer m.RUnlock()
+
+ /*
+ fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers))
+ for i, prevBuf := range m.prevBuffers.buffers {
+ fmt.Printf(" prev %d : %s\n", i, prevBuf.String())
+ }
+ */
+
+ if lastReadTime.Equal(m.stopTime) {
+ return nil
+ }
+ if lastReadTime.After(m.stopTime) {
+ // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime)
+ return nil
+ }
+ if lastReadTime.Before(m.startTime) {
+ // println("checking ", lastReadTime.UnixNano())
+ for i, buf := range m.prevBuffers.buffers {
+ if buf.startTime.After(lastReadTime) {
+ if i == 0 {
+ // println("return the earliest in memory", buf.startTime.UnixNano())
+ return copiedBytes(buf.buf[:buf.size])
+ }
+ // println("return the", i, "th in memory", buf.startTime.UnixNano())
+ return copiedBytes(buf.buf[:buf.size])
+ }
+ if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) {
+ pos := buf.locateByTs(lastReadTime)
+ // fmt.Printf("locate buffer[%d] pos %d\n", i, pos)
+ return copiedBytes(buf.buf[pos:buf.size])
+ }
+ }
+ // println("return the current buf", lastReadTime.UnixNano())
+ return copiedBytes(m.buf[:m.pos])
+ }
+
+ lastTs := lastReadTime.UnixNano()
+ l, h := 0, len(m.idx)-1
+
+ /*
+ for i, pos := range m.idx {
+ logEntry, ts := readTs(m.buf, pos)
+ event := &filer_pb.SubscribeMetadataResponse{}
+ proto.Unmarshal(logEntry.Data, event)
+ entry := event.EventNotification.OldEntry
+ if entry == nil {
+ entry = event.EventNotification.NewEntry
+ }
+ fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name)
+ }
+ fmt.Printf("l=%d, h=%d\n", l, h)
+ */
+
+ for l <= h {
+ mid := (l + h) / 2
+ pos := m.idx[mid]
+ _, t := readTs(m.buf, pos)
+ if t <= lastTs {
+ l = mid + 1
+ } else if lastTs < t {
+ var prevT int64
+ if mid > 0 {
+ _, prevT = readTs(m.buf, m.idx[mid-1])
+ }
+ if prevT <= lastTs {
+ // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos)
+ return copiedBytes(m.buf[pos:m.pos])
+ }
+ h = mid
+ }
+ // fmt.Printf("l=%d, h=%d\n", l, h)
+ }
+
+ // FIXME: this could be that the buffer has been flushed already
+ return nil
+
+}
+func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) {
+ bufferPool.Put(b)
+}
+
+var bufferPool = sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+}
+
+func copiedBytes(buf []byte) (copied *bytes.Buffer) {
+ copied = bufferPool.Get().(*bytes.Buffer)
+ copied.Reset()
+ copied.Write(buf)
+ return
+}
+
+func readTs(buf []byte, pos int) (size int, ts int64) {
+
+ size = int(util.BytesToUint32(buf[pos : pos+4]))
+ entryData := buf[pos+4 : pos+4+size]
+ logEntry := &filer_pb.LogEntry{}
+
+ err := proto.Unmarshal(entryData, logEntry)
+ if err != nil {
+ glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err)
+ }
+ return size, logEntry.TsNs
+
+}
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
new file mode 100644
index 000000000..f9ccc95c2
--- /dev/null
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -0,0 +1,42 @@
+package log_buffer
+
+import (
+ "fmt"
+ "math/rand"
+ "testing"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func TestNewLogBufferFirstBuffer(t *testing.T) {
+ lb := NewLogBuffer(time.Minute, func(startTime, stopTime time.Time, buf []byte) {
+
+ }, func() {
+
+ })
+
+ startTime := time.Now()
+
+ messageSize := 1024
+ messageCount := 5000
+ var buf = make([]byte, messageSize)
+ for i := 0; i < messageCount; i++ {
+ rand.Read(buf)
+ lb.AddToBuffer(nil, buf)
+ }
+
+ receivedmessageCount := 0
+ lb.LoopProcessLogData(startTime, func() bool {
+ // stop if no more messages
+ return false
+ }, func(logEntry *filer_pb.LogEntry) error {
+ receivedmessageCount++
+ return nil
+ })
+
+ if receivedmessageCount != messageCount {
+ fmt.Printf("sent %d received %d\n", messageCount, receivedmessageCount)
+ }
+
+}
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
new file mode 100644
index 000000000..2b73a8064
--- /dev/null
+++ b/weed/util/log_buffer/log_read.go
@@ -0,0 +1,77 @@
+package log_buffer
+
+import (
+ "bytes"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (logBuffer *LogBuffer) LoopProcessLogData(
+ startTreadTime time.Time,
+ waitForDataFn func() bool,
+ eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) {
+ // loop through all messages
+ var bytesBuf *bytes.Buffer
+ lastReadTime := startTreadTime
+ defer func() {
+ if bytesBuf != nil {
+ logBuffer.ReleaseMeory(bytesBuf)
+ }
+ }()
+
+ for {
+
+ if bytesBuf != nil {
+ logBuffer.ReleaseMeory(bytesBuf)
+ }
+ bytesBuf = logBuffer.ReadFromBuffer(lastReadTime)
+ // fmt.Printf("ReadFromBuffer by %v\n", lastReadTime)
+ if bytesBuf == nil {
+ if waitForDataFn() {
+ continue
+ } else {
+ return
+ }
+ }
+
+ buf := bytesBuf.Bytes()
+ // fmt.Printf("ReadFromBuffer by %v size %d\n", lastReadTime, len(buf))
+
+ batchSize := 0
+ var startReadTime time.Time
+
+ for pos := 0; pos+4 < len(buf); {
+
+ size := util.BytesToUint32(buf[pos : pos+4])
+ entryData := buf[pos+4 : pos+4+int(size)]
+
+ // fmt.Printf("read buffer read %d [%d,%d) from [0,%d)\n", batchSize, pos, pos+int(size)+4, len(buf))
+
+ logEntry := &filer_pb.LogEntry{}
+ if err = proto.Unmarshal(entryData, logEntry); err != nil {
+ glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err)
+ pos += 4 + int(size)
+ continue
+ }
+ lastReadTime = time.Unix(0, logEntry.TsNs)
+ if startReadTime.IsZero() {
+ startReadTime = lastReadTime
+ }
+
+ if err = eachLogDataFn(logEntry); err != nil {
+ return
+ }
+
+ pos += 4 + int(size)
+ batchSize++
+ }
+
+ // fmt.Printf("sent message ts[%d,%d] size %d\n", startReadTime.UnixNano(), lastReadTime.UnixNano(), batchSize)
+ }
+
+}
diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go
new file mode 100644
index 000000000..d133cf8d3
--- /dev/null
+++ b/weed/util/log_buffer/sealed_buffer.go
@@ -0,0 +1,62 @@
+package log_buffer
+
+import (
+ "fmt"
+ "time"
+)
+
+type MemBuffer struct {
+ buf []byte
+ size int
+ startTime time.Time
+ stopTime time.Time
+}
+
+type SealedBuffers struct {
+ buffers []*MemBuffer
+}
+
+func newSealedBuffers(size int) *SealedBuffers {
+ sbs := &SealedBuffers{}
+
+ sbs.buffers = make([]*MemBuffer, size)
+ for i := 0; i < size; i++ {
+ sbs.buffers[i] = &MemBuffer{
+ buf: make([]byte, BufferSize),
+ }
+ }
+
+ return sbs
+}
+
+func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int) (newBuf []byte) {
+ oldMemBuffer := sbs.buffers[0]
+ size := len(sbs.buffers)
+ for i := 0; i < size-1; i++ {
+ sbs.buffers[i].buf = sbs.buffers[i+1].buf
+ sbs.buffers[i].size = sbs.buffers[i+1].size
+ sbs.buffers[i].startTime = sbs.buffers[i+1].startTime
+ sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime
+ }
+ sbs.buffers[size-1].buf = buf
+ sbs.buffers[size-1].size = pos
+ sbs.buffers[size-1].startTime = startTime
+ sbs.buffers[size-1].stopTime = stopTime
+ return oldMemBuffer.buf
+}
+
+func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) {
+ lastReadTs := lastReadTime.UnixNano()
+ for pos < len(mb.buf) {
+ size, t := readTs(mb.buf, pos)
+ if t > lastReadTs {
+ return
+ }
+ pos += size + 4
+ }
+ return len(mb.buf)
+}
+
+func (mb *MemBuffer) String() string {
+ return fmt.Sprintf("[%v,%v] bytes:%d", mb.startTime, mb.stopTime, mb.size)
+}
diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go
index b8068e67f..f057a8f5b 100644
--- a/weed/util/net_timeout.go
+++ b/weed/util/net_timeout.go
@@ -35,6 +35,7 @@ type Conn struct {
net.Conn
ReadTimeout time.Duration
WriteTimeout time.Duration
+ isClosed bool
}
func (c *Conn) Read(b []byte) (count int, e error) {
@@ -68,7 +69,10 @@ func (c *Conn) Write(b []byte) (count int, e error) {
func (c *Conn) Close() error {
err := c.Conn.Close()
if err == nil {
- stats.ConnectionClose()
+ if !c.isClosed {
+ stats.ConnectionClose()
+ c.isClosed = true
+ }
}
return err
}
diff --git a/weed/util/network.go b/weed/util/network.go
new file mode 100644
index 000000000..7108cfea6
--- /dev/null
+++ b/weed/util/network.go
@@ -0,0 +1,25 @@
+package util
+
+import (
+ "net"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func DetectedHostAddress() string {
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ glog.V(0).Infof("failed to detect ip address: %v", err)
+ return ""
+ }
+
+ for _, a := range addrs {
+ if ipnet, ok := a.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+ if ipnet.IP.To4() != nil {
+ return ipnet.IP.String()
+ }
+ }
+ }
+
+ return "localhost"
+}
diff --git a/weed/util/parse.go b/weed/util/parse.go
index 6593d43b6..0955db682 100644
--- a/weed/util/parse.go
+++ b/weed/util/parse.go
@@ -1,6 +1,7 @@
package util
import (
+ "fmt"
"net/url"
"strconv"
"strings"
@@ -45,3 +46,18 @@ func ParseFilerUrl(entryPath string) (filerServer string, filerPort int64, path
path = u.Path
return
}
+
+func ParseHostPort(hostPort string) (filerServer string, filerPort int64, err error) {
+ parts := strings.Split(hostPort, ":")
+ if len(parts) != 2 {
+ err = fmt.Errorf("failed to parse %s\n", hostPort)
+ return
+ }
+
+ filerPort, err = strconv.ParseInt(parts[1], 10, 64)
+ if err == nil {
+ filerServer = parts[0]
+ }
+
+ return
+}
diff --git a/weed/util/queue_unbounded.go b/weed/util/queue_unbounded.go
new file mode 100644
index 000000000..496b9f844
--- /dev/null
+++ b/weed/util/queue_unbounded.go
@@ -0,0 +1,45 @@
+package util
+
+import "sync"
+
+type UnboundedQueue struct {
+ outbound []string
+ outboundLock sync.RWMutex
+ inbound []string
+ inboundLock sync.RWMutex
+}
+
+func NewUnboundedQueue() *UnboundedQueue {
+ q := &UnboundedQueue{}
+ return q
+}
+
+func (q *UnboundedQueue) EnQueue(items ...string) {
+ q.inboundLock.Lock()
+ defer q.inboundLock.Unlock()
+
+ q.inbound = append(q.inbound, items...)
+
+}
+
+func (q *UnboundedQueue) Consume(fn func([]string)) {
+ q.outboundLock.Lock()
+ defer q.outboundLock.Unlock()
+
+ if len(q.outbound) == 0 {
+ q.inboundLock.Lock()
+ inbountLen := len(q.inbound)
+ if inbountLen > 0 {
+ t := q.outbound
+ q.outbound = q.inbound
+ q.inbound = t
+ }
+ q.inboundLock.Unlock()
+ }
+
+ if len(q.outbound) > 0 {
+ fn(q.outbound)
+ q.outbound = q.outbound[:0]
+ }
+
+}
diff --git a/weed/util/queue_unbounded_test.go b/weed/util/queue_unbounded_test.go
new file mode 100644
index 000000000..2d02032cb
--- /dev/null
+++ b/weed/util/queue_unbounded_test.go
@@ -0,0 +1,25 @@
+package util
+
+import "testing"
+
+func TestEnqueueAndConsume(t *testing.T) {
+
+ q := NewUnboundedQueue()
+
+ q.EnQueue("1", "2", "3")
+
+ f := func(items []string) {
+ for _, t := range items {
+ println(t)
+ }
+ println("-----------------------")
+ }
+ q.Consume(f)
+
+ q.Consume(f)
+
+ q.EnQueue("4", "5")
+ q.EnQueue("6", "7")
+ q.Consume(f)
+
+}