diff options
Diffstat (limited to 'weed/util')
| -rw-r--r-- | weed/util/http_util.go | 38 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 209 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_buffer_test.go | 2 | ||||
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 45 | ||||
| -rw-r--r-- | weed/util/log_buffer/sealed_buffer.go | 5 |
5 files changed, 196 insertions, 103 deletions
diff --git a/weed/util/http_util.go b/weed/util/http_util.go index ef4b29158..d1505f673 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -10,6 +10,7 @@ import ( "net/http" "net/url" "strings" + "time" "github.com/seaweedfs/seaweedfs/weed/glog" ) @@ -450,3 +451,40 @@ func (r *CountingReader) Read(p []byte) (n int, err error) { r.BytesRead += n return n, err } + +func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) { + + var shouldRetry bool + + for waitTime := time.Second; waitTime < RetryWaitTime; waitTime += waitTime / 2 { + for _, urlString := range urlStrings { + n = 0 + if strings.Contains(urlString, "%") { + urlString = url.PathEscape(urlString) + } + shouldRetry, err = ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) { + if n < len(buffer) { + x := copy(buffer[n:], data) + n += x + } + }) + if !shouldRetry { + break + } + if err != nil { + glog.V(0).Infof("read %s failed, err: %v", urlString, err) + } else { + break + } + } + if err != nil && shouldRetry { + glog.V(0).Infof("retry reading in %v", waitTime) + time.Sleep(waitTime) + } else { + break + } + } + + return n, err + +} diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go index bd124908e..dc61e44ce 100644 --- a/weed/util/log_buffer/log_buffer.go +++ b/weed/util/log_buffer/log_buffer.go @@ -22,10 +22,13 @@ type dataToFlush struct { data *bytes.Buffer } +type LogFlushFuncType func(startTime, stopTime time.Time, buf []byte) + type LogBuffer struct { name string prevBuffers *SealedBuffers buf []byte + batchIndex int64 idx []int pos int startTime time.Time @@ -33,7 +36,7 @@ type LogBuffer struct { lastFlushTime time.Time sizeBuf []byte flushInterval time.Duration - flushFn func(startTime, stopTime time.Time, buf []byte) + flushFn LogFlushFuncType notifyFn func() isStopping *atomic.Bool flushChan chan *dataToFlush @@ -41,7 +44,7 @@ type LogBuffer struct { sync.RWMutex } -func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer { +func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType, notifyFn func()) *LogBuffer { lb := &LogBuffer{ name: name, prevBuffers: newSealedBuffers(PreviousBufferCount), @@ -58,17 +61,17 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTi return lb } -func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) { +func (logBuffer *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) { var toFlush *dataToFlush - m.Lock() + logBuffer.Lock() defer func() { - m.Unlock() + logBuffer.Unlock() if toFlush != nil { - m.flushChan <- toFlush + logBuffer.flushChan <- toFlush } - if m.notifyFn != nil { - m.notifyFn() + if logBuffer.notifyFn != nil { + logBuffer.notifyFn() } }() @@ -80,12 +83,12 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) } else { ts = time.Unix(0, processingTsNs) } - if m.lastTsNs >= processingTsNs { + if logBuffer.lastTsNs >= processingTsNs { // this is unlikely to happen, but just in case - processingTsNs = m.lastTsNs + 1 + processingTsNs = logBuffer.lastTsNs + 1 ts = time.Unix(0, processingTsNs) } - m.lastTsNs = processingTsNs + logBuffer.lastTsNs = processingTsNs logEntry := &filer_pb.LogEntry{ TsNs: processingTsNs, PartitionKeyHash: util.HashToInt32(partitionKey), @@ -96,105 +99,119 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) size := len(logEntryData) - if m.pos == 0 { - m.startTime = ts + if logBuffer.pos == 0 { + logBuffer.startTime = ts } - if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { - // glog.V(4).Infof("%s copyToFlush1 start time %v, ts %v, remaining %d bytes", m.name, m.startTime, ts, len(m.buf)-m.pos) - toFlush = m.copyToFlush() - m.startTime = ts - if len(m.buf) < size+4 { - m.buf = make([]byte, 2*size+4) + if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 { + glog.V(0).Infof("%s copyToFlush1 batch:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.batchIndex, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos) + toFlush = logBuffer.copyToFlush() + logBuffer.startTime = ts + if len(logBuffer.buf) < size+4 { + logBuffer.buf = make([]byte, 2*size+4) } } - m.stopTime = ts + logBuffer.stopTime = ts - m.idx = append(m.idx, m.pos) - util.Uint32toBytes(m.sizeBuf, uint32(size)) - copy(m.buf[m.pos:m.pos+4], m.sizeBuf) - copy(m.buf[m.pos+4:m.pos+4+size], logEntryData) - m.pos += size + 4 + logBuffer.idx = append(logBuffer.idx, logBuffer.pos) + util.Uint32toBytes(logBuffer.sizeBuf, uint32(size)) + copy(logBuffer.buf[logBuffer.pos:logBuffer.pos+4], logBuffer.sizeBuf) + copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData) + logBuffer.pos += size + 4 - // fmt.Printf("entry size %d total %d count %d, buffer:%p\n", size, m.pos, len(m.idx), m) + // fmt.Printf("partitionKey %v entry size %d total %d count %d\n", string(partitionKey), size, m.pos, len(m.idx)) } -func (m *LogBuffer) IsStopping() bool { - return m.isStopping.Load() +func (logBuffer *LogBuffer) IsStopping() bool { + return logBuffer.isStopping.Load() } -func (m *LogBuffer) Shutdown() { - isAlreadyStopped := m.isStopping.Swap(true) +func (logBuffer *LogBuffer) Shutdown() { + isAlreadyStopped := logBuffer.isStopping.Swap(true) if isAlreadyStopped { return } - toFlush := m.copyToFlush() - m.flushChan <- toFlush - close(m.flushChan) + toFlush := logBuffer.copyToFlush() + logBuffer.flushChan <- toFlush + close(logBuffer.flushChan) } -func (m *LogBuffer) loopFlush() { - for d := range m.flushChan { +func (logBuffer *LogBuffer) loopFlush() { + for d := range logBuffer.flushChan { if d != nil { // glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes())) - m.flushFn(d.startTime, d.stopTime, d.data.Bytes()) + logBuffer.flushFn(d.startTime, d.stopTime, d.data.Bytes()) d.releaseMemory() // local logbuffer is different from aggregate logbuffer here - m.lastFlushTime = d.stopTime + logBuffer.lastFlushTime = d.stopTime } } } -func (m *LogBuffer) loopInterval() { - for !m.IsStopping() { - time.Sleep(m.flushInterval) - if m.IsStopping() { +func (logBuffer *LogBuffer) loopInterval() { + for !logBuffer.IsStopping() { + time.Sleep(logBuffer.flushInterval) + if logBuffer.IsStopping() { return } - m.Lock() - toFlush := m.copyToFlush() - m.Unlock() + logBuffer.Lock() + toFlush := logBuffer.copyToFlush() + logBuffer.Unlock() if toFlush != nil { - m.flushChan <- toFlush + glog.V(0).Infof("%s flush [%v, %v] size %d", logBuffer.name, toFlush.startTime, toFlush.stopTime, len(toFlush.data.Bytes())) + logBuffer.flushChan <- toFlush + } else { + // glog.V(0).Infof("%s no flush", m.name) } } } -func (m *LogBuffer) copyToFlush() *dataToFlush { +func (logBuffer *LogBuffer) copyToFlush() *dataToFlush { - if m.pos > 0 { + if logBuffer.pos > 0 { // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos) var d *dataToFlush - if m.flushFn != nil { + if logBuffer.flushFn != nil { d = &dataToFlush{ - startTime: m.startTime, - stopTime: m.stopTime, - data: copiedBytes(m.buf[:m.pos]), + startTime: logBuffer.startTime, + stopTime: logBuffer.stopTime, + data: copiedBytes(logBuffer.buf[:logBuffer.pos]), } // glog.V(4).Infof("%s flushing [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) } else { // glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) - m.lastFlushTime = m.stopTime + logBuffer.lastFlushTime = logBuffer.stopTime } - m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos) - m.startTime = time.Unix(0, 0) - m.stopTime = time.Unix(0, 0) - m.pos = 0 - m.idx = m.idx[:0] + logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.batchIndex) + logBuffer.startTime = time.Unix(0, 0) + logBuffer.stopTime = time.Unix(0, 0) + logBuffer.pos = 0 + logBuffer.idx = logBuffer.idx[:0] + logBuffer.batchIndex++ return d } return nil } +func (logBuffer *LogBuffer) GetEarliestTime() time.Time{ + return logBuffer.startTime +} +func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition{ + return MessagePosition{ + Time: logBuffer.startTime, + BatchIndex: logBuffer.batchIndex, + } +} + func (d *dataToFlush) releaseMemory() { d.data.Reset() bufferPool.Put(d.data) } -func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer, err error) { - m.RLock() - defer m.RUnlock() +func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition, inMemoryOnly bool) (bufferCopy *bytes.Buffer, batchIndex int64, err error) { + logBuffer.RLock() + defer logBuffer.RUnlock() // Read from disk and memory // 1. read from disk, last time is = td @@ -206,52 +223,61 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu // if td < tm, case 2.3 // read from disk again var tsMemory time.Time - if !m.startTime.IsZero() { - tsMemory = m.startTime + var tsBatchIndex int64 + if !logBuffer.startTime.IsZero() { + tsMemory = logBuffer.startTime + tsBatchIndex = logBuffer.batchIndex } - for _, prevBuf := range m.prevBuffers.buffers { + for _, prevBuf := range logBuffer.prevBuffers.buffers { if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) { tsMemory = prevBuf.startTime + tsBatchIndex = prevBuf.batchIndex } } if tsMemory.IsZero() { // case 2.2 - return nil, nil - } else if lastReadTime.Before(tsMemory) { // case 2.3 - if !m.lastFlushTime.IsZero() { - glog.V(0).Infof("resume with last flush time: %v", m.lastFlushTime) - return nil, ResumeFromDiskError + println("2.2 no data") + return nil, -2,nil + } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex +1 < tsBatchIndex { // case 2.3 + if inMemoryOnly { + println("2.3 no data", lastReadPosition.BatchIndex, tsBatchIndex) + // FIXME: this is wrong: the data has been flushed to disk already + return nil, tsBatchIndex,nil + } + if !logBuffer.lastFlushTime.IsZero() { + glog.V(0).Infof("resume with last flush time: %v", logBuffer.lastFlushTime) + return nil, -2, ResumeFromDiskError } } // the following is case 2.1 - if lastReadTime.Equal(m.stopTime) { - return nil, nil + if lastReadPosition.Equal(logBuffer.stopTime) { + return nil, logBuffer.batchIndex, nil } - if lastReadTime.After(m.stopTime) { - // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime) - return nil, nil + if lastReadPosition.After(logBuffer.stopTime) { + // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime) + return nil, logBuffer.batchIndex, nil } - if lastReadTime.Before(m.startTime) { - // println("checking ", lastReadTime.UnixNano()) - for _, buf := range m.prevBuffers.buffers { - if buf.startTime.After(lastReadTime) { + if lastReadPosition.Before(logBuffer.startTime) { + // println("checking ", lastReadPosition.UnixNano()) + for _, buf := range logBuffer.prevBuffers.buffers { + if buf.startTime.After(lastReadPosition.Time) { // glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime) // println("return the", i, "th in memory", buf.startTime.UnixNano()) - return copiedBytes(buf.buf[:buf.size]), nil + return copiedBytes(buf.buf[:buf.size]), buf.batchIndex, nil } - if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) { - pos := buf.locateByTs(lastReadTime) + if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) { + pos := buf.locateByTs(lastReadPosition.Time) // fmt.Printf("locate buffer[%d] pos %d\n", i, pos) - return copiedBytes(buf.buf[pos:buf.size]), nil + return copiedBytes(buf.buf[pos:buf.size]), buf.batchIndex, nil } } - // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadTime) - return copiedBytes(m.buf[:m.pos]), nil + // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition) + return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex,nil } - lastTs := lastReadTime.UnixNano() - l, h := 0, len(m.idx)-1 + lastTs := lastReadPosition.UnixNano() + l, h := 0, len(logBuffer.idx)-1 /* for i, pos := range m.idx { @@ -269,18 +295,18 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu for l <= h { mid := (l + h) / 2 - pos := m.idx[mid] - _, t := readTs(m.buf, pos) + pos := logBuffer.idx[mid] + _, t := readTs(logBuffer.buf, pos) if t <= lastTs { l = mid + 1 } else if lastTs < t { var prevT int64 if mid > 0 { - _, prevT = readTs(m.buf, m.idx[mid-1]) + _, prevT = readTs(logBuffer.buf, logBuffer.idx[mid-1]) } if prevT <= lastTs { // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos) - return copiedBytes(m.buf[pos:m.pos]), nil + return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.batchIndex, nil } h = mid } @@ -288,10 +314,11 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu } // FIXME: this could be that the buffer has been flushed already - return nil, nil + println("Not sure why no data", lastReadPosition.BatchIndex, tsBatchIndex) + return nil, -2, nil } -func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) { +func (logBuffer *LogBuffer) ReleaseMemory(b *bytes.Buffer) { bufferPool.Put(b) } diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index 9ecb90762..91c520c63 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -28,7 +28,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool { + lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, false, 0, func() bool { // stop if no more messages return receivedMessageCount < messageCount }, func(logEntry *filer_pb.LogEntry) error { diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 059f74286..fea9c3b39 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -17,15 +17,30 @@ var ( ResumeFromDiskError = fmt.Errorf("resumeFromDisk") ) -func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime time.Time, stopTsNs int64, - waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, isDone bool, err error) { +type MessagePosition struct { + time.Time // this is the timestamp of the message + BatchIndex int64 // this is only used when the timestamp is not enough to identify the next message, when the timestamp is in the previous batch. +} + +func NewMessagePosition(tsNs int64, batchIndex int64) MessagePosition { + return MessagePosition{ + Time: time.Unix(0, tsNs).UTC(), + BatchIndex: batchIndex, + } +} + +func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition MessagePosition, inMemoryOnly bool, stopTsNs int64, + waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadPosition MessagePosition, isDone bool, err error) { // loop through all messages var bytesBuf *bytes.Buffer - lastReadTime = startReadTime + var batchIndex int64 + lastReadPosition = startPosition + var entryCounter int64 defer func() { if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) } + println("LoopProcessLogData", readerName, "sent messages total", entryCounter) }() for { @@ -33,13 +48,20 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) } - bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime) + bytesBuf, batchIndex, err = logBuffer.ReadFromBuffer(lastReadPosition, inMemoryOnly) if err == ResumeFromDiskError { time.Sleep(1127 * time.Millisecond) - return lastReadTime, isDone, ResumeFromDiskError + return lastReadPosition, isDone, ResumeFromDiskError + } + readSize := 0 + if bytesBuf != nil { + readSize = bytesBuf.Len() } - // glog.V(4).Infof("%s ReadFromBuffer by %v", readerName, lastReadTime) + glog.V(0).Infof("%s ReadFromBuffer at %v batch:%d, read bytes:%v batch:%d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex) if bytesBuf == nil { + if batchIndex >= 0 { + lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex) + } if stopTsNs != 0 { isDone = true return @@ -52,7 +74,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime } buf := bytesBuf.Bytes() - // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadTime, len(buf)) + // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadPosition, len(buf)) batchSize := 0 @@ -61,7 +83,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime size := util.BytesToUint32(buf[pos : pos+4]) if pos+4+int(size) > len(buf) { err = ResumeError - glog.Errorf("LoopProcessLogData: %s read buffer %v read %d [%d,%d) from [0,%d)", readerName, lastReadTime, batchSize, pos, pos+int(size)+4, len(buf)) + glog.Errorf("LoopProcessLogData: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf)) return } entryData := buf[pos+4 : pos+4+int(size)] @@ -74,20 +96,23 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime } if stopTsNs != 0 && logEntry.TsNs > stopTsNs { isDone = true + println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs) return } - lastReadTime = time.Unix(0, logEntry.TsNs) + lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex) if err = eachLogDataFn(logEntry); err != nil { + glog.Errorf("LoopProcessLogData: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err) return } pos += 4 + int(size) batchSize++ + entryCounter++ } - // glog.V(4).Infof("%s sent messages ts[%+v,%+v] size %d\n", readerName, startReadTime, lastReadTime, batchSize) + glog.V(0).Infof("%s sent messages ts[%+v,%+v] size %d\n", readerName, startPosition, lastReadPosition, batchSize) } } diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go index d133cf8d3..920a811f2 100644 --- a/weed/util/log_buffer/sealed_buffer.go +++ b/weed/util/log_buffer/sealed_buffer.go @@ -10,6 +10,7 @@ type MemBuffer struct { size int startTime time.Time stopTime time.Time + batchIndex int64 } type SealedBuffers struct { @@ -29,7 +30,7 @@ func newSealedBuffers(size int) *SealedBuffers { return sbs } -func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int) (newBuf []byte) { +func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, batchIndex int64) (newBuf []byte) { oldMemBuffer := sbs.buffers[0] size := len(sbs.buffers) for i := 0; i < size-1; i++ { @@ -37,11 +38,13 @@ func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, sbs.buffers[i].size = sbs.buffers[i+1].size sbs.buffers[i].startTime = sbs.buffers[i+1].startTime sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime + sbs.buffers[i].batchIndex = sbs.buffers[i+1].batchIndex } sbs.buffers[size-1].buf = buf sbs.buffers[size-1].size = pos sbs.buffers[size-1].startTime = startTime sbs.buffers[size-1].stopTime = stopTime + sbs.buffers[size-1].batchIndex = batchIndex return oldMemBuffer.buf } |
