diff options
| author | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
| commit | 40334bc28d3fa694ce59b4e65077efb845264d20 (patch) | |
| tree | a085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/util | |
| parent | d892cad15d748327c2b7c649f6398ff35d8dce0b (diff) | |
| parent | fbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff) | |
| download | seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/util')
29 files changed, 1361 insertions, 155 deletions
diff --git a/weed/util/bytes.go b/weed/util/bytes.go index 9c7e5e2cb..0650919c0 100644 --- a/weed/util/bytes.go +++ b/weed/util/bytes.go @@ -2,9 +2,26 @@ package util import ( "crypto/md5" + "fmt" "io" ) +// BytesToHumanReadable returns the converted human readable representation of the bytes. +func BytesToHumanReadable(b uint64) string { + const unit = 1024 + if b < unit { + return fmt.Sprintf("%d B", b) + } + + div, exp := uint64(unit), 0 + for n := b / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + + return fmt.Sprintf("%.2f %ciB", float64(b)/float64(div), "KMGTPE"[exp]) +} + // big endian func BytesToUint64(b []byte) (v uint64) { @@ -74,3 +91,26 @@ func HashStringToLong(dir string) (v int64) { return } + +func HashToInt32(data []byte) (v int32) { + h := md5.New() + h.Write(data) + + b := h.Sum(nil) + + v += int32(b[0]) + v <<= 8 + v += int32(b[1]) + v <<= 8 + v += int32(b[2]) + v <<= 8 + v += int32(b[3]) + + return +} + +func Md5(data []byte) string { + hash := md5.New() + hash.Write(data) + return fmt.Sprintf("%x", hash.Sum(nil)) +} diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go new file mode 100644 index 000000000..e1d4b639f --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache.go @@ -0,0 +1,113 @@ +package chunk_cache + +import ( + "sync" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/needle" +) + +const ( + memCacheSizeLimit = 1024 * 1024 + onDiskCacheSizeLimit0 = memCacheSizeLimit + onDiskCacheSizeLimit1 = 4 * memCacheSizeLimit +) + +// a global cache for recently accessed file chunks +type ChunkCache struct { + memCache *ChunkCacheInMemory + diskCaches []*OnDiskCacheLayer + sync.RWMutex +} + +func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64) *ChunkCache { + + c := &ChunkCache{ + memCache: NewChunkCacheInMemory(maxEntries), + } + c.diskCaches = make([]*OnDiskCacheLayer, 3) + c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_1", diskSizeMB/4, 4) + c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_4", diskSizeMB/4, 4) + c.diskCaches[2] = NewOnDiskCacheLayer(dir, "cache", diskSizeMB/2, 4) + + return c +} + +func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) { + if c == nil { + return + } + + c.RLock() + defer c.RUnlock() + + return c.doGetChunk(fileId, chunkSize) +} + +func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) { + + if chunkSize < memCacheSizeLimit { + if data = c.memCache.GetChunk(fileId); data != nil { + return data + } + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) + return nil + } + + for _, diskCache := range c.diskCaches { + data := diskCache.getChunk(fid.Key) + if len(data) != 0 { + return data + } + } + + return nil + +} + +func (c *ChunkCache) SetChunk(fileId string, data []byte) { + if c == nil { + return + } + c.Lock() + defer c.Unlock() + + c.doSetChunk(fileId, data) +} + +func (c *ChunkCache) doSetChunk(fileId string, data []byte) { + + if len(data) < memCacheSizeLimit { + c.memCache.SetChunk(fileId, data) + } + + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + glog.Errorf("failed to parse file id %s", fileId) + return + } + + if len(data) < onDiskCacheSizeLimit0 { + c.diskCaches[0].setChunk(fid.Key, data) + } else if len(data) < onDiskCacheSizeLimit1 { + c.diskCaches[1].setChunk(fid.Key, data) + } else { + c.diskCaches[2].setChunk(fid.Key, data) + } + +} + +func (c *ChunkCache) Shutdown() { + if c == nil { + return + } + c.Lock() + defer c.Unlock() + for _, diskCache := range c.diskCaches { + diskCache.shutdown() + } +} diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go new file mode 100644 index 000000000..931e45e9a --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_in_memory.go @@ -0,0 +1,36 @@ +package chunk_cache + +import ( + "time" + + "github.com/karlseguin/ccache" +) + +// a global cache for recently accessed file chunks +type ChunkCacheInMemory struct { + cache *ccache.Cache +} + +func NewChunkCacheInMemory(maxEntries int64) *ChunkCacheInMemory { + pruneCount := maxEntries >> 3 + if pruneCount <= 0 { + pruneCount = 500 + } + return &ChunkCacheInMemory{ + cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))), + } +} + +func (c *ChunkCacheInMemory) GetChunk(fileId string) []byte { + item := c.cache.Get(fileId) + if item == nil { + return nil + } + data := item.Value().([]byte) + item.Extend(time.Hour) + return data +} + +func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) { + c.cache.Set(fileId, data, time.Hour) +} diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go new file mode 100644 index 000000000..2c7ef8d39 --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -0,0 +1,145 @@ +package chunk_cache + +import ( + "fmt" + "os" + "time" + + "github.com/syndtr/goleveldb/leveldb/opt" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" +) + +// This implements an on disk cache +// The entries are an FIFO with a size limit + +type ChunkCacheVolume struct { + DataBackend backend.BackendStorageFile + nm storage.NeedleMapper + fileName string + smallBuffer []byte + sizeLimit int64 + lastModTime time.Time + fileSize int64 +} + +func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCacheVolume, error) { + + v := &ChunkCacheVolume{ + smallBuffer: make([]byte, types.NeedlePaddingSize), + fileName: fileName, + sizeLimit: preallocate, + } + + var err error + + if exists, canRead, canWrite, modTime, fileSize := util.CheckFile(v.fileName + ".dat"); exists { + if !canRead { + return nil, fmt.Errorf("cannot read cache file %s.dat", v.fileName) + } + if !canWrite { + return nil, fmt.Errorf("cannot write cache file %s.dat", v.fileName) + } + if dataFile, err := os.OpenFile(v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err) + } else { + v.DataBackend = backend.NewDiskFile(dataFile) + v.lastModTime = modTime + v.fileSize = fileSize + } + } else { + if v.DataBackend, err = backend.CreateVolumeFile(v.fileName+".dat", preallocate, 0); err != nil { + return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err) + } + v.lastModTime = time.Now() + } + + var indexFile *os.File + if indexFile, err = os.OpenFile(v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err) + } + + glog.V(0).Infoln("loading leveldb", v.fileName+".ldb") + opts := &opt.Options{ + BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB + CompactionTableSizeMultiplier: 10, // default value is 1 + } + if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts); err != nil { + return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err) + } + + return v, nil + +} + +func (v *ChunkCacheVolume) Shutdown() { + if v.DataBackend != nil { + v.DataBackend.Close() + v.DataBackend = nil + } + if v.nm != nil { + v.nm.Close() + v.nm = nil + } +} + +func (v *ChunkCacheVolume) destroy() { + v.Shutdown() + os.Remove(v.fileName + ".dat") + os.Remove(v.fileName + ".idx") + os.RemoveAll(v.fileName + ".ldb") +} + +func (v *ChunkCacheVolume) Reset() (*ChunkCacheVolume, error) { + v.destroy() + return LoadOrCreateChunkCacheVolume(v.fileName, v.sizeLimit) +} + +func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) { + + nv, ok := v.nm.Get(key) + if !ok { + return nil, storage.ErrorNotFound + } + data := make([]byte, nv.Size) + if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToAcutalOffset()); readErr != nil { + return nil, fmt.Errorf("read %s.dat [%d,%d): %v", + v.fileName, nv.Offset.ToAcutalOffset(), nv.Offset.ToAcutalOffset()+int64(nv.Size), readErr) + } else { + if readSize != int(nv.Size) { + return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size) + } + } + + return data, nil +} + +func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error { + + offset := v.fileSize + + written, err := v.DataBackend.WriteAt(data, offset) + if err != nil { + return err + } else if written != len(data) { + return fmt.Errorf("partial written %d, expected %d", written, len(data)) + } + + v.fileSize += int64(written) + extraSize := written % types.NeedlePaddingSize + if extraSize != 0 { + v.DataBackend.WriteAt(v.smallBuffer[:types.NeedlePaddingSize-extraSize], offset+int64(written)) + v.fileSize += int64(types.NeedlePaddingSize - extraSize) + } + + if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil { + glog.V(4).Infof("failed to save in needle map %d: %v", key, err) + } + + return nil +} diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go new file mode 100644 index 000000000..f061f2ba2 --- /dev/null +++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go @@ -0,0 +1,59 @@ +package chunk_cache + +import ( + "bytes" + "fmt" + "io/ioutil" + "math/rand" + "os" + "testing" +) + +func TestOnDisk(t *testing.T) { + + tmpDir, _ := ioutil.TempDir("", "c") + defer os.RemoveAll(tmpDir) + + totalDiskSizeMb := int64(32) + + cache := NewChunkCache(0, tmpDir, totalDiskSizeMb) + + writeCount := 5 + type test_data struct { + data []byte + fileId string + size uint64 + } + testData := make([]*test_data, writeCount) + for i := 0; i < writeCount; i++ { + buff := make([]byte, 1024*1024) + rand.Read(buff) + testData[i] = &test_data{ + data: buff, + fileId: fmt.Sprintf("1,%daabbccdd", i+1), + size: uint64(len(buff)), + } + cache.SetChunk(testData[i].fileId, testData[i].data) + } + + for i := 0; i < writeCount; i++ { + data := cache.GetChunk(testData[i].fileId, testData[i].size) + if bytes.Compare(data, testData[i].data) != 0 { + t.Errorf("failed to write to and read from cache: %d", i) + } + } + + cache.Shutdown() + + cache = NewChunkCache(0, tmpDir, totalDiskSizeMb) + + for i := 0; i < writeCount; i++ { + data := cache.GetChunk(testData[i].fileId, testData[i].size) + if bytes.Compare(data, testData[i].data) != 0 { + t.Errorf("failed to write to and read from cache: %d", i) + } + } + + cache.Shutdown() + +} diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go new file mode 100644 index 000000000..9cf8e3ab2 --- /dev/null +++ b/weed/util/chunk_cache/on_disk_cache_layer.go @@ -0,0 +1,89 @@ +package chunk_cache + +import ( + "fmt" + "path" + "sort" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage" + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +type OnDiskCacheLayer struct { + diskCaches []*ChunkCacheVolume +} + +func NewOnDiskCacheLayer(dir, namePrefix string, diskSizeMB int64, segmentCount int) *OnDiskCacheLayer { + + volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000) + if volumeCount < segmentCount { + volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount) + } + + c := &OnDiskCacheLayer{} + for i := 0; i < volumeCount; i++ { + fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i)) + diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024) + if err != nil { + glog.Errorf("failed to add cache %s : %v", fileName, err) + } else { + c.diskCaches = append(c.diskCaches, diskCache) + } + } + + // keep newest cache to the front + sort.Slice(c.diskCaches, func(i, j int) bool { + return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime) + }) + + return c +} + +func (c *OnDiskCacheLayer) setChunk(needleId types.NeedleId, data []byte) { + + if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit { + t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset() + if resetErr != nil { + glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName) + return + } + for i := len(c.diskCaches) - 1; i > 0; i-- { + c.diskCaches[i] = c.diskCaches[i-1] + } + c.diskCaches[0] = t + } + + c.diskCaches[0].WriteNeedle(needleId, data) + +} + +func (c *OnDiskCacheLayer) getChunk(needleId types.NeedleId) (data []byte) { + + var err error + + for _, diskCache := range c.diskCaches { + data, err = diskCache.GetNeedle(needleId) + if err == storage.ErrorNotFound { + continue + } + if err != nil { + glog.Errorf("failed to read cache file %s id %d", diskCache.fileName, needleId) + continue + } + if len(data) != 0 { + return + } + } + + return nil + +} + +func (c *OnDiskCacheLayer) shutdown() { + + for _, diskCache := range c.diskCaches { + diskCache.Shutdown() + } + +} diff --git a/weed/util/cipher.go b/weed/util/cipher.go new file mode 100644 index 000000000..f044c2ca3 --- /dev/null +++ b/weed/util/cipher.go @@ -0,0 +1,60 @@ +package util + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "errors" + "io" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type CipherKey []byte + +func GenCipherKey() CipherKey { + key := make([]byte, 32) + if _, err := io.ReadFull(rand.Reader, key); err != nil { + glog.Fatalf("random key gen: %v", err) + } + return CipherKey(key) +} + +func Encrypt(plaintext []byte, key CipherKey) ([]byte, error) { + c, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + + gcm, err := cipher.NewGCM(c) + if err != nil { + return nil, err + } + + nonce := make([]byte, gcm.NonceSize()) + if _, err = io.ReadFull(rand.Reader, nonce); err != nil { + return nil, err + } + + return gcm.Seal(nonce, nonce, plaintext, nil), nil +} + +func Decrypt(ciphertext []byte, key CipherKey) ([]byte, error) { + c, err := aes.NewCipher(key) + if err != nil { + return nil, err + } + + gcm, err := cipher.NewGCM(c) + if err != nil { + return nil, err + } + + nonceSize := gcm.NonceSize() + if len(ciphertext) < nonceSize { + return nil, errors.New("ciphertext too short") + } + + nonce, ciphertext := ciphertext[:nonceSize], ciphertext[nonceSize:] + return gcm.Open(nil, nonce, ciphertext, nil) +} diff --git a/weed/util/cipher_test.go b/weed/util/cipher_test.go new file mode 100644 index 000000000..026c96ea3 --- /dev/null +++ b/weed/util/cipher_test.go @@ -0,0 +1,17 @@ +package util + +import ( + "encoding/base64" + "testing" +) + +func TestSameAsJavaImplementation(t *testing.T) { + str := "QVVhmqg112NMT7F+G/7QPynqSln3xPIhKdFGmTVKZD6IS0noyr2Z5kXFF6fPjZ/7Hq8kRhlmLeeqZUccxyaZHezOdgkjS6d4NTdHf5IjXzk7" + cipherText, _ := base64.StdEncoding.DecodeString(str) + secretKey := []byte("256-bit key for AES 256 GCM encr") + plantext, err := Decrypt(cipherText, CipherKey(secretKey)) + if err != nil { + println(err.Error()) + } + println(string(plantext)) +} diff --git a/weed/util/compression.go b/weed/util/compression.go index 6072df632..1f778b5d5 100644 --- a/weed/util/compression.go +++ b/weed/util/compression.go @@ -7,8 +7,9 @@ import ( "io/ioutil" "strings" - "github.com/chrislusf/seaweedfs/weed/glog" "golang.org/x/tools/godoc/util" + + "github.com/chrislusf/seaweedfs/weed/glog" ) func GzipData(input []byte) ([]byte, error) { @@ -37,7 +38,8 @@ func UnGzipData(input []byte) ([]byte, error) { /* * Default more not to gzip since gzip can be done on client side. - */func IsGzippable(ext, mtype string, data []byte) bool { + */ +func IsGzippable(ext, mtype string, data []byte) bool { shouldBeZipped, iAmSure := IsGzippableFileType(ext, mtype) if iAmSure { diff --git a/weed/util/compression_test.go b/weed/util/compression_test.go new file mode 100644 index 000000000..b515e8988 --- /dev/null +++ b/weed/util/compression_test.go @@ -0,0 +1,21 @@ +package util + +import ( + "testing" + + "golang.org/x/tools/godoc/util" +) + +func TestIsGzippable(t *testing.T) { + buf := make([]byte, 1024) + + isText := util.IsText(buf) + + if isText { + t.Error("buf with zeros are not text") + } + + compressed, _ := GzipData(buf) + + t.Logf("compressed size %d\n", len(compressed)) +} diff --git a/weed/util/config.go b/weed/util/config.go index dfbfdbd82..7b6e92f08 100644 --- a/weed/util/config.go +++ b/weed/util/config.go @@ -27,7 +27,7 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) { glog.V(1).Infof("Reading %s.toml from %s", configFileName, viper.ConfigFileUsed()) if err := viper.MergeInConfig(); err != nil { // Handle errors reading the config file - glog.V(0).Infof("Reading %s: %v", viper.ConfigFileUsed(), err) + glog.V(1).Infof("Reading %s: %v", viper.ConfigFileUsed(), err) if required { glog.Fatalf("Failed to load %s.toml file from current directory, or $HOME/.seaweedfs/, or /etc/seaweedfs/"+ "\n\nPlease use this command to generate the default %s.toml file\n"+ @@ -42,7 +42,8 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) { } func GetViper() *viper.Viper { - v := viper.GetViper() + v := &viper.Viper{} + *v = *viper.GetViper() v.AutomaticEnv() v.SetEnvPrefix("weed") v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) diff --git a/weed/util/constants.go b/weed/util/constants.go index 3d61b2006..7c3927a66 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,5 +5,10 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 53) + VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 79) + COMMIT = "" ) + +func Version() string { + return VERSION + " " + COMMIT +}
\ No newline at end of file diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go new file mode 100644 index 000000000..4ce8a2f90 --- /dev/null +++ b/weed/util/fullpath.go @@ -0,0 +1,56 @@ +package util + +import ( + "path/filepath" + "strings" +) + +type FullPath string + +func NewFullPath(dir, name string) FullPath { + return FullPath(dir).Child(name) +} + +func (fp FullPath) DirAndName() (string, string) { + dir, name := filepath.Split(string(fp)) + if dir == "/" { + return dir, name + } + if len(dir) < 1 { + return "/", "" + } + return dir[:len(dir)-1], name +} + +func (fp FullPath) Name() string { + _, name := filepath.Split(string(fp)) + return name +} + +func (fp FullPath) Child(name string) FullPath { + dir := string(fp) + if strings.HasSuffix(dir, "/") { + return FullPath(dir + name) + } + return FullPath(dir + "/" + name) +} + +func (fp FullPath) AsInode() uint64 { + return uint64(HashStringToLong(string(fp))) +} + +// split, but skipping the root +func (fp FullPath) Split() []string { + if fp == "" || fp == "/" { + return []string{} + } + return strings.Split(string(fp)[1:], "/") +} + +func Join(names ...string) string { + return filepath.ToSlash(filepath.Join(names...)) +} + +func JoinPath(names ...string) FullPath { + return FullPath(Join(names...)) +} diff --git a/weed/util/pprof.go b/weed/util/grace/pprof.go index a2621ceee..14686bfc8 100644 --- a/weed/util/pprof.go +++ b/weed/util/grace/pprof.go @@ -1,4 +1,4 @@ -package util +package grace import ( "os" diff --git a/weed/util/signal_handling.go b/weed/util/grace/signal_handling.go index 99447e8be..7cca46764 100644 --- a/weed/util/signal_handling.go +++ b/weed/util/grace/signal_handling.go @@ -1,6 +1,6 @@ // +build !plan9 -package util +package grace import ( "os" diff --git a/weed/util/signal_handling_notsupported.go b/weed/util/grace/signal_handling_notsupported.go index c389cfb7e..5335915a1 100644 --- a/weed/util/signal_handling_notsupported.go +++ b/weed/util/grace/signal_handling_notsupported.go @@ -1,6 +1,6 @@ // +build plan9 -package util +package grace func OnInterrupt(fn func()) { } diff --git a/weed/util/grpc_client_server.go b/weed/util/grpc_client_server.go deleted file mode 100644 index 7e396342b..000000000 --- a/weed/util/grpc_client_server.go +++ /dev/null @@ -1,127 +0,0 @@ -package util - -import ( - "context" - "fmt" - "net/http" - "strconv" - "strings" - "sync" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" -) - -var ( - // cache grpc connections - grpcClients = make(map[string]*grpc.ClientConn) - grpcClientsLock sync.Mutex -) - -func init() { - http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 1024 -} - -func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server { - var options []grpc.ServerOption - options = append(options, grpc.KeepaliveParams(keepalive.ServerParameters{ - Time: 10 * time.Second, // wait time before ping if no activity - Timeout: 20 * time.Second, // ping timeout - }), grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: 60 * time.Second, // min time a client should wait before sending a ping - })) - for _, opt := range opts { - if opt != nil { - options = append(options, opt) - } - } - return grpc.NewServer(options...) -} - -func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - // opts = append(opts, grpc.WithBlock()) - // opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second))) - var options []grpc.DialOption - options = append(options, - // grpc.WithInsecure(), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 30 * time.Second, // client ping server if no activity for this long - Timeout: 20 * time.Second, - })) - for _, opt := range opts { - if opt != nil { - options = append(options, opt) - } - } - return grpc.DialContext(ctx, address, options...) -} - -func WithCachedGrpcClient(ctx context.Context, fn func(context.Context, *grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { - - grpcClientsLock.Lock() - - existingConnection, found := grpcClients[address] - if found { - grpcClientsLock.Unlock() - err := fn(ctx, existingConnection) - if err != nil { - grpcClientsLock.Lock() - delete(grpcClients, address) - grpcClientsLock.Unlock() - existingConnection.Close() - } - return err - } - - grpcConnection, err := GrpcDial(ctx, address, opts...) - if err != nil { - grpcClientsLock.Unlock() - return fmt.Errorf("fail to dial %s: %v", address, err) - } - - grpcClients[address] = grpcConnection - grpcClientsLock.Unlock() - - err = fn(ctx, grpcConnection) - if err != nil { - grpcClientsLock.Lock() - delete(grpcClients, address) - grpcClientsLock.Unlock() - grpcConnection.Close() - } - - return err -} - -func ParseServerToGrpcAddress(server string) (serverGrpcAddress string, err error) { - colonIndex := strings.LastIndex(server, ":") - if colonIndex < 0 { - return "", fmt.Errorf("server should have hostname:port format: %v", server) - } - - port, parseErr := strconv.ParseUint(server[colonIndex+1:], 10, 64) - if parseErr != nil { - return "", fmt.Errorf("server port parse error: %v", parseErr) - } - - grpcPort := int(port) + 10000 - - return fmt.Sprintf("%s:%d", server[:colonIndex], grpcPort), nil -} - -func ServerToGrpcAddress(server string) (serverGrpcAddress string) { - hostnameAndPort := strings.Split(server, ":") - if len(hostnameAndPort) != 2 { - return fmt.Sprintf("unexpected server address: %s", server) - } - - port, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) - if parseErr != nil { - return fmt.Sprintf("failed to parse port for %s:%s", hostnameAndPort[0], hostnameAndPort[1]) - } - - grpcPort := int(port) + 10000 - - return fmt.Sprintf("%s:%d", hostnameAndPort[0], grpcPort) -} diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 08007a038..5df79a7be 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -35,13 +35,13 @@ func PostBytes(url string, body []byte) ([]byte, error) { return nil, fmt.Errorf("Post to %s: %v", url, err) } defer r.Body.Close() - if r.StatusCode >= 400 { - return nil, fmt.Errorf("%s: %s", url, r.Status) - } b, err := ioutil.ReadAll(r.Body) if err != nil { return nil, fmt.Errorf("Read response body: %v", err) } + if r.StatusCode >= 400 { + return nil, fmt.Errorf("%s: %s", url, r.Status) + } return b, nil } @@ -88,7 +88,7 @@ func Head(url string) (http.Header, error) { if err != nil { return nil, err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode >= 400 { return nil, fmt.Errorf("%s: %s", url, r.Status) } @@ -117,7 +117,7 @@ func Delete(url string, jwt string) error { return nil } m := make(map[string]interface{}) - if e := json.Unmarshal(body, m); e == nil { + if e := json.Unmarshal(body, &m); e == nil { if s, ok := m["error"].(string); ok { return errors.New(s) } @@ -130,7 +130,7 @@ func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachB if err != nil { return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode != 200 { return fmt.Errorf("%s: %s", url, r.Status) } @@ -153,7 +153,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e if err != nil { return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode != 200 { return fmt.Errorf("%s: %s", url, r.Status) } @@ -189,13 +189,21 @@ func NormalizeUrl(url string) string { return "http://" + url } -func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange bool) (int64, error) { +func ReadUrl(fileUrl string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) { + + if cipherKey != nil { + var n int + err := readEncryptedUrl(fileUrl, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { + n = copy(buf, data) + }) + return int64(n), err + } req, err := http.NewRequest("GET", fileUrl, nil) if err != nil { return 0, err } - if isReadRange { + if !isFullChunk { req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) } else { req.Header.Set("Accept-Encoding", "gzip") @@ -250,43 +258,74 @@ func ReadUrl(fileUrl string, offset int64, size int, buf []byte, isReadRange boo return n, err } -func ReadUrlAsStream(fileUrl string, offset int64, size int, fn func(data []byte)) (int64, error) { +func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error { + + if cipherKey != nil { + return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) + } req, err := http.NewRequest("GET", fileUrl, nil) if err != nil { - return 0, err + return err + } + + if !isFullChunk { + req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) } - req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1)) r, err := client.Do(req) if err != nil { - return 0, err + return err } - defer r.Body.Close() + defer CloseResponse(r) if r.StatusCode >= 400 { - return 0, fmt.Errorf("%s: %s", fileUrl, r.Status) + return fmt.Errorf("%s: %s", fileUrl, r.Status) } var ( m int - n int64 ) buf := make([]byte, 64*1024) for { m, err = r.Body.Read(buf) fn(buf[:m]) - n += int64(m) if err == io.EOF { - return n, nil + return nil } if err != nil { - return n, err + return err } } } +func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error { + encryptedData, err := Get(fileUrl) + if err != nil { + return fmt.Errorf("fetch %s: %v", fileUrl, err) + } + decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey)) + if err != nil { + return fmt.Errorf("decrypt %s: %v", fileUrl, err) + } + if isContentGzipped { + decryptedData, err = UnGzipData(decryptedData) + if err != nil { + return fmt.Errorf("unzip decrypt %s: %v", fileUrl, err) + } + } + if len(decryptedData) < int(offset)+size { + return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size) + } + if isFullChunk { + fn(decryptedData) + } else { + fn(decryptedData[int(offset) : int(offset)+size]) + } + return nil +} + func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) { req, err := http.NewRequest("GET", fileUrl, nil) @@ -307,3 +346,8 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e return r.Body, nil } + +func CloseResponse(resp *http.Response) { + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() +} diff --git a/weed/util/inits.go b/weed/util/inits.go new file mode 100644 index 000000000..378878012 --- /dev/null +++ b/weed/util/inits.go @@ -0,0 +1,52 @@ +package util + +import ( + "fmt" + "sort" +) + +// HumanReadableIntsMax joins a serials of inits into a smart one like 1-3 5 ... for human readable. +func HumanReadableIntsMax(max int, ids ...int) string { + if len(ids) <= max { + return HumanReadableInts(ids...) + } + + return HumanReadableInts(ids[:max]...) + " ..." +} + +// HumanReadableInts joins a serials of inits into a smart one like 1-3 5 7-10 for human readable. +func HumanReadableInts(ids ...int) string { + sort.Ints(ids) + + s := "" + start := 0 + last := 0 + + for i, v := range ids { + if i == 0 { + start = v + last = v + s = fmt.Sprintf("%d", v) + continue + } + + if last+1 == v { + last = v + continue + } + + if last > start { + s += fmt.Sprintf("-%d", last) + } + + s += fmt.Sprintf(" %d", v) + start = v + last = v + } + + if last != start { + s += fmt.Sprintf("-%d", last) + } + + return s +} diff --git a/weed/util/inits_test.go b/weed/util/inits_test.go new file mode 100644 index 000000000..f2c9b701f --- /dev/null +++ b/weed/util/inits_test.go @@ -0,0 +1,19 @@ +package util + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestHumanReadableIntsMax(t *testing.T) { + assert.Equal(t, "1-2 ...", HumanReadableIntsMax(2, 1, 2, 3)) + assert.Equal(t, "1 3 ...", HumanReadableIntsMax(2, 1, 3, 5)) +} + +func TestHumanReadableInts(t *testing.T) { + assert.Equal(t, "1-3", HumanReadableInts(1, 2, 3)) + assert.Equal(t, "1 3", HumanReadableInts(1, 3)) + assert.Equal(t, "1 3 5", HumanReadableInts(5, 1, 3)) + assert.Equal(t, "1-3 5", HumanReadableInts(1, 2, 3, 5)) + assert.Equal(t, "1-3 5 7-9", HumanReadableInts(7, 9, 8, 1, 2, 3, 5)) +} diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go new file mode 100644 index 000000000..b02c45b52 --- /dev/null +++ b/weed/util/log_buffer/log_buffer.go @@ -0,0 +1,278 @@ +package log_buffer + +import ( + "bytes" + "sync" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const BufferSize = 4 * 1024 * 1024 +const PreviousBufferCount = 3 + +type dataToFlush struct { + startTime time.Time + stopTime time.Time + data *bytes.Buffer +} + +type LogBuffer struct { + prevBuffers *SealedBuffers + buf []byte + idx []int + pos int + startTime time.Time + stopTime time.Time + sizeBuf []byte + flushInterval time.Duration + flushFn func(startTime, stopTime time.Time, buf []byte) + notifyFn func() + isStopping bool + flushChan chan *dataToFlush + lastTsNs int64 + sync.RWMutex +} + +func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer { + lb := &LogBuffer{ + prevBuffers: newSealedBuffers(PreviousBufferCount), + buf: make([]byte, BufferSize), + sizeBuf: make([]byte, 4), + flushInterval: flushInterval, + flushFn: flushFn, + notifyFn: notifyFn, + flushChan: make(chan *dataToFlush, 256), + } + go lb.loopFlush() + go lb.loopInterval() + return lb +} + +func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { + + m.Lock() + defer func() { + m.Unlock() + if m.notifyFn != nil { + m.notifyFn() + } + }() + + // need to put the timestamp inside the lock + ts := time.Now() + tsNs := ts.UnixNano() + if m.lastTsNs >= tsNs { + // this is unlikely to happen, but just in case + tsNs = m.lastTsNs + 1 + ts = time.Unix(0, tsNs) + } + m.lastTsNs = tsNs + logEntry := &filer_pb.LogEntry{ + TsNs: tsNs, + PartitionKeyHash: util.HashToInt32(partitionKey), + Data: data, + } + + logEntryData, _ := proto.Marshal(logEntry) + + size := len(logEntryData) + + if m.pos == 0 { + m.startTime = ts + } + + if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { + m.flushChan <- m.copyToFlush() + m.startTime = ts + if len(m.buf) < size+4 { + m.buf = make([]byte, 2*size+4) + } + } + m.stopTime = ts + + m.idx = append(m.idx, m.pos) + util.Uint32toBytes(m.sizeBuf, uint32(size)) + copy(m.buf[m.pos:m.pos+4], m.sizeBuf) + copy(m.buf[m.pos+4:m.pos+4+size], logEntryData) + m.pos += size + 4 + + // fmt.Printf("entry size %d total %d count %d, buffer:%p\n", size, m.pos, len(m.idx), m) + +} + +func (m *LogBuffer) Shutdown() { + m.Lock() + defer m.Unlock() + + if m.isStopping { + return + } + m.isStopping = true + toFlush := m.copyToFlush() + m.flushChan <- toFlush + close(m.flushChan) +} + +func (m *LogBuffer) loopFlush() { + for d := range m.flushChan { + if d != nil { + // fmt.Printf("flush [%v, %v] size %d\n", d.startTime, d.stopTime, len(d.data.Bytes())) + m.flushFn(d.startTime, d.stopTime, d.data.Bytes()) + d.releaseMemory() + } + } +} + +func (m *LogBuffer) loopInterval() { + for !m.isStopping { + time.Sleep(m.flushInterval) + m.Lock() + if m.isStopping { + m.Unlock() + return + } + // println("loop interval") + toFlush := m.copyToFlush() + m.flushChan <- toFlush + m.Unlock() + } +} + +func (m *LogBuffer) copyToFlush() *dataToFlush { + + if m.flushFn != nil && m.pos > 0 { + // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos) + d := &dataToFlush{ + startTime: m.startTime, + stopTime: m.stopTime, + data: copiedBytes(m.buf[:m.pos]), + } + // fmt.Printf("flusing [0,%d) with %d entries\n", m.pos, len(m.idx)) + m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos) + m.pos = 0 + m.idx = m.idx[:0] + return d + } + return nil +} + +func (d *dataToFlush) releaseMemory() { + d.data.Reset() + bufferPool.Put(d.data) +} + +func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer) { + m.RLock() + defer m.RUnlock() + + /* + fmt.Printf("read buffer %p: %v last stop time: [%v,%v], pos %d, entries:%d, prevBufs:%d\n", m, lastReadTime, m.startTime, m.stopTime, m.pos, len(m.idx), len(m.prevBuffers.buffers)) + for i, prevBuf := range m.prevBuffers.buffers { + fmt.Printf(" prev %d : %s\n", i, prevBuf.String()) + } + */ + + if lastReadTime.Equal(m.stopTime) { + return nil + } + if lastReadTime.After(m.stopTime) { + // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime) + return nil + } + if lastReadTime.Before(m.startTime) { + // println("checking ", lastReadTime.UnixNano()) + for i, buf := range m.prevBuffers.buffers { + if buf.startTime.After(lastReadTime) { + if i == 0 { + // println("return the earliest in memory", buf.startTime.UnixNano()) + return copiedBytes(buf.buf[:buf.size]) + } + // println("return the", i, "th in memory", buf.startTime.UnixNano()) + return copiedBytes(buf.buf[:buf.size]) + } + if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) { + pos := buf.locateByTs(lastReadTime) + // fmt.Printf("locate buffer[%d] pos %d\n", i, pos) + return copiedBytes(buf.buf[pos:buf.size]) + } + } + // println("return the current buf", lastReadTime.UnixNano()) + return copiedBytes(m.buf[:m.pos]) + } + + lastTs := lastReadTime.UnixNano() + l, h := 0, len(m.idx)-1 + + /* + for i, pos := range m.idx { + logEntry, ts := readTs(m.buf, pos) + event := &filer_pb.SubscribeMetadataResponse{} + proto.Unmarshal(logEntry.Data, event) + entry := event.EventNotification.OldEntry + if entry == nil { + entry = event.EventNotification.NewEntry + } + fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name) + } + fmt.Printf("l=%d, h=%d\n", l, h) + */ + + for l <= h { + mid := (l + h) / 2 + pos := m.idx[mid] + _, t := readTs(m.buf, pos) + if t <= lastTs { + l = mid + 1 + } else if lastTs < t { + var prevT int64 + if mid > 0 { + _, prevT = readTs(m.buf, m.idx[mid-1]) + } + if prevT <= lastTs { + // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos) + return copiedBytes(m.buf[pos:m.pos]) + } + h = mid + } + // fmt.Printf("l=%d, h=%d\n", l, h) + } + + // FIXME: this could be that the buffer has been flushed already + return nil + +} +func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) { + bufferPool.Put(b) +} + +var bufferPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} + +func copiedBytes(buf []byte) (copied *bytes.Buffer) { + copied = bufferPool.Get().(*bytes.Buffer) + copied.Reset() + copied.Write(buf) + return +} + +func readTs(buf []byte, pos int) (size int, ts int64) { + + size = int(util.BytesToUint32(buf[pos : pos+4])) + entryData := buf[pos+4 : pos+4+size] + logEntry := &filer_pb.LogEntry{} + + err := proto.Unmarshal(entryData, logEntry) + if err != nil { + glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err) + } + return size, logEntry.TsNs + +} diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go new file mode 100644 index 000000000..f9ccc95c2 --- /dev/null +++ b/weed/util/log_buffer/log_buffer_test.go @@ -0,0 +1,42 @@ +package log_buffer + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func TestNewLogBufferFirstBuffer(t *testing.T) { + lb := NewLogBuffer(time.Minute, func(startTime, stopTime time.Time, buf []byte) { + + }, func() { + + }) + + startTime := time.Now() + + messageSize := 1024 + messageCount := 5000 + var buf = make([]byte, messageSize) + for i := 0; i < messageCount; i++ { + rand.Read(buf) + lb.AddToBuffer(nil, buf) + } + + receivedmessageCount := 0 + lb.LoopProcessLogData(startTime, func() bool { + // stop if no more messages + return false + }, func(logEntry *filer_pb.LogEntry) error { + receivedmessageCount++ + return nil + }) + + if receivedmessageCount != messageCount { + fmt.Printf("sent %d received %d\n", messageCount, receivedmessageCount) + } + +} diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go new file mode 100644 index 000000000..2b73a8064 --- /dev/null +++ b/weed/util/log_buffer/log_read.go @@ -0,0 +1,77 @@ +package log_buffer + +import ( + "bytes" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (logBuffer *LogBuffer) LoopProcessLogData( + startTreadTime time.Time, + waitForDataFn func() bool, + eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) { + // loop through all messages + var bytesBuf *bytes.Buffer + lastReadTime := startTreadTime + defer func() { + if bytesBuf != nil { + logBuffer.ReleaseMeory(bytesBuf) + } + }() + + for { + + if bytesBuf != nil { + logBuffer.ReleaseMeory(bytesBuf) + } + bytesBuf = logBuffer.ReadFromBuffer(lastReadTime) + // fmt.Printf("ReadFromBuffer by %v\n", lastReadTime) + if bytesBuf == nil { + if waitForDataFn() { + continue + } else { + return + } + } + + buf := bytesBuf.Bytes() + // fmt.Printf("ReadFromBuffer by %v size %d\n", lastReadTime, len(buf)) + + batchSize := 0 + var startReadTime time.Time + + for pos := 0; pos+4 < len(buf); { + + size := util.BytesToUint32(buf[pos : pos+4]) + entryData := buf[pos+4 : pos+4+int(size)] + + // fmt.Printf("read buffer read %d [%d,%d) from [0,%d)\n", batchSize, pos, pos+int(size)+4, len(buf)) + + logEntry := &filer_pb.LogEntry{} + if err = proto.Unmarshal(entryData, logEntry); err != nil { + glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) + pos += 4 + int(size) + continue + } + lastReadTime = time.Unix(0, logEntry.TsNs) + if startReadTime.IsZero() { + startReadTime = lastReadTime + } + + if err = eachLogDataFn(logEntry); err != nil { + return + } + + pos += 4 + int(size) + batchSize++ + } + + // fmt.Printf("sent message ts[%d,%d] size %d\n", startReadTime.UnixNano(), lastReadTime.UnixNano(), batchSize) + } + +} diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go new file mode 100644 index 000000000..d133cf8d3 --- /dev/null +++ b/weed/util/log_buffer/sealed_buffer.go @@ -0,0 +1,62 @@ +package log_buffer + +import ( + "fmt" + "time" +) + +type MemBuffer struct { + buf []byte + size int + startTime time.Time + stopTime time.Time +} + +type SealedBuffers struct { + buffers []*MemBuffer +} + +func newSealedBuffers(size int) *SealedBuffers { + sbs := &SealedBuffers{} + + sbs.buffers = make([]*MemBuffer, size) + for i := 0; i < size; i++ { + sbs.buffers[i] = &MemBuffer{ + buf: make([]byte, BufferSize), + } + } + + return sbs +} + +func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int) (newBuf []byte) { + oldMemBuffer := sbs.buffers[0] + size := len(sbs.buffers) + for i := 0; i < size-1; i++ { + sbs.buffers[i].buf = sbs.buffers[i+1].buf + sbs.buffers[i].size = sbs.buffers[i+1].size + sbs.buffers[i].startTime = sbs.buffers[i+1].startTime + sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime + } + sbs.buffers[size-1].buf = buf + sbs.buffers[size-1].size = pos + sbs.buffers[size-1].startTime = startTime + sbs.buffers[size-1].stopTime = stopTime + return oldMemBuffer.buf +} + +func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) { + lastReadTs := lastReadTime.UnixNano() + for pos < len(mb.buf) { + size, t := readTs(mb.buf, pos) + if t > lastReadTs { + return + } + pos += size + 4 + } + return len(mb.buf) +} + +func (mb *MemBuffer) String() string { + return fmt.Sprintf("[%v,%v] bytes:%d", mb.startTime, mb.stopTime, mb.size) +} diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go index b8068e67f..f057a8f5b 100644 --- a/weed/util/net_timeout.go +++ b/weed/util/net_timeout.go @@ -35,6 +35,7 @@ type Conn struct { net.Conn ReadTimeout time.Duration WriteTimeout time.Duration + isClosed bool } func (c *Conn) Read(b []byte) (count int, e error) { @@ -68,7 +69,10 @@ func (c *Conn) Write(b []byte) (count int, e error) { func (c *Conn) Close() error { err := c.Conn.Close() if err == nil { - stats.ConnectionClose() + if !c.isClosed { + stats.ConnectionClose() + c.isClosed = true + } } return err } diff --git a/weed/util/network.go b/weed/util/network.go new file mode 100644 index 000000000..7108cfea6 --- /dev/null +++ b/weed/util/network.go @@ -0,0 +1,25 @@ +package util + +import ( + "net" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func DetectedHostAddress() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + glog.V(0).Infof("failed to detect ip address: %v", err) + return "" + } + + for _, a := range addrs { + if ipnet, ok := a.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + return ipnet.IP.String() + } + } + } + + return "localhost" +} diff --git a/weed/util/parse.go b/weed/util/parse.go index 6593d43b6..0955db682 100644 --- a/weed/util/parse.go +++ b/weed/util/parse.go @@ -1,6 +1,7 @@ package util import ( + "fmt" "net/url" "strconv" "strings" @@ -45,3 +46,18 @@ func ParseFilerUrl(entryPath string) (filerServer string, filerPort int64, path path = u.Path return } + +func ParseHostPort(hostPort string) (filerServer string, filerPort int64, err error) { + parts := strings.Split(hostPort, ":") + if len(parts) != 2 { + err = fmt.Errorf("failed to parse %s\n", hostPort) + return + } + + filerPort, err = strconv.ParseInt(parts[1], 10, 64) + if err == nil { + filerServer = parts[0] + } + + return +} diff --git a/weed/util/queue_unbounded.go b/weed/util/queue_unbounded.go new file mode 100644 index 000000000..496b9f844 --- /dev/null +++ b/weed/util/queue_unbounded.go @@ -0,0 +1,45 @@ +package util + +import "sync" + +type UnboundedQueue struct { + outbound []string + outboundLock sync.RWMutex + inbound []string + inboundLock sync.RWMutex +} + +func NewUnboundedQueue() *UnboundedQueue { + q := &UnboundedQueue{} + return q +} + +func (q *UnboundedQueue) EnQueue(items ...string) { + q.inboundLock.Lock() + defer q.inboundLock.Unlock() + + q.inbound = append(q.inbound, items...) + +} + +func (q *UnboundedQueue) Consume(fn func([]string)) { + q.outboundLock.Lock() + defer q.outboundLock.Unlock() + + if len(q.outbound) == 0 { + q.inboundLock.Lock() + inbountLen := len(q.inbound) + if inbountLen > 0 { + t := q.outbound + q.outbound = q.inbound + q.inbound = t + } + q.inboundLock.Unlock() + } + + if len(q.outbound) > 0 { + fn(q.outbound) + q.outbound = q.outbound[:0] + } + +} diff --git a/weed/util/queue_unbounded_test.go b/weed/util/queue_unbounded_test.go new file mode 100644 index 000000000..2d02032cb --- /dev/null +++ b/weed/util/queue_unbounded_test.go @@ -0,0 +1,25 @@ +package util + +import "testing" + +func TestEnqueueAndConsume(t *testing.T) { + + q := NewUnboundedQueue() + + q.EnQueue("1", "2", "3") + + f := func(items []string) { + for _, t := range items { + println(t) + } + println("-----------------------") + } + q.Consume(f) + + q.Consume(f) + + q.EnQueue("4", "5") + q.EnQueue("6", "7") + q.Consume(f) + +} |
