| field | value | date |
|---|---|---|
| author | bukton <buk_ton2@hotmail.com> | 2020-04-19 00:21:45 +0700 |
| committer | bukton <buk_ton2@hotmail.com> | 2020-04-19 00:21:45 +0700 |
| commit | 290c6b7f01f7b148a65ba10dd6536ad2567d7653 (patch) | |
| tree | e1abb141849419c40691097eea032b944fcbd748 /weed/util | |
| parent | 6234ea441b6388838a19635c656316047f42917d (diff) | |
| parent | 11f5a6d91346e5f3cbf3b46e0a660e231c5c2998 (diff) | |
| download | seaweedfs-290c6b7f01f7b148a65ba10dd6536ad2567d7653.tar.xz, seaweedfs-290c6b7f01f7b148a65ba10dd6536ad2567d7653.zip | |
Merge remote-tracking branch 'origin/master' into filer_mongodb
# Conflicts:
# go.mod
# go.sum
# weed/server/filer_server.go
Diffstat (limited to 'weed/util')
| mode | path | lines |
|---|---|---|
| -rw-r--r-- | weed/util/bytes.go | 7 |
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache.go | 113 |
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_in_memory.go | 36 |
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_on_disk.go | 145 |
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_on_disk_test.go | 59 |
| -rw-r--r-- | weed/util/chunk_cache/on_disk_cache_layer.go | 89 |
| -rw-r--r-- | weed/util/config.go | 3 |
| -rw-r--r-- | weed/util/constants.go | 2 |
| -rw-r--r-- | weed/util/http_util.go | 2 |
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 220 |
| -rw-r--r-- | weed/util/log_buffer/sealed_buffer.go | 40 |
| -rw-r--r-- | weed/util/network.go | 25 |
| -rw-r--r-- | weed/util/parse.go | 16 |
13 files changed, 754 insertions, 3 deletions
```diff
diff --git a/weed/util/bytes.go b/weed/util/bytes.go
index d9e462693..d72d199f8 100644
--- a/weed/util/bytes.go
+++ b/weed/util/bytes.go
@@ -2,6 +2,7 @@ package util
 
 import (
 	"crypto/md5"
+	"fmt"
 	"io"
 )
 
@@ -91,3 +92,9 @@ func HashToInt32(data []byte) (v int32) {
 
 	return
 }
+
+func Md5(data []byte) string {
+	hash := md5.New()
+	hash.Write(data)
+	return fmt.Sprintf("%x", hash.Sum(nil))
+}
diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go
new file mode 100644
index 000000000..e1d4b639f
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache.go
@@ -0,0 +1,113 @@
+package chunk_cache
+
+import (
+	"sync"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+const (
+	memCacheSizeLimit     = 1024 * 1024
+	onDiskCacheSizeLimit0 = memCacheSizeLimit
+	onDiskCacheSizeLimit1 = 4 * memCacheSizeLimit
+)
+
+// a global cache for recently accessed file chunks
+type ChunkCache struct {
+	memCache   *ChunkCacheInMemory
+	diskCaches []*OnDiskCacheLayer
+	sync.RWMutex
+}
+
+func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64) *ChunkCache {
+
+	c := &ChunkCache{
+		memCache: NewChunkCacheInMemory(maxEntries),
+	}
+	c.diskCaches = make([]*OnDiskCacheLayer, 3)
+	c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_1", diskSizeMB/4, 4)
+	c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_4", diskSizeMB/4, 4)
+	c.diskCaches[2] = NewOnDiskCacheLayer(dir, "cache", diskSizeMB/2, 4)
+
+	return c
+}
+
+func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) {
+	if c == nil {
+		return
+	}
+
+	c.RLock()
+	defer c.RUnlock()
+
+	return c.doGetChunk(fileId, chunkSize)
+}
+
+func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) {
+
+	if chunkSize < memCacheSizeLimit {
+		if data = c.memCache.GetChunk(fileId); data != nil {
+			return data
+		}
+	}
+
+	fid, err := needle.ParseFileIdFromString(fileId)
+	if err != nil {
+		glog.Errorf("failed to parse file id %s", fileId)
+		return nil
+	}
+
+	for _, diskCache := range c.diskCaches {
+		data := diskCache.getChunk(fid.Key)
+		if len(data) != 0 {
+			return data
+		}
+	}
+
+	return nil
+
+}
+
+func (c *ChunkCache) SetChunk(fileId string, data []byte) {
+	if c == nil {
+		return
+	}
+	c.Lock()
+	defer c.Unlock()
+
+	c.doSetChunk(fileId, data)
+}
+
+func (c *ChunkCache) doSetChunk(fileId string, data []byte) {
+
+	if len(data) < memCacheSizeLimit {
+		c.memCache.SetChunk(fileId, data)
+	}
+
+	fid, err := needle.ParseFileIdFromString(fileId)
+	if err != nil {
+		glog.Errorf("failed to parse file id %s", fileId)
+		return
+	}
+
+	if len(data) < onDiskCacheSizeLimit0 {
+		c.diskCaches[0].setChunk(fid.Key, data)
+	} else if len(data) < onDiskCacheSizeLimit1 {
+		c.diskCaches[1].setChunk(fid.Key, data)
+	} else {
+		c.diskCaches[2].setChunk(fid.Key, data)
+	}
+
+}
+
+func (c *ChunkCache) Shutdown() {
+	if c == nil {
+		return
+	}
+	c.Lock()
+	defer c.Unlock()
+	for _, diskCache := range c.diskCaches {
+		diskCache.shutdown()
+	}
+}
diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go
new file mode 100644
index 000000000..931e45e9a
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache_in_memory.go
@@ -0,0 +1,36 @@
+package chunk_cache
+
+import (
+	"time"
+
+	"github.com/karlseguin/ccache"
+)
+
+// a global cache for recently accessed file chunks
+type ChunkCacheInMemory struct {
+	cache *ccache.Cache
+}
+
+func NewChunkCacheInMemory(maxEntries int64) *ChunkCacheInMemory {
+	pruneCount := maxEntries >> 3
+	if pruneCount <= 0 {
+		pruneCount = 500
+	}
+	return &ChunkCacheInMemory{
+		cache: ccache.New(ccache.Configure().MaxSize(maxEntries).ItemsToPrune(uint32(pruneCount))),
+	}
+}
+
+func (c *ChunkCacheInMemory) GetChunk(fileId string) []byte {
+	item := c.cache.Get(fileId)
+	if item == nil {
+		return nil
+	}
+	data := item.Value().([]byte)
+	item.Extend(time.Hour)
+	return data
+}
+
+func (c *ChunkCacheInMemory) SetChunk(fileId string, data []byte) {
+	c.cache.Set(fileId, data, time.Hour)
+}
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go
new file mode 100644
index 000000000..2c7ef8d39
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache_on_disk.go
@@ -0,0 +1,145 @@
+package chunk_cache
+
+import (
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/syndtr/goleveldb/leveldb/opt"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/storage"
+	"github.com/chrislusf/seaweedfs/weed/storage/backend"
+	"github.com/chrislusf/seaweedfs/weed/storage/types"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// This implements an on disk cache
+// The entries are an FIFO with a size limit
+
+type ChunkCacheVolume struct {
+	DataBackend backend.BackendStorageFile
+	nm          storage.NeedleMapper
+	fileName    string
+	smallBuffer []byte
+	sizeLimit   int64
+	lastModTime time.Time
+	fileSize    int64
+}
+
+func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCacheVolume, error) {
+
+	v := &ChunkCacheVolume{
+		smallBuffer: make([]byte, types.NeedlePaddingSize),
+		fileName:    fileName,
+		sizeLimit:   preallocate,
+	}
+
+	var err error
+
+	if exists, canRead, canWrite, modTime, fileSize := util.CheckFile(v.fileName + ".dat"); exists {
+		if !canRead {
+			return nil, fmt.Errorf("cannot read cache file %s.dat", v.fileName)
+		}
+		if !canWrite {
+			return nil, fmt.Errorf("cannot write cache file %s.dat", v.fileName)
+		}
+		if dataFile, err := os.OpenFile(v.fileName+".dat", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+			return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err)
+		} else {
+			v.DataBackend = backend.NewDiskFile(dataFile)
+			v.lastModTime = modTime
+			v.fileSize = fileSize
+		}
+	} else {
+		if v.DataBackend, err = backend.CreateVolumeFile(v.fileName+".dat", preallocate, 0); err != nil {
+			return nil, fmt.Errorf("cannot create cache file %s.dat: %v", v.fileName, err)
+		}
+		v.lastModTime = time.Now()
+	}
+
+	var indexFile *os.File
+	if indexFile, err = os.OpenFile(v.fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+		return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err)
+	}
+
+	glog.V(0).Infoln("loading leveldb", v.fileName+".ldb")
+	opts := &opt.Options{
+		BlockCacheCapacity:            2 * 1024 * 1024, // default value is 8MiB
+		WriteBuffer:                   1 * 1024 * 1024, // default value is 4MiB
+		CompactionTableSizeMultiplier: 10,              // default value is 1
+	}
+	if v.nm, err = storage.NewLevelDbNeedleMap(v.fileName+".ldb", indexFile, opts); err != nil {
+		return nil, fmt.Errorf("loading leveldb %s error: %v", v.fileName+".ldb", err)
+	}
+
+	return v, nil
+
+}
+
+func (v *ChunkCacheVolume) Shutdown() {
+	if v.DataBackend != nil {
+		v.DataBackend.Close()
+		v.DataBackend = nil
+	}
+	if v.nm != nil {
+		v.nm.Close()
+		v.nm = nil
+	}
+}
+
+func (v *ChunkCacheVolume) destroy() {
+	v.Shutdown()
+	os.Remove(v.fileName + ".dat")
+	os.Remove(v.fileName + ".idx")
+	os.RemoveAll(v.fileName + ".ldb")
+}
+
+func (v *ChunkCacheVolume) Reset() (*ChunkCacheVolume, error) {
+	v.destroy()
+	return LoadOrCreateChunkCacheVolume(v.fileName, v.sizeLimit)
+}
+
+func (v *ChunkCacheVolume) GetNeedle(key types.NeedleId) ([]byte, error) {
+
+	nv, ok := v.nm.Get(key)
+	if !ok {
+		return nil, storage.ErrorNotFound
+	}
+	data := make([]byte, nv.Size)
+	if readSize, readErr := v.DataBackend.ReadAt(data, nv.Offset.ToAcutalOffset()); readErr != nil {
+		return nil, fmt.Errorf("read %s.dat [%d,%d): %v",
+			v.fileName, nv.Offset.ToAcutalOffset(), nv.Offset.ToAcutalOffset()+int64(nv.Size), readErr)
+	} else {
+		if readSize != int(nv.Size) {
+			return nil, fmt.Errorf("read %d, expected %d", readSize, nv.Size)
+		}
+	}
+
+	return data, nil
+}
+
+func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error {
+
+	offset := v.fileSize
+
+	written, err := v.DataBackend.WriteAt(data, offset)
+	if err != nil {
+		return err
+	} else if written != len(data) {
+		return fmt.Errorf("partial written %d, expected %d", written, len(data))
+	}
+
+	v.fileSize += int64(written)
+	extraSize := written % types.NeedlePaddingSize
+	if extraSize != 0 {
+		v.DataBackend.WriteAt(v.smallBuffer[:types.NeedlePaddingSize-extraSize], offset+int64(written))
+		v.fileSize += int64(types.NeedlePaddingSize - extraSize)
+	}
+
+	if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil {
+		glog.V(4).Infof("failed to save in needle map %d: %v", key, err)
+	}
+
+	return nil
+}
diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
new file mode 100644
index 000000000..f061f2ba2
--- /dev/null
+++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go
@@ -0,0 +1,59 @@
+package chunk_cache
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"math/rand"
+	"os"
+	"testing"
+)
+
+func TestOnDisk(t *testing.T) {
+
+	tmpDir, _ := ioutil.TempDir("", "c")
+	defer os.RemoveAll(tmpDir)
+
+	totalDiskSizeMb := int64(32)
+
+	cache := NewChunkCache(0, tmpDir, totalDiskSizeMb)
+
+	writeCount := 5
+	type test_data struct {
+		data   []byte
+		fileId string
+		size   uint64
+	}
+	testData := make([]*test_data, writeCount)
+	for i := 0; i < writeCount; i++ {
+		buff := make([]byte, 1024*1024)
+		rand.Read(buff)
+		testData[i] = &test_data{
+			data:   buff,
+			fileId: fmt.Sprintf("1,%daabbccdd", i+1),
+			size:   uint64(len(buff)),
+		}
+		cache.SetChunk(testData[i].fileId, testData[i].data)
+	}
+
+	for i := 0; i < writeCount; i++ {
+		data := cache.GetChunk(testData[i].fileId, testData[i].size)
+		if bytes.Compare(data, testData[i].data) != 0 {
+			t.Errorf("failed to write to and read from cache: %d", i)
+		}
+	}
+
+	cache.Shutdown()
+
+	cache = NewChunkCache(0, tmpDir, totalDiskSizeMb)
+
+	for i := 0; i < writeCount; i++ {
+		data := cache.GetChunk(testData[i].fileId, testData[i].size)
+		if bytes.Compare(data, testData[i].data) != 0 {
+			t.Errorf("failed to write to and read from cache: %d", i)
+		}
+	}
+
+	cache.Shutdown()
+
+}
diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go
new file mode 100644
index 000000000..9bd9c2b44
--- /dev/null
+++ b/weed/util/chunk_cache/on_disk_cache_layer.go
@@ -0,0 +1,89 @@
+package chunk_cache
+
+import (
+	"fmt"
+	"path"
+	"sort"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/storage"
+	"github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+type OnDiskCacheLayer struct {
+	diskCaches []*ChunkCacheVolume
+}
+
+func NewOnDiskCacheLayer(dir, namePrefix string, diskSizeMB int64, segmentCount int) *OnDiskCacheLayer{
+
+	volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000)
+	if volumeCount < segmentCount {
+		volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount)
+	}
+
+	c := &OnDiskCacheLayer{}
+	for i := 0; i < volumeCount; i++ {
+		fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i))
+		diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024)
+		if err != nil {
+			glog.Errorf("failed to add cache %s : %v", fileName, err)
+		} else {
+			c.diskCaches = append(c.diskCaches, diskCache)
+		}
+	}
+
+	// keep newest cache to the front
+	sort.Slice(c.diskCaches, func(i, j int) bool {
+		return c.diskCaches[i].lastModTime.After(c.diskCaches[j].lastModTime)
+	})
+
+	return c
+}
+
+func (c *OnDiskCacheLayer) setChunk(needleId types.NeedleId, data []byte) {
+
+	if c.diskCaches[0].fileSize+int64(len(data)) > c.diskCaches[0].sizeLimit {
+		t, resetErr := c.diskCaches[len(c.diskCaches)-1].Reset()
+		if resetErr != nil {
+			glog.Errorf("failed to reset cache file %s", c.diskCaches[len(c.diskCaches)-1].fileName)
+			return
+		}
+		for i := len(c.diskCaches) - 1; i > 0; i-- {
+			c.diskCaches[i] = c.diskCaches[i-1]
+		}
+		c.diskCaches[0] = t
+	}
+
+	c.diskCaches[0].WriteNeedle(needleId, data)
+
+}
+
+func (c *OnDiskCacheLayer) getChunk(needleId types.NeedleId) (data []byte){
+
+	var err error
+
+	for _, diskCache := range c.diskCaches {
+		data, err = diskCache.GetNeedle(needleId)
+		if err == storage.ErrorNotFound {
+			continue
+		}
+		if err != nil {
+			glog.Errorf("failed to read cache file %s id %d", diskCache.fileName, needleId)
+			continue
+		}
+		if len(data) != 0 {
+			return
+		}
+	}
+
+	return nil
+
+}
+
+func (c *OnDiskCacheLayer) shutdown(){
+
+	for _, diskCache := range c.diskCaches {
+		diskCache.Shutdown()
+	}
+
+}
diff --git a/weed/util/config.go b/weed/util/config.go
index 33809d44d..7b6e92f08 100644
--- a/weed/util/config.go
+++ b/weed/util/config.go
@@ -42,7 +42,8 @@ func LoadConfiguration(configFileName string, required bool) (loaded bool) {
 }
 
 func GetViper() *viper.Viper {
-	v := viper.GetViper()
+	v := &viper.Viper{}
+	*v = *viper.GetViper()
 	v.AutomaticEnv()
 	v.SetEnvPrefix("weed")
 	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 6d1498164..e5b512524 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,5 +5,5 @@ import (
 )
 
 var (
-	VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 71)
+	VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 74)
 )
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 4b1a7b895..5df79a7be 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -117,7 +117,7 @@ func Delete(url string, jwt string) error {
 		return nil
 	}
 	m := make(map[string]interface{})
-	if e := json.Unmarshal(body, m); e == nil {
+	if e := json.Unmarshal(body, &m); e == nil {
 		if s, ok := m["error"].(string); ok {
 			return errors.New(s)
 		}
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
new file mode 100644
index 000000000..c7cb90549
--- /dev/null
+++ b/weed/util/log_buffer/log_buffer.go
@@ -0,0 +1,220 @@
+package log_buffer
+
+import (
+	"sync"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const BufferSize = 4 * 1024 * 1024
+const PreviousBufferCount = 3
+
+type dataToFlush struct {
+	startTime time.Time
+	stopTime  time.Time
+	data      []byte
+}
+
+type LogBuffer struct {
+	prevBuffers   *SealedBuffers
+	buf           []byte
+	idx           []int
+	pos           int
+	startTime     time.Time
+	stopTime      time.Time
+	sizeBuf       []byte
+	flushInterval time.Duration
+	flushFn       func(startTime, stopTime time.Time, buf []byte)
+	notifyFn      func()
+	isStopping    bool
+	flushChan     chan *dataToFlush
+	sync.RWMutex
+}
+
+func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer {
+	lb := &LogBuffer{
+		prevBuffers:   newSealedBuffers(PreviousBufferCount),
+		buf:           make([]byte, BufferSize),
+		sizeBuf:       make([]byte, 4),
+		flushInterval: flushInterval,
+		flushFn:       flushFn,
+		notifyFn:      notifyFn,
+		flushChan:     make(chan *dataToFlush, 256),
+	}
+	go lb.loopFlush()
+	go lb.loopInterval()
+	return lb
+}
+
+func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) {
+
+	m.Lock()
+	defer func() {
+		m.Unlock()
+		if m.notifyFn != nil {
+			m.notifyFn()
+		}
+	}()
+
+	// need to put the timestamp inside the lock
+	ts := time.Now()
+	logEntry := &filer_pb.LogEntry{
+		TsNs:             ts.UnixNano(),
+		PartitionKeyHash: util.HashToInt32(partitionKey),
+		Data:             data,
+	}
+
+	logEntryData, _ := proto.Marshal(logEntry)
+
+	size := len(logEntryData)
+
+	if m.pos == 0 {
+		m.startTime = ts
+	}
+
+	if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 {
+		m.flushChan <- m.copyToFlush()
+		m.startTime = ts
+		if len(m.buf) < size+4 {
+			m.buf = make([]byte, 2*size+4)
+		}
+	}
+	m.stopTime = ts
+
+	m.idx = append(m.idx, m.pos)
+	util.Uint32toBytes(m.sizeBuf, uint32(size))
+	copy(m.buf[m.pos:m.pos+4], m.sizeBuf)
+	copy(m.buf[m.pos+4:m.pos+4+size], logEntryData)
+	m.pos += size + 4
+}
+
+func (m *LogBuffer) Shutdown() {
+	if m.isStopping {
+		return
+	}
+	m.isStopping = true
+	m.Lock()
+	toFlush := m.copyToFlush()
+	m.Unlock()
+	m.flushChan <- toFlush
+	close(m.flushChan)
+}
+
+func (m *LogBuffer) loopFlush() {
+	for d := range m.flushChan {
+		if d != nil {
+			m.flushFn(d.startTime, d.stopTime, d.data)
+		}
+	}
+}
+
+func (m *LogBuffer) loopInterval() {
+	for !m.isStopping {
+		m.Lock()
+		toFlush := m.copyToFlush()
+		m.Unlock()
+		m.flushChan <- toFlush
+		time.Sleep(m.flushInterval)
+	}
+}
+
+func (m *LogBuffer) copyToFlush() *dataToFlush {
+
+	if m.flushFn != nil && m.pos > 0 {
+		// fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos)
+		d := &dataToFlush{
+			startTime: m.startTime,
+			stopTime:  m.stopTime,
+			data:      copiedBytes(m.buf[:m.pos]),
+		}
+		m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf)
+		m.pos = 0
+		m.idx = m.idx[:0]
+		return d
+	}
+	return nil
+}
+
+func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (ts time.Time, bufferCopy []byte) {
+	m.RLock()
+	defer m.RUnlock()
+
+	// fmt.Printf("read from buffer: %v\n", lastReadTime)
+
+	if lastReadTime.Equal(m.stopTime) {
+		return lastReadTime, nil
+	}
+	if lastReadTime.After(m.stopTime) {
+		// glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime)
+		return lastReadTime, nil
+	}
+	if lastReadTime.Before(m.startTime) {
+		return m.stopTime, copiedBytes(m.buf[:m.pos])
+	}
+
+	lastTs := lastReadTime.UnixNano()
+	l, h := 0, len(m.idx)-1
+
+	/*
+		for i, pos := range m.idx {
+			logEntry, ts := readTs(m.buf, pos)
+			event := &filer_pb.SubscribeMetadataResponse{}
+			proto.Unmarshal(logEntry.Data, event)
+			entry := event.EventNotification.OldEntry
+			if entry == nil {
+				entry = event.EventNotification.NewEntry
+			}
+			fmt.Printf("entry %d ts: %v offset:%d dir:%s name:%s\n", i, time.Unix(0, ts), pos, event.Directory, entry.Name)
+		}
+		fmt.Printf("l=%d, h=%d\n", l, h)
+	*/
+
+	for l <= h {
+		mid := (l + h) / 2
+		pos := m.idx[mid]
+		_, t := readTs(m.buf, m.idx[mid])
+		if t <= lastTs {
+			l = mid + 1
+		} else if lastTs < t {
+			var prevT int64
+			if mid > 0 {
+				_, prevT = readTs(m.buf, m.idx[mid-1])
+			}
+			if prevT <= lastTs {
+				// println("found mid = ", mid)
+				return time.Unix(0, t), copiedBytes(m.buf[pos:m.pos])
+			}
+			h = mid - 1
+		}
+		// fmt.Printf("l=%d, h=%d\n", l, h)
+	}
+
+	// FIXME: this could be that the buffer has been flushed already
+	// println("not found")
+	return lastReadTime, nil
+
+}
+func copiedBytes(buf []byte) (copied []byte) {
+	copied = make([]byte, len(buf))
+	copy(copied, buf)
+	return
+}
+
+func readTs(buf []byte, pos int) (*filer_pb.LogEntry, int64) {
+
+	size := util.BytesToUint32(buf[pos : pos+4])
+	entryData := buf[pos+4 : pos+4+int(size)]
+	logEntry := &filer_pb.LogEntry{}
+
+	err := proto.Unmarshal(entryData, logEntry)
+	if err != nil {
+		glog.Fatalf("unexpected unmarshal filer_pb.LogEntry: %v", err)
+	}
+	return logEntry, logEntry.TsNs
+
+}
diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go
new file mode 100644
index 000000000..c5160fad0
--- /dev/null
+++ b/weed/util/log_buffer/sealed_buffer.go
@@ -0,0 +1,40 @@
+package log_buffer
+
+import "time"
+
+type MemBuffer struct {
+	buf       []byte
+	startTime time.Time
+	stopTime  time.Time
+}
+
+type SealedBuffers struct {
+	buffers []*MemBuffer
+}
+
+func newSealedBuffers(size int) *SealedBuffers {
+	sbs := &SealedBuffers{}
+
+	sbs.buffers = make([]*MemBuffer, size)
+	for i := 0; i < size; i++ {
+		sbs.buffers[i] = &MemBuffer{
+			buf: make([]byte, BufferSize),
+		}
+	}
+
+	return sbs
+}
+
+func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte) (newBuf []byte) {
+	oldMemBuffer := sbs.buffers[0]
+	size := len(sbs.buffers)
+	for i := 0; i < size-1; i++ {
+		sbs.buffers[i].buf = sbs.buffers[i+1].buf
+		sbs.buffers[i].startTime = sbs.buffers[i+1].startTime
+		sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime
+	}
+	sbs.buffers[size-1].buf = buf
+	sbs.buffers[size-1].startTime = startTime
+	sbs.buffers[size-1].stopTime = stopTime
+	return oldMemBuffer.buf
+}
diff --git a/weed/util/network.go b/weed/util/network.go
new file mode 100644
index 000000000..7108cfea6
--- /dev/null
+++ b/weed/util/network.go
@@ -0,0 +1,25 @@
+package util
+
+import (
+	"net"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func DetectedHostAddress() string {
+	addrs, err := net.InterfaceAddrs()
+	if err != nil {
+		glog.V(0).Infof("failed to detect ip address: %v", err)
+		return ""
+	}
+
+	for _, a := range addrs {
+		if ipnet, ok := a.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+			if ipnet.IP.To4() != nil {
+				return ipnet.IP.String()
+			}
+		}
+	}
+
+	return "localhost"
+}
diff --git a/weed/util/parse.go b/weed/util/parse.go
index 6593d43b6..0955db682 100644
--- a/weed/util/parse.go
+++ b/weed/util/parse.go
@@ -1,6 +1,7 @@
 package util
 
 import (
+	"fmt"
 	"net/url"
 	"strconv"
 	"strings"
@@ -45,3 +46,18 @@ func ParseFilerUrl(entryPath string) (filerServer string, filerPort int64, path
 	path = u.Path
 	return
 }
+
+func ParseHostPort(hostPort string) (filerServer string, filerPort int64, err error) {
+	parts := strings.Split(hostPort, ":")
+	if len(parts) != 2 {
+		err = fmt.Errorf("failed to parse %s\n", hostPort)
+		return
+	}
+
+	filerPort, err = strconv.ParseInt(parts[1], 10, 64)
+	if err == nil {
+		filerServer = parts[0]
+	}
+
+	return
+}
```
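The new chunk_cache package above forms a tiered cache: chunks under 1 MB (memCacheSizeLimit) are served from a ccache-backed memory tier, and every chunk is also written to one of three FIFO disk tiers chosen by size. A minimal usage sketch of the public API follows; the cache directory, entry budget, and file id are illustrative values (the file id shape mirrors chunk_cache_on_disk_test.go), not anything from this commit:

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
)

func main() {
	// Illustrative budgets: up to 1000 in-memory entries and a 32 MB
	// on-disk allowance, split across the three disk tiers internally.
	cache := chunk_cache.NewChunkCache(1000, "/tmp/seaweedfs_chunk_cache", 32)
	defer cache.Shutdown()

	// A "volumeId,needleIdCookie" file id, in the same shape the test uses.
	fileId := "1,1aabbccdd"
	data := []byte("chunk bytes to cache")

	cache.SetChunk(fileId, data)

	// GetChunk takes the expected chunk size so lookups can skip the
	// memory tier for chunks that could never fit there.
	if hit := cache.GetChunk(fileId, uint64(len(data))); hit != nil {
		fmt.Printf("cache hit: %d bytes\n", len(hit))
	}
}
```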
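log_buffer.go above appends length-prefixed, protobuf-encoded filer_pb.LogEntry records into a 4 MB in-memory buffer, seals full or expired buffers into prevBuffers, and hands copies to a flush goroutine; ReadFromBuffer binary-searches the index for the first entry newer than a given timestamp. A hedged sketch of wiring it up (the flush interval and callback bodies are made up for illustration):

```go
package main

import (
	"fmt"
	"time"

	"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
)

func main() {
	lb := log_buffer.NewLogBuffer(
		2*time.Second, // flush once buffered data is older than this
		func(startTime, stopTime time.Time, buf []byte) {
			// In the filer this would persist buf; here we only report it.
			fmt.Printf("flushing %d bytes covering %v .. %v\n", len(buf), startTime, stopTime)
		},
		func() {
			// Called after each append, e.g. to wake subscribers.
		},
	)

	lb.AddToBuffer([]byte("partition-key"), []byte("event payload"))

	// Returns a copy of everything newer than lastReadTime that is still
	// in the live buffer, plus the timestamp to resume from.
	ts, bufCopy := lb.ReadFromBuffer(time.Unix(0, 0))
	fmt.Printf("read %d bytes, next read time %v\n", len(bufCopy), ts)

	lb.Shutdown()
}
```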
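The config.go change is subtle: GetViper now returns a shallow copy of the global viper instance, so enabling AutomaticEnv with the "weed" prefix no longer mutates shared global state. A sketch of the intended behavior, assuming a WEED_MASTER_PORT environment variable (the variable name is only an example):

```go
package main

import (
	"fmt"
	"os"

	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/spf13/viper"
)

func main() {
	os.Setenv("WEED_MASTER_PORT", "9333")

	// The copy has AutomaticEnv plus the "weed" prefix applied, so
	// "master.port" resolves to WEED_MASTER_PORT...
	v := util.GetViper()
	fmt.Println(v.GetString("master.port")) // "9333"

	// ...while the shared global instance keeps its own env settings.
	fmt.Println(viper.GetString("master.port")) // ""
}
```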
