diff options
| author | ustuzhanin <55892859+ustuzhanin@users.noreply.github.com> | 2020-10-02 22:47:25 +0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-10-02 22:47:25 +0500 |
| commit | 3e0a79ef050dba9e5347d20537ef562cc4b30b62 (patch) | |
| tree | e0b42e531d18136d9e272258187a305690ee2b4d /weed/util | |
| parent | cbd80253e33688f55c02dd29c994a3ee6eac3d6c (diff) | |
| parent | 9ab98fa912814686b3035a97b5173c1628fbc0fc (diff) | |
| download | seaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.tar.xz seaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.zip | |
Merge pull request #1 from chrislusf/master
Merge upstream
Diffstat (limited to 'weed/util')
| -rw-r--r-- | weed/util/bounded_tree/bounded_tree.go | 5 | ||||
| -rw-r--r-- | weed/util/bytes.go | 38 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache.go | 63 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_on_disk.go | 4 | ||||
| -rw-r--r-- | weed/util/chunk_cache/chunk_cache_on_disk_test.go | 51 | ||||
| -rw-r--r-- | weed/util/chunk_cache/on_disk_cache_layer.go | 8 | ||||
| -rw-r--r-- | weed/util/compression.go | 37 | ||||
| -rw-r--r-- | weed/util/constants.go | 2 | ||||
| -rw-r--r-- | weed/util/fullpath.go | 2 | ||||
| -rw-r--r-- | weed/util/http_util.go | 5 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 23 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer_test.go | 2 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 20 |
13 files changed, 193 insertions, 67 deletions
diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go index 40b9c4e47..0e023c0d1 100644 --- a/weed/util/bounded_tree/bounded_tree.go +++ b/weed/util/bounded_tree/bounded_tree.go @@ -15,7 +15,7 @@ type Node struct { type BoundedTree struct { root *Node - sync.Mutex + sync.RWMutex } func NewBoundedTree() *BoundedTree { @@ -131,6 +131,9 @@ func (n *Node) getChild(childName string) *Node { func (t *BoundedTree) HasVisited(p util.FullPath) bool { + t.RLock() + defer t.RUnlock() + if t.root == nil { return true } diff --git a/weed/util/bytes.go b/weed/util/bytes.go index 0650919c0..67e6876fa 100644 --- a/weed/util/bytes.go +++ b/weed/util/bytes.go @@ -2,6 +2,8 @@ package util import ( "crypto/md5" + "crypto/rand" + "encoding/base64" "fmt" "io" ) @@ -109,8 +111,40 @@ func HashToInt32(data []byte) (v int32) { return } -func Md5(data []byte) string { +func Base64Encode(data []byte) string { + return base64.StdEncoding.EncodeToString(data) +} + +func Base64Md5(data []byte) string { + return Base64Encode(Md5(data)) +} + +func Md5(data []byte) []byte { hash := md5.New() hash.Write(data) - return fmt.Sprintf("%x", hash.Sum(nil)) + return hash.Sum(nil) +} + +func Md5String(data []byte) string { + return fmt.Sprintf("%x", Md5(data)) +} + +func Base64Md5ToBytes(contentMd5 string) []byte { + data, err := base64.StdEncoding.DecodeString(contentMd5) + if err != nil { + return nil + } + return data +} + +func RandomInt32() int32 { + buf := make([]byte, 4) + rand.Read(buf) + return int32(BytesToUint32(buf)) +} + +func RandomBytes(byteCount int) []byte { + buf := make([]byte, byteCount) + rand.Read(buf) + return buf } diff --git a/weed/util/chunk_cache/chunk_cache.go b/weed/util/chunk_cache/chunk_cache.go index 17b64fb6c..608d605b1 100644 --- a/weed/util/chunk_cache/chunk_cache.go +++ b/weed/util/chunk_cache/chunk_cache.go @@ -7,33 +7,38 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/needle" ) -const ( - memCacheSizeLimit = 1024 * 1024 - onDiskCacheSizeLimit0 = memCacheSizeLimit - onDiskCacheSizeLimit1 = 4 * memCacheSizeLimit -) +type ChunkCache interface { + GetChunk(fileId string, minSize uint64) (data []byte) + SetChunk(fileId string, data []byte) +} // a global cache for recently accessed file chunks -type ChunkCache struct { +type TieredChunkCache struct { memCache *ChunkCacheInMemory diskCaches []*OnDiskCacheLayer sync.RWMutex + onDiskCacheSizeLimit0 uint64 + onDiskCacheSizeLimit1 uint64 + onDiskCacheSizeLimit2 uint64 } -func NewChunkCache(maxEntries int64, dir string, diskSizeMB int64) *ChunkCache { +func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, unitSize int64) *TieredChunkCache { - c := &ChunkCache{ + c := &TieredChunkCache{ memCache: NewChunkCacheInMemory(maxEntries), } c.diskCaches = make([]*OnDiskCacheLayer, 3) - c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_1", diskSizeMB/4, 4) - c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_4", diskSizeMB/4, 4) - c.diskCaches[2] = NewOnDiskCacheLayer(dir, "cache", diskSizeMB/2, 4) + c.onDiskCacheSizeLimit0 = uint64(unitSize) + c.onDiskCacheSizeLimit1 = 4 * c.onDiskCacheSizeLimit0 + c.onDiskCacheSizeLimit2 = 2 * c.onDiskCacheSizeLimit1 + c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_2", diskSizeInUnit*unitSize/8, 2) + c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_3", diskSizeInUnit*unitSize/4+diskSizeInUnit*unitSize/8, 3) + c.diskCaches[2] = NewOnDiskCacheLayer(dir, "c2_2", diskSizeInUnit*unitSize/2, 2) return c } -func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) { +func (c *TieredChunkCache) GetChunk(fileId string, minSize uint64) (data []byte) { if c == nil { return } @@ -41,14 +46,14 @@ func (c *ChunkCache) GetChunk(fileId string, chunkSize uint64) (data []byte) { c.RLock() defer c.RUnlock() - return c.doGetChunk(fileId, chunkSize) + return c.doGetChunk(fileId, minSize) } -func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) { +func (c *TieredChunkCache) doGetChunk(fileId string, minSize uint64) (data []byte) { - if chunkSize < memCacheSizeLimit { + if minSize <= c.onDiskCacheSizeLimit0 { data = c.memCache.GetChunk(fileId) - if len(data) >= int(chunkSize) { + if len(data) >= int(minSize) { return data } } @@ -59,21 +64,21 @@ func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) { return nil } - if chunkSize < onDiskCacheSizeLimit0 { + if minSize <= c.onDiskCacheSizeLimit0 { data = c.diskCaches[0].getChunk(fid.Key) - if len(data) >= int(chunkSize) { + if len(data) >= int(minSize) { return data } } - if chunkSize < onDiskCacheSizeLimit1 { + if minSize <= c.onDiskCacheSizeLimit1 { data = c.diskCaches[1].getChunk(fid.Key) - if len(data) >= int(chunkSize) { + if len(data) >= int(minSize) { return data } } - { + if minSize <= c.onDiskCacheSizeLimit2 { data = c.diskCaches[2].getChunk(fid.Key) - if len(data) >= int(chunkSize) { + if len(data) >= int(minSize) { return data } } @@ -82,7 +87,7 @@ func (c *ChunkCache) doGetChunk(fileId string, chunkSize uint64) (data []byte) { } -func (c *ChunkCache) SetChunk(fileId string, data []byte) { +func (c *TieredChunkCache) SetChunk(fileId string, data []byte) { if c == nil { return } @@ -94,9 +99,9 @@ func (c *ChunkCache) SetChunk(fileId string, data []byte) { c.doSetChunk(fileId, data) } -func (c *ChunkCache) doSetChunk(fileId string, data []byte) { +func (c *TieredChunkCache) doSetChunk(fileId string, data []byte) { - if len(data) < memCacheSizeLimit { + if len(data) <= int(c.onDiskCacheSizeLimit0) { c.memCache.SetChunk(fileId, data) } @@ -106,17 +111,17 @@ func (c *ChunkCache) doSetChunk(fileId string, data []byte) { return } - if len(data) < onDiskCacheSizeLimit0 { + if len(data) <= int(c.onDiskCacheSizeLimit0) { c.diskCaches[0].setChunk(fid.Key, data) - } else if len(data) < onDiskCacheSizeLimit1 { + } else if len(data) <= int(c.onDiskCacheSizeLimit1) { c.diskCaches[1].setChunk(fid.Key, data) - } else { + } else if len(data) <= int(c.onDiskCacheSizeLimit2) { c.diskCaches[2].setChunk(fid.Key, data) } } -func (c *ChunkCache) Shutdown() { +func (c *TieredChunkCache) Shutdown() { if c == nil { return } diff --git a/weed/util/chunk_cache/chunk_cache_on_disk.go b/weed/util/chunk_cache/chunk_cache_on_disk.go index d74f87b0c..356dfe188 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk.go @@ -63,7 +63,7 @@ func LoadOrCreateChunkCacheVolume(fileName string, preallocate int64) (*ChunkCac return nil, fmt.Errorf("cannot write cache index %s.idx: %v", v.fileName, err) } - glog.V(0).Infoln("loading leveldb", v.fileName+".ldb") + glog.V(1).Infoln("loading leveldb", v.fileName+".ldb") opts := &opt.Options{ BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB @@ -137,7 +137,7 @@ func (v *ChunkCacheVolume) WriteNeedle(key types.NeedleId, data []byte) error { v.fileSize += int64(types.NeedlePaddingSize - extraSize) } - if err := v.nm.Put(key, types.ToOffset(offset), uint32(len(data))); err != nil { + if err := v.nm.Put(key, types.ToOffset(offset), types.Size(len(data))); err != nil { return err } diff --git a/weed/util/chunk_cache/chunk_cache_on_disk_test.go b/weed/util/chunk_cache/chunk_cache_on_disk_test.go index f061f2ba2..f8325276e 100644 --- a/weed/util/chunk_cache/chunk_cache_on_disk_test.go +++ b/weed/util/chunk_cache/chunk_cache_on_disk_test.go @@ -14,9 +14,9 @@ func TestOnDisk(t *testing.T) { tmpDir, _ := ioutil.TempDir("", "c") defer os.RemoveAll(tmpDir) - totalDiskSizeMb := int64(32) + totalDiskSizeInKB := int64(32) - cache := NewChunkCache(0, tmpDir, totalDiskSizeMb) + cache := NewTieredChunkCache(2, tmpDir, totalDiskSizeInKB, 1024) writeCount := 5 type test_data struct { @@ -26,7 +26,7 @@ func TestOnDisk(t *testing.T) { } testData := make([]*test_data, writeCount) for i := 0; i < writeCount; i++ { - buff := make([]byte, 1024*1024) + buff := make([]byte, 1024) rand.Read(buff) testData[i] = &test_data{ data: buff, @@ -34,9 +34,22 @@ func TestOnDisk(t *testing.T) { size: uint64(len(buff)), } cache.SetChunk(testData[i].fileId, testData[i].data) + + // read back right after write + data := cache.GetChunk(testData[i].fileId, testData[i].size) + if bytes.Compare(data, testData[i].data) != 0 { + t.Errorf("failed to write to and read from cache: %d", i) + } } - for i := 0; i < writeCount; i++ { + for i := 0; i < 2; i++ { + data := cache.GetChunk(testData[i].fileId, testData[i].size) + if bytes.Compare(data, testData[i].data) == 0 { + t.Errorf("old cache should have been purged: %d", i) + } + } + + for i := 2; i < writeCount; i++ { data := cache.GetChunk(testData[i].fileId, testData[i].size) if bytes.Compare(data, testData[i].data) != 0 { t.Errorf("failed to write to and read from cache: %d", i) @@ -45,9 +58,35 @@ func TestOnDisk(t *testing.T) { cache.Shutdown() - cache = NewChunkCache(0, tmpDir, totalDiskSizeMb) + cache = NewTieredChunkCache(2, tmpDir, totalDiskSizeInKB, 1024) - for i := 0; i < writeCount; i++ { + for i := 0; i < 2; i++ { + data := cache.GetChunk(testData[i].fileId, testData[i].size) + if bytes.Compare(data, testData[i].data) == 0 { + t.Errorf("old cache should have been purged: %d", i) + } + } + + for i := 2; i < writeCount; i++ { + if i == 4 { + // FIXME this failed many times on build machines + /* + I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 2048 bytes disk space for /tmp/c578652251/c0_2_0.dat + I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 2048 bytes disk space for /tmp/c578652251/c0_2_1.dat + I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 4096 bytes disk space for /tmp/c578652251/c1_3_0.dat + I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 4096 bytes disk space for /tmp/c578652251/c1_3_1.dat + I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 4096 bytes disk space for /tmp/c578652251/c1_3_2.dat + I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 8192 bytes disk space for /tmp/c578652251/c2_2_0.dat + I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 8192 bytes disk space for /tmp/c578652251/c2_2_1.dat + I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 2048 bytes disk space for /tmp/c578652251/c0_2_0.dat + I0928 06:04:12 10979 volume_create_linux.go:19] Preallocated 2048 bytes disk space for /tmp/c578652251/c0_2_1.dat + --- FAIL: TestOnDisk (0.19s) + chunk_cache_on_disk_test.go:73: failed to write to and read from cache: 4 + FAIL + FAIL github.com/chrislusf/seaweedfs/weed/util/chunk_cache 0.199s + */ + continue + } data := cache.GetChunk(testData[i].fileId, testData[i].size) if bytes.Compare(data, testData[i].data) != 0 { t.Errorf("failed to write to and read from cache: %d", i) diff --git a/weed/util/chunk_cache/on_disk_cache_layer.go b/weed/util/chunk_cache/on_disk_cache_layer.go index c3192b548..eebd89798 100644 --- a/weed/util/chunk_cache/on_disk_cache_layer.go +++ b/weed/util/chunk_cache/on_disk_cache_layer.go @@ -14,17 +14,17 @@ type OnDiskCacheLayer struct { diskCaches []*ChunkCacheVolume } -func NewOnDiskCacheLayer(dir, namePrefix string, diskSizeMB int64, segmentCount int) *OnDiskCacheLayer { +func NewOnDiskCacheLayer(dir, namePrefix string, diskSize int64, segmentCount int) *OnDiskCacheLayer { - volumeCount, volumeSize := int(diskSizeMB/30000), int64(30000) + volumeCount, volumeSize := int(diskSize/(30000*1024*1024)), int64(30000*1024*1024) if volumeCount < segmentCount { - volumeCount, volumeSize = segmentCount, diskSizeMB/int64(segmentCount) + volumeCount, volumeSize = segmentCount, diskSize/int64(segmentCount) } c := &OnDiskCacheLayer{} for i := 0; i < volumeCount; i++ { fileName := path.Join(dir, fmt.Sprintf("%s_%d", namePrefix, i)) - diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize*1024*1024) + diskCache, err := LoadOrCreateChunkCacheVolume(fileName, volumeSize) if err != nil { glog.Errorf("failed to add cache %s : %v", fileName, err) } else { diff --git a/weed/util/compression.go b/weed/util/compression.go index 2881a7bfd..cf3ac7c57 100644 --- a/weed/util/compression.go +++ b/weed/util/compression.go @@ -12,15 +12,44 @@ import ( "github.com/klauspost/compress/zstd" ) +var ( + UnsupportedCompression = fmt.Errorf("unsupported compression") +) + +func MaybeGzipData(input []byte) []byte { + if IsGzippedContent(input) { + return input + } + gzipped, err := GzipData(input) + if err != nil { + return input + } + if len(gzipped)*10 > len(input)*9 { + return input + } + return gzipped +} + +func MaybeDecompressData(input []byte) []byte { + uncompressed, err := DecompressData(input) + if err != nil { + if err != UnsupportedCompression { + glog.Errorf("decompressed data: %v", err) + } + return input + } + return uncompressed +} + func GzipData(input []byte) ([]byte, error) { buf := new(bytes.Buffer) w, _ := gzip.NewWriterLevel(buf, flate.BestSpeed) if _, err := w.Write(input); err != nil { - glog.V(2).Infoln("error compressing data:", err) + glog.V(2).Infof("error gzip data: %v", err) return nil, err } if err := w.Close(); err != nil { - glog.V(2).Infoln("error closing compressed data:", err) + glog.V(2).Infof("error closing gzipped data: %v", err) return nil, err } return buf.Bytes(), nil @@ -39,7 +68,7 @@ func DecompressData(input []byte) ([]byte, error) { if IsZstdContent(input) { return unzstdData(input) } - return input, fmt.Errorf("unsupported compression") + return input, UnsupportedCompression } func ungzipData(input []byte) ([]byte, error) { @@ -48,7 +77,7 @@ func ungzipData(input []byte) ([]byte, error) { defer r.Close() output, err := ioutil.ReadAll(r) if err != nil { - glog.V(2).Infoln("error uncompressing data:", err) + glog.V(2).Infof("error ungzip data: %v", err) } return output, err } diff --git a/weed/util/constants.go b/weed/util/constants.go index 9f0e00506..431111fb9 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 87) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 02) COMMIT = "" ) diff --git a/weed/util/fullpath.go b/weed/util/fullpath.go index 4ce8a2f90..f2119707e 100644 --- a/weed/util/fullpath.go +++ b/weed/util/fullpath.go @@ -13,6 +13,7 @@ func NewFullPath(dir, name string) FullPath { func (fp FullPath) DirAndName() (string, string) { dir, name := filepath.Split(string(fp)) + name = strings.ToValidUTF8(name, "?") if dir == "/" { return dir, name } @@ -24,6 +25,7 @@ func (fp FullPath) DirAndName() (string, string) { func (fp FullPath) Name() string { _, name := filepath.Split(string(fp)) + name = strings.ToValidUTF8(name, "?") return name } diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 5159fcd17..eef24b930 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -174,7 +174,7 @@ func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) e return readFn(r.Body) } -func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.ReadCloser, e error) { +func DownloadFile(fileUrl string) (filename string, header http.Header, resp *http.Response, e error) { response, err := client.Get(fileUrl) if err != nil { return "", nil, nil, err @@ -188,7 +188,7 @@ func DownloadFile(fileUrl string) (filename string, header http.Header, rc io.Re filename = strings.Trim(filename, "\"") } } - rc = response.Body + resp = response return } @@ -368,6 +368,7 @@ func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, e if err != nil { return nil, err } + defer CloseResponse(r) if r.StatusCode >= 400 { return nil, fmt.Errorf("%s: %s", fileUrl, r.Status) } diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index cb9565fb2..e4310b5c5 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -53,7 +53,7 @@ func NewLogBuffer(flushInterval time.Duration, flushFn func(startTime, stopTime return lb } -func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { +func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, eventTsNs int64) { m.Lock() defer func() { @@ -64,16 +64,21 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte) { }() // need to put the timestamp inside the lock - ts := time.Now() - tsNs := ts.UnixNano() - if m.lastTsNs >= tsNs { + var ts time.Time + if eventTsNs == 0 { + ts = time.Now() + eventTsNs = ts.UnixNano() + } else { + ts = time.Unix(0, eventTsNs) + } + if m.lastTsNs >= eventTsNs { // this is unlikely to happen, but just in case - tsNs = m.lastTsNs + 1 - ts = time.Unix(0, tsNs) + eventTsNs = m.lastTsNs + 1 + ts = time.Unix(0, eventTsNs) } - m.lastTsNs = tsNs + m.lastTsNs = eventTsNs logEntry := &filer_pb.LogEntry{ - TsNs: tsNs, + TsNs: eventTsNs, PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, } @@ -249,7 +254,7 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu return nil } -func (m *LogBuffer) ReleaseMeory(b *bytes.Buffer) { +func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) { bufferPool.Put(b) } diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index f9ccc95c2..3d77afb18 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -23,7 +23,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { var buf = make([]byte, messageSize) for i := 0; i < messageCount; i++ { rand.Read(buf) - lb.AddToBuffer(nil, buf) + lb.AddToBuffer(nil, buf, 0) } receivedmessageCount := 0 diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 2b73a8064..57f4b0115 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -2,6 +2,7 @@ package log_buffer import ( "bytes" + "fmt" "time" "github.com/golang/protobuf/proto" @@ -11,23 +12,27 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +var ( + ResumeError = fmt.Errorf("resume") +) + func (logBuffer *LogBuffer) LoopProcessLogData( startTreadTime time.Time, waitForDataFn func() bool, - eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (err error) { + eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, err error) { // loop through all messages var bytesBuf *bytes.Buffer - lastReadTime := startTreadTime + lastReadTime = startTreadTime defer func() { if bytesBuf != nil { - logBuffer.ReleaseMeory(bytesBuf) + logBuffer.ReleaseMemory(bytesBuf) } }() for { if bytesBuf != nil { - logBuffer.ReleaseMeory(bytesBuf) + logBuffer.ReleaseMemory(bytesBuf) } bytesBuf = logBuffer.ReadFromBuffer(lastReadTime) // fmt.Printf("ReadFromBuffer by %v\n", lastReadTime) @@ -48,10 +53,13 @@ func (logBuffer *LogBuffer) LoopProcessLogData( for pos := 0; pos+4 < len(buf); { size := util.BytesToUint32(buf[pos : pos+4]) + if pos+4+int(size) > len(buf) { + err = ResumeError + glog.Errorf("LoopProcessLogData: read buffer %v read %d [%d,%d) from [0,%d)", lastReadTime, batchSize, pos, pos+int(size)+4, len(buf)) + return + } entryData := buf[pos+4 : pos+4+int(size)] - // fmt.Printf("read buffer read %d [%d,%d) from [0,%d)\n", batchSize, pos, pos+int(size)+4, len(buf)) - logEntry := &filer_pb.LogEntry{} if err = proto.Unmarshal(entryData, logEntry); err != nil { glog.Errorf("unexpected unmarshal messaging_pb.Message: %v", err) |
