| author | chrislu <chris.lu@gmail.com> | 2024-03-16 11:42:23 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-03-16 11:42:23 -0700 |
| commit | 6a61b54f29df704bbe2b8fb69cb234cac530bd48 (patch) | |
| tree | b21c2f6923941d8287acdc9a080b8de21d59f848 /weed/util | |
| parent | 205829fa22216228be40effb9d684aa7900ded57 (diff) | |
| parent | 27bb38228b647e34fe20a6016fa04c829138c272 (diff) | |
| download | seaweedfs-6a61b54f29df704bbe2b8fb69cb234cac530bd48.tar.xz seaweedfs-6a61b54f29df704bbe2b8fb69cb234cac530bd48.zip | |
Merge branch 'mq-subscribe'
Diffstat (limited to 'weed/util')
| mode | path | lines changed |
|---|---|---|
| -rw-r--r-- | weed/util/buffered_queue/buffered_queue.go | 137 |
| -rw-r--r-- | weed/util/buffered_queue/buffered_queue_test.go | 128 |
| -rw-r--r-- | weed/util/http_util.go | 38 |
| -rw-r--r-- | weed/util/log_buffer/log_buffer.go | 257 |
| -rw-r--r-- | weed/util/log_buffer/log_buffer_test.go | 2 |
| -rw-r--r-- | weed/util/log_buffer/log_read.go | 66 |
| -rw-r--r-- | weed/util/log_buffer/sealed_buffer.go | 13 |
7 files changed, 507 insertions, 134 deletions
diff --git a/weed/util/buffered_queue/buffered_queue.go b/weed/util/buffered_queue/buffered_queue.go
new file mode 100644
index 000000000..edaa0a7ce
--- /dev/null
+++ b/weed/util/buffered_queue/buffered_queue.go
@@ -0,0 +1,137 @@
+package buffered_queue
+
+import (
+    "fmt"
+    "sync"
+)
+
+// ItemChunkNode represents a node in the linked list of job chunks
+type ItemChunkNode[T any] struct {
+    items     []T
+    headIndex int
+    tailIndex int
+    next      *ItemChunkNode[T]
+    nodeId    int
+}
+
+// BufferedQueue implements a buffered queue using a linked list of job chunks
+type BufferedQueue[T any] struct {
+    chunkSize   int // Maximum number of items per chunk
+    head        *ItemChunkNode[T]
+    tail        *ItemChunkNode[T]
+    last        *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory
+    count       int               // Total number of items in the queue
+    mutex       sync.Mutex
+    nodeCounter int
+    waitCond    *sync.Cond
+    isClosed    bool
+}
+
+// NewBufferedQueue creates a new buffered queue with the specified chunk size
+func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
+    // Create an empty chunk to initialize head and tail
+    chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0}
+    bq := &BufferedQueue[T]{
+        chunkSize: chunkSize,
+        head:      chunk,
+        tail:      chunk,
+        last:      chunk,
+        count:     0,
+        mutex:     sync.Mutex{},
+    }
+    bq.waitCond = sync.NewCond(&bq.mutex)
+    return bq
+}
+
+// Enqueue adds a job to the queue
+func (q *BufferedQueue[T]) Enqueue(job T) error {
+
+    if q.isClosed {
+        return fmt.Errorf("queue is closed")
+    }
+
+    q.mutex.Lock()
+    defer q.mutex.Unlock()
+
+    // If the tail chunk is full, create a new chunk (reusing empty chunks if available)
+    if q.tail.tailIndex == q.chunkSize {
+        if q.tail == q.last {
+            // Create a new chunk
+            q.nodeCounter++
+            newChunk := &ItemChunkNode[T]{items: make([]T, q.chunkSize), nodeId: q.nodeCounter}
+            q.tail.next = newChunk
+            q.tail = newChunk
+            q.last = newChunk
+        } else {
+            // Reuse an empty chunk
+            q.tail = q.tail.next
+            q.tail.headIndex = 0
+            q.tail.tailIndex = 0
+            // println("tail moved to chunk", q.tail.nodeId)
+        }
+    }
+
+    // Add the job to the tail chunk
+    q.tail.items[q.tail.tailIndex] = job
+    q.tail.tailIndex++
+    q.count++
+    if q.count == 1 {
+        q.waitCond.Signal()
+    }
+
+    return nil
+}
+
+// Dequeue removes and returns a job from the queue
+func (q *BufferedQueue[T]) Dequeue() (T, bool) {
+    q.mutex.Lock()
+    defer q.mutex.Unlock()
+
+    for q.count <= 0 && !q.isClosed {
+        q.waitCond.Wait()
+    }
+    if q.count <= 0 && q.isClosed {
+        var a T
+        return a, false
+    }
+
+    job := q.head.items[q.head.headIndex]
+    q.head.headIndex++
+    q.count--
+
+    if q.head.headIndex == q.chunkSize {
+        q.last.next = q.head
+        q.head = q.head.next
+        q.last = q.last.next
+        q.last.next = nil
+        //println("reusing chunk", q.last.nodeId)
+        //fmt.Printf("head: %+v\n", q.head)
+        //fmt.Printf("tail: %+v\n", q.tail)
+        //fmt.Printf("last: %+v\n", q.last)
+        //fmt.Printf("count: %d\n", q.count)
+        //for p := q.head; p != nil ; p = p.next {
+        //    fmt.Printf("Node: %+v\n", p)
+        //}
+    }
+
+    return job, true
+}
+
+// Size returns the number of items in the queue
+func (q *BufferedQueue[T]) Size() int {
+    q.mutex.Lock()
+    defer q.mutex.Unlock()
+    return q.count
+}
+
+// IsEmpty returns true if the queue is empty
+func (q *BufferedQueue[T]) IsEmpty() bool {
+    return q.Size() == 0
+}
+
+func (q *BufferedQueue[T]) CloseInput() {
+    q.mutex.Lock()
+    defer q.mutex.Unlock()
+    q.isClosed = true
+    q.waitCond.Broadcast()
+}
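Reviewer note: as a quick orientation for the new package, a minimal producer/consumer sketch using only the `Enqueue`, `Dequeue`, and `CloseInput` API introduced above. The `Job` payload type, the chunk size, and the import path are illustrative assumptions, not part of this commit.

```go
package main

import "github.com/seaweedfs/seaweedfs/weed/util/buffered_queue"

// Job is an illustrative payload type; any type parameter works.
type Job struct {
	ID   int
	Data string
}

func main() {
	// Chunk size controls how many items share one backing slice before a new chunk is linked in.
	queue := buffered_queue.NewBufferedQueue[Job](1024)

	// Producer: Enqueue only fails after CloseInput has been called.
	go func() {
		for i := 0; i < 10; i++ {
			_ = queue.Enqueue(Job{ID: i, Data: "payload"})
		}
		queue.CloseInput() // wakes blocked consumers so they can drain and exit
	}()

	// Consumer: Dequeue blocks until an item arrives, and returns ok=false
	// only once the queue is both closed and empty.
	for job, ok := queue.Dequeue(); ok; job, ok = queue.Dequeue() {
		println("got job", job.ID)
	}
}
```

The consumer loop mirrors the drain-until-closed pattern exercised by `TestJobQueueClose` in the test file that follows.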
diff --git a/weed/util/buffered_queue/buffered_queue_test.go b/weed/util/buffered_queue/buffered_queue_test.go
new file mode 100644
index 000000000..97c9f25a7
--- /dev/null
+++ b/weed/util/buffered_queue/buffered_queue_test.go
@@ -0,0 +1,128 @@
+package buffered_queue
+
+import (
+    "sync"
+    "testing"
+)
+
+func TestJobQueue(t *testing.T) {
+    type Job[T any] struct {
+        ID     int
+        Action string
+        Data   T
+    }
+
+    queue := NewBufferedQueue[Job[string]](2) // Chunk size of 5
+    queue.Enqueue(Job[string]{ID: 1, Action: "task1", Data: "hello"})
+    queue.Enqueue(Job[string]{ID: 2, Action: "task2", Data: "world"})
+
+    if queue.Size() != 2 {
+        t.Errorf("Expected queue size of 2, got %d", queue.Size())
+    }
+
+    queue.Enqueue(Job[string]{ID: 3, Action: "task3", Data: "3!"})
+    queue.Enqueue(Job[string]{ID: 4, Action: "task4", Data: "4!"})
+    queue.Enqueue(Job[string]{ID: 5, Action: "task5", Data: "5!"})
+
+    if queue.Size() != 5 {
+        t.Errorf("Expected queue size of 5, got %d", queue.Size())
+    }
+
+    println("enqueued 5 items")
+
+    println("dequeue", 1)
+    job, ok := queue.Dequeue()
+    if !ok {
+        t.Errorf("Expected dequeue to return true")
+    }
+    if job.ID != 1 {
+        t.Errorf("Expected job ID of 1, got %d", job.ID)
+    }
+
+    println("dequeue", 2)
+    job, ok = queue.Dequeue()
+    if !ok {
+        t.Errorf("Expected dequeue to return true")
+    }
+
+    println("enqueue", 6)
+    queue.Enqueue(Job[string]{ID: 6, Action: "task6", Data: "6!"})
+    println("enqueue", 7)
+    queue.Enqueue(Job[string]{ID: 7, Action: "task7", Data: "7!"})
+
+    for i := 0; i < 5; i++ {
+        println("dequeue ...")
+        job, ok = queue.Dequeue()
+        if !ok {
+            t.Errorf("Expected dequeue to return true")
+        }
+        println("dequeued", job.ID)
+    }
+
+    if queue.Size() != 0 {
+        t.Errorf("Expected queue size of 0, got %d", queue.Size())
+    }
+
+    for i := 0; i < 5; i++ {
+        println("enqueue", i+8)
+        queue.Enqueue(Job[string]{ID: i + 8, Action: "task", Data: "data"})
+    }
+    for i := 0; i < 5; i++ {
+        job, ok = queue.Dequeue()
+        if !ok {
+            t.Errorf("Expected dequeue to return true")
+        }
+        if job.ID != i+8 {
+            t.Errorf("Expected job ID of %d, got %d", i, job.ID)
+        }
+        println("dequeued", job.ID)
+    }
+
+}
+
+func TestJobQueueClose(t *testing.T) {
+    type Job[T any] struct {
+        ID     int
+        Action string
+        Data   T
+    }
+
+    queue := NewBufferedQueue[Job[string]](2)
+    queue.Enqueue(Job[string]{ID: 1, Action: "task1", Data: "hello"})
+    queue.Enqueue(Job[string]{ID: 2, Action: "task2", Data: "world"})
+
+    wg := sync.WaitGroup{}
+    wg.Add(1)
+    go func() {
+        defer wg.Done()
+        for data, ok := queue.Dequeue(); ok; data, ok = queue.Dequeue() {
+            println("dequeued", data.ID)
+        }
+    }()
+
+    for i := 0; i < 5; i++ {
+        queue.Enqueue(Job[string]{ID: i + 3, Action: "task", Data: "data"})
+    }
+
+    queue.CloseInput()
+    wg.Wait()
+
+}
+
+func BenchmarkBufferedQueue(b *testing.B) {
+    type Job[T any] struct {
+        ID     int
+        Action string
+        Data   T
+    }
+
+    queue := NewBufferedQueue[Job[string]](1024)
+
+    for i := 0; i < b.N; i++ {
+        queue.Enqueue(Job[string]{ID: i, Action: "task", Data: "data"})
+    }
+    for i := 0; i < b.N; i++ {
+        _, _ = queue.Dequeue()
+    }
+
+}
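The test and benchmark above exercise chunk reuse and the close-then-drain path; with standard Go tooling they would run as `go test -bench=. ./weed/util/buffered_queue/` (package path taken from the diffstat).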
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index ef4b29158..d1505f673 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -10,6 +10,7 @@ import (
     "net/http"
     "net/url"
     "strings"
+    "time"
 
     "github.com/seaweedfs/seaweedfs/weed/glog"
 )
@@ -450,3 +451,40 @@ func (r *CountingReader) Read(p []byte) (n int, err error) {
     r.BytesRead += n
     return n, err
 }
+
+func RetriedFetchChunkData(buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64) (n int, err error) {
+
+    var shouldRetry bool
+
+    for waitTime := time.Second; waitTime < RetryWaitTime; waitTime += waitTime / 2 {
+        for _, urlString := range urlStrings {
+            n = 0
+            if strings.Contains(urlString, "%") {
+                urlString = url.PathEscape(urlString)
+            }
+            shouldRetry, err = ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
+                if n < len(buffer) {
+                    x := copy(buffer[n:], data)
+                    n += x
+                }
+            })
+            if !shouldRetry {
+                break
+            }
+            if err != nil {
+                glog.V(0).Infof("read %s failed, err: %v", urlString, err)
+            } else {
+                break
+            }
+        }
+        if err != nil && shouldRetry {
+            glog.V(0).Infof("retry reading in %v", waitTime)
+            time.Sleep(waitTime)
+        } else {
+            break
+        }
+    }
+
+    return n, err
+
+}
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index bd124908e..4d393d88b 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -22,53 +22,62 @@ type dataToFlush struct {
     data      *bytes.Buffer
 }
 
+type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error)
+type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte)
+type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
+
 type LogBuffer struct {
-    name          string
-    prevBuffers   *SealedBuffers
-    buf           []byte
-    idx           []int
-    pos           int
-    startTime     time.Time
-    stopTime      time.Time
-    lastFlushTime time.Time
-    sizeBuf       []byte
-    flushInterval time.Duration
-    flushFn       func(startTime, stopTime time.Time, buf []byte)
-    notifyFn      func()
-    isStopping    *atomic.Bool
-    flushChan     chan *dataToFlush
-    lastTsNs      int64
+    LastFlushTsNs     int64
+    name              string
+    prevBuffers       *SealedBuffers
+    buf               []byte
+    batchIndex        int64
+    idx               []int
+    pos               int
+    startTime         time.Time
+    stopTime          time.Time
+    lastFlushDataTime time.Time
+    sizeBuf           []byte
+    flushInterval     time.Duration
+    flushFn           LogFlushFuncType
+    ReadFromDiskFn    LogReadFromDiskFuncType
+    notifyFn          func()
+    isStopping        *atomic.Bool
+    flushChan         chan *dataToFlush
+    LastTsNs          int64
     sync.RWMutex
 }
 
-func NewLogBuffer(name string, flushInterval time.Duration, flushFn func(startTime, stopTime time.Time, buf []byte), notifyFn func()) *LogBuffer {
+func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType,
+    readFromDiskFn LogReadFromDiskFuncType, notifyFn func()) *LogBuffer {
     lb := &LogBuffer{
-        name:          name,
-        prevBuffers:   newSealedBuffers(PreviousBufferCount),
-        buf:           make([]byte, BufferSize),
-        sizeBuf:       make([]byte, 4),
-        flushInterval: flushInterval,
-        flushFn:       flushFn,
-        notifyFn:      notifyFn,
-        flushChan:     make(chan *dataToFlush, 256),
-        isStopping:    new(atomic.Bool),
+        name:           name,
+        prevBuffers:    newSealedBuffers(PreviousBufferCount),
+        buf:            make([]byte, BufferSize),
+        sizeBuf:        make([]byte, 4),
+        flushInterval:  flushInterval,
+        flushFn:        flushFn,
+        ReadFromDiskFn: readFromDiskFn,
+        notifyFn:       notifyFn,
+        flushChan:      make(chan *dataToFlush, 256),
+        isStopping:     new(atomic.Bool),
     }
     go lb.loopFlush()
     go lb.loopInterval()
     return lb
 }
 
-func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) {
+func (logBuffer *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) {
     var toFlush *dataToFlush
-    m.Lock()
+    logBuffer.Lock()
     defer func() {
-        m.Unlock()
+        logBuffer.Unlock()
         if toFlush != nil {
-            m.flushChan <- toFlush
+            logBuffer.flushChan <- toFlush
         }
-        if m.notifyFn != nil {
-            m.notifyFn()
+        if logBuffer.notifyFn != nil {
+            logBuffer.notifyFn()
         }
     }()
 
@@
-80,121 +89,136 @@ func (m *LogBuffer) AddToBuffer(partitionKey, data []byte, processingTsNs int64) } else { ts = time.Unix(0, processingTsNs) } - if m.lastTsNs >= processingTsNs { + if logBuffer.LastTsNs >= processingTsNs { // this is unlikely to happen, but just in case - processingTsNs = m.lastTsNs + 1 + processingTsNs = logBuffer.LastTsNs + 1 ts = time.Unix(0, processingTsNs) } - m.lastTsNs = processingTsNs + logBuffer.LastTsNs = processingTsNs logEntry := &filer_pb.LogEntry{ TsNs: processingTsNs, PartitionKeyHash: util.HashToInt32(partitionKey), Data: data, + Key: partitionKey, } logEntryData, _ := proto.Marshal(logEntry) size := len(logEntryData) - if m.pos == 0 { - m.startTime = ts + if logBuffer.pos == 0 { + logBuffer.startTime = ts } - if m.startTime.Add(m.flushInterval).Before(ts) || len(m.buf)-m.pos < size+4 { - // glog.V(4).Infof("%s copyToFlush1 start time %v, ts %v, remaining %d bytes", m.name, m.startTime, ts, len(m.buf)-m.pos) - toFlush = m.copyToFlush() - m.startTime = ts - if len(m.buf) < size+4 { - m.buf = make([]byte, 2*size+4) + if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 { + // glog.V(0).Infof("%s copyToFlush1 batch:%d count:%d start time %v, ts %v, remaining %d bytes", logBuffer.name, logBuffer.batchIndex, len(logBuffer.idx), logBuffer.startTime, ts, len(logBuffer.buf)-logBuffer.pos) + toFlush = logBuffer.copyToFlush() + logBuffer.startTime = ts + if len(logBuffer.buf) < size+4 { + logBuffer.buf = make([]byte, 2*size+4) } } - m.stopTime = ts + logBuffer.stopTime = ts - m.idx = append(m.idx, m.pos) - util.Uint32toBytes(m.sizeBuf, uint32(size)) - copy(m.buf[m.pos:m.pos+4], m.sizeBuf) - copy(m.buf[m.pos+4:m.pos+4+size], logEntryData) - m.pos += size + 4 + logBuffer.idx = append(logBuffer.idx, logBuffer.pos) + util.Uint32toBytes(logBuffer.sizeBuf, uint32(size)) + copy(logBuffer.buf[logBuffer.pos:logBuffer.pos+4], logBuffer.sizeBuf) + copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData) + logBuffer.pos += size + 4 - // fmt.Printf("entry size %d total %d count %d, buffer:%p\n", size, m.pos, len(m.idx), m) + // fmt.Printf("partitionKey %v entry size %d total %d count %d\n", string(partitionKey), size, m.pos, len(m.idx)) } -func (m *LogBuffer) IsStopping() bool { - return m.isStopping.Load() +func (logBuffer *LogBuffer) IsStopping() bool { + return logBuffer.isStopping.Load() } -func (m *LogBuffer) Shutdown() { - isAlreadyStopped := m.isStopping.Swap(true) +func (logBuffer *LogBuffer) ShutdownLogBuffer() { + isAlreadyStopped := logBuffer.isStopping.Swap(true) if isAlreadyStopped { return } - toFlush := m.copyToFlush() - m.flushChan <- toFlush - close(m.flushChan) + toFlush := logBuffer.copyToFlush() + logBuffer.flushChan <- toFlush + close(logBuffer.flushChan) } -func (m *LogBuffer) loopFlush() { - for d := range m.flushChan { +func (logBuffer *LogBuffer) loopFlush() { + for d := range logBuffer.flushChan { if d != nil { // glog.V(4).Infof("%s flush [%v, %v] size %d", m.name, d.startTime, d.stopTime, len(d.data.Bytes())) - m.flushFn(d.startTime, d.stopTime, d.data.Bytes()) + logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes()) d.releaseMemory() // local logbuffer is different from aggregate logbuffer here - m.lastFlushTime = d.stopTime + logBuffer.lastFlushDataTime = d.stopTime } } } -func (m *LogBuffer) loopInterval() { - for !m.IsStopping() { - time.Sleep(m.flushInterval) - if m.IsStopping() { +func (logBuffer *LogBuffer) loopInterval() { + for !logBuffer.IsStopping() { + 
time.Sleep(logBuffer.flushInterval) + if logBuffer.IsStopping() { return } - m.Lock() - toFlush := m.copyToFlush() - m.Unlock() + logBuffer.Lock() + toFlush := logBuffer.copyToFlush() + logBuffer.Unlock() if toFlush != nil { - m.flushChan <- toFlush + glog.V(0).Infof("%s flush [%v, %v] size %d", logBuffer.name, toFlush.startTime, toFlush.stopTime, len(toFlush.data.Bytes())) + logBuffer.flushChan <- toFlush + } else { + // glog.V(0).Infof("%s no flush", m.name) } } } -func (m *LogBuffer) copyToFlush() *dataToFlush { +func (logBuffer *LogBuffer) copyToFlush() *dataToFlush { - if m.pos > 0 { + if logBuffer.pos > 0 { // fmt.Printf("flush buffer %d pos %d empty space %d\n", len(m.buf), m.pos, len(m.buf)-m.pos) var d *dataToFlush - if m.flushFn != nil { + if logBuffer.flushFn != nil { d = &dataToFlush{ - startTime: m.startTime, - stopTime: m.stopTime, - data: copiedBytes(m.buf[:m.pos]), + startTime: logBuffer.startTime, + stopTime: logBuffer.stopTime, + data: copiedBytes(logBuffer.buf[:logBuffer.pos]), } // glog.V(4).Infof("%s flushing [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) } else { // glog.V(4).Infof("%s removed from memory [0,%d) with %d entries [%v, %v]", m.name, m.pos, len(m.idx), m.startTime, m.stopTime) - m.lastFlushTime = m.stopTime + logBuffer.lastFlushDataTime = logBuffer.stopTime } - m.buf = m.prevBuffers.SealBuffer(m.startTime, m.stopTime, m.buf, m.pos) - m.startTime = time.Unix(0, 0) - m.stopTime = time.Unix(0, 0) - m.pos = 0 - m.idx = m.idx[:0] + logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.batchIndex) + logBuffer.startTime = time.Unix(0, 0) + logBuffer.stopTime = time.Unix(0, 0) + logBuffer.pos = 0 + logBuffer.idx = logBuffer.idx[:0] + logBuffer.batchIndex++ return d } return nil } +func (logBuffer *LogBuffer) GetEarliestTime() time.Time { + return logBuffer.startTime +} +func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition { + return MessagePosition{ + Time: logBuffer.startTime, + BatchIndex: logBuffer.batchIndex, + } +} + func (d *dataToFlush) releaseMemory() { d.data.Reset() bufferPool.Put(d.data) } -func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Buffer, err error) { - m.RLock() - defer m.RUnlock() +func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bufferCopy *bytes.Buffer, batchIndex int64, err error) { + logBuffer.RLock() + defer logBuffer.RUnlock() // Read from disk and memory // 1. 
read from disk, last time is = td @@ -206,52 +230,56 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu // if td < tm, case 2.3 // read from disk again var tsMemory time.Time - if !m.startTime.IsZero() { - tsMemory = m.startTime + var tsBatchIndex int64 + if !logBuffer.startTime.IsZero() { + tsMemory = logBuffer.startTime + tsBatchIndex = logBuffer.batchIndex } - for _, prevBuf := range m.prevBuffers.buffers { + for _, prevBuf := range logBuffer.prevBuffers.buffers { if !prevBuf.startTime.IsZero() && prevBuf.startTime.Before(tsMemory) { tsMemory = prevBuf.startTime + tsBatchIndex = prevBuf.batchIndex } } if tsMemory.IsZero() { // case 2.2 - return nil, nil - } else if lastReadTime.Before(tsMemory) { // case 2.3 - if !m.lastFlushTime.IsZero() { - glog.V(0).Infof("resume with last flush time: %v", m.lastFlushTime) - return nil, ResumeFromDiskError + println("2.2 no data") + return nil, -2, nil + } else if lastReadPosition.Before(tsMemory) && lastReadPosition.BatchIndex+1 < tsBatchIndex { // case 2.3 + if !logBuffer.lastFlushDataTime.IsZero() { + glog.V(0).Infof("resume with last flush time: %v", logBuffer.lastFlushDataTime) + return nil, -2, ResumeFromDiskError } } // the following is case 2.1 - if lastReadTime.Equal(m.stopTime) { - return nil, nil + if lastReadPosition.Equal(logBuffer.stopTime) { + return nil, logBuffer.batchIndex, nil } - if lastReadTime.After(m.stopTime) { - // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadTime, m.stopTime) - return nil, nil + if lastReadPosition.After(logBuffer.stopTime) { + // glog.Fatalf("unexpected last read time %v, older than latest %v", lastReadPosition, m.stopTime) + return nil, logBuffer.batchIndex, nil } - if lastReadTime.Before(m.startTime) { - // println("checking ", lastReadTime.UnixNano()) - for _, buf := range m.prevBuffers.buffers { - if buf.startTime.After(lastReadTime) { + if lastReadPosition.Before(logBuffer.startTime) { + // println("checking ", lastReadPosition.UnixNano()) + for _, buf := range logBuffer.prevBuffers.buffers { + if buf.startTime.After(lastReadPosition.Time) { // glog.V(4).Infof("%s return the %d sealed buffer %v", m.name, i, buf.startTime) // println("return the", i, "th in memory", buf.startTime.UnixNano()) - return copiedBytes(buf.buf[:buf.size]), nil + return copiedBytes(buf.buf[:buf.size]), buf.batchIndex, nil } - if !buf.startTime.After(lastReadTime) && buf.stopTime.After(lastReadTime) { - pos := buf.locateByTs(lastReadTime) + if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) { + pos := buf.locateByTs(lastReadPosition.Time) // fmt.Printf("locate buffer[%d] pos %d\n", i, pos) - return copiedBytes(buf.buf[pos:buf.size]), nil + return copiedBytes(buf.buf[pos:buf.size]), buf.batchIndex, nil } } - // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadTime) - return copiedBytes(m.buf[:m.pos]), nil + // glog.V(4).Infof("%s return the current buf %v", m.name, lastReadPosition) + return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.batchIndex, nil } - lastTs := lastReadTime.UnixNano() - l, h := 0, len(m.idx)-1 + lastTs := lastReadPosition.UnixNano() + l, h := 0, len(logBuffer.idx)-1 /* for i, pos := range m.idx { @@ -269,18 +297,18 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu for l <= h { mid := (l + h) / 2 - pos := m.idx[mid] - _, t := readTs(m.buf, pos) + pos := logBuffer.idx[mid] + _, t := readTs(logBuffer.buf, pos) if t <= lastTs { l = mid + 1 } else if 
lastTs < t { var prevT int64 if mid > 0 { - _, prevT = readTs(m.buf, m.idx[mid-1]) + _, prevT = readTs(logBuffer.buf, logBuffer.idx[mid-1]) } if prevT <= lastTs { // fmt.Printf("found l=%d, m-1=%d(ts=%d), m=%d(ts=%d), h=%d [%d, %d) \n", l, mid-1, prevT, mid, t, h, pos, m.pos) - return copiedBytes(m.buf[pos:m.pos]), nil + return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.batchIndex, nil } h = mid } @@ -288,10 +316,11 @@ func (m *LogBuffer) ReadFromBuffer(lastReadTime time.Time) (bufferCopy *bytes.Bu } // FIXME: this could be that the buffer has been flushed already - return nil, nil + println("Not sure why no data", lastReadPosition.BatchIndex, tsBatchIndex) + return nil, -2, nil } -func (m *LogBuffer) ReleaseMemory(b *bytes.Buffer) { +func (logBuffer *LogBuffer) ReleaseMemory(b *bytes.Buffer) { bufferPool.Put(b) } diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go index 9ecb90762..ac46a096c 100644 --- a/weed/util/log_buffer/log_buffer_test.go +++ b/weed/util/log_buffer/log_buffer_test.go @@ -15,7 +15,7 @@ func TestNewLogBufferFirstBuffer(t *testing.T) { flushInterval := time.Second lb := NewLogBuffer("test", flushInterval, func(startTime, stopTime time.Time, buf []byte) { fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf)) - }, func() { + }, nil, func() { }) startTime := time.Now() diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go index 059f74286..0354f0e7f 100644 --- a/weed/util/log_buffer/log_read.go +++ b/weed/util/log_buffer/log_read.go @@ -17,15 +17,30 @@ var ( ResumeFromDiskError = fmt.Errorf("resumeFromDisk") ) -func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime time.Time, stopTsNs int64, - waitForDataFn func() bool, eachLogDataFn func(logEntry *filer_pb.LogEntry) error) (lastReadTime time.Time, isDone bool, err error) { +type MessagePosition struct { + time.Time // this is the timestamp of the message + BatchIndex int64 // this is only used when the timestamp is not enough to identify the next message, when the timestamp is in the previous batch. +} + +func NewMessagePosition(tsNs int64, batchIndex int64) MessagePosition { + return MessagePosition{ + Time: time.Unix(0, tsNs).UTC(), + BatchIndex: batchIndex, + } +} + +func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition MessagePosition, stopTsNs int64, + waitForDataFn func() bool, eachLogDataFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) { // loop through all messages var bytesBuf *bytes.Buffer - lastReadTime = startReadTime + var batchIndex int64 + lastReadPosition = startPosition + var entryCounter int64 defer func() { if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) } + println("LoopProcessLogData", readerName, "sent messages total", entryCounter) }() for { @@ -33,26 +48,42 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime if bytesBuf != nil { logBuffer.ReleaseMemory(bytesBuf) } - bytesBuf, err = logBuffer.ReadFromBuffer(lastReadTime) + bytesBuf, batchIndex, err = logBuffer.ReadFromBuffer(lastReadPosition) if err == ResumeFromDiskError { time.Sleep(1127 * time.Millisecond) - return lastReadTime, isDone, ResumeFromDiskError + return lastReadPosition, isDone, ResumeFromDiskError + } + readSize := 0 + if bytesBuf != nil { + readSize = bytesBuf.Len() } - // glog.V(4).Infof("%s ReadFromBuffer by %v", readerName, lastReadTime) + glog.V(0).Infof("%s ReadFromBuffer at %v batch %d. 
Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex) if bytesBuf == nil { + if batchIndex >= 0 { + lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex) + } if stopTsNs != 0 { isDone = true return } - if waitForDataFn() { - continue - } else { + lastTsNs := logBuffer.LastTsNs + for lastTsNs == logBuffer.LastTsNs { + if waitForDataFn() { + continue + } else { + isDone = true + return + } + } + if logBuffer.IsStopping() { + isDone = true return } + continue } buf := bytesBuf.Bytes() - // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadTime, len(buf)) + // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadPosition, len(buf)) batchSize := 0 @@ -61,7 +92,7 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime size := util.BytesToUint32(buf[pos : pos+4]) if pos+4+int(size) > len(buf) { err = ResumeError - glog.Errorf("LoopProcessLogData: %s read buffer %v read %d [%d,%d) from [0,%d)", readerName, lastReadTime, batchSize, pos, pos+int(size)+4, len(buf)) + glog.Errorf("LoopProcessLogData: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf)) return } entryData := buf[pos+4 : pos+4+int(size)] @@ -74,20 +105,27 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startReadTime } if stopTsNs != 0 && logEntry.TsNs > stopTsNs { isDone = true + println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs) return } - lastReadTime = time.Unix(0, logEntry.TsNs) + lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex) - if err = eachLogDataFn(logEntry); err != nil { + if isDone, err = eachLogDataFn(logEntry); err != nil { + glog.Errorf("LoopProcessLogData: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err) + return + } + if isDone { + glog.V(0).Infof("LoopProcessLogData2: %s process log entry %d", readerName, batchSize+1) return } pos += 4 + int(size) batchSize++ + entryCounter++ } - // glog.V(4).Infof("%s sent messages ts[%+v,%+v] size %d\n", readerName, startReadTime, lastReadTime, batchSize) + glog.V(0).Infof("%s sent messages ts[%+v,%+v] size %d\n", readerName, startPosition, lastReadPosition, batchSize) } } diff --git a/weed/util/log_buffer/sealed_buffer.go b/weed/util/log_buffer/sealed_buffer.go index d133cf8d3..c41b30fcc 100644 --- a/weed/util/log_buffer/sealed_buffer.go +++ b/weed/util/log_buffer/sealed_buffer.go @@ -6,10 +6,11 @@ import ( ) type MemBuffer struct { - buf []byte - size int - startTime time.Time - stopTime time.Time + buf []byte + size int + startTime time.Time + stopTime time.Time + batchIndex int64 } type SealedBuffers struct { @@ -29,7 +30,7 @@ func newSealedBuffers(size int) *SealedBuffers { return sbs } -func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int) (newBuf []byte) { +func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, batchIndex int64) (newBuf []byte) { oldMemBuffer := sbs.buffers[0] size := len(sbs.buffers) for i := 0; i < size-1; i++ { @@ -37,11 +38,13 @@ func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, sbs.buffers[i].size = sbs.buffers[i+1].size sbs.buffers[i].startTime = sbs.buffers[i+1].startTime sbs.buffers[i].stopTime = sbs.buffers[i+1].stopTime + sbs.buffers[i].batchIndex = sbs.buffers[i+1].batchIndex } sbs.buffers[size-1].buf = buf sbs.buffers[size-1].size = pos sbs.buffers[size-1].startTime = 
startTime
     sbs.buffers[size-1].stopTime = stopTime
+    sbs.buffers[size-1].batchIndex = batchIndex
     return oldMemBuffer.buf
 }
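Reviewer note: taken together, the log_buffer changes replace the bare `lastReadTime time.Time` cursor with a `MessagePosition` (timestamp plus batch index) and give the buffer a `ReadFromDiskFn` fallback for positions that have already been flushed out of memory. Below is a rough sketch of how a subscriber loop might wire up the new signatures; the callbacks, the initial batch index, and the control flow around `ResumeFromDiskError` are placeholders and assumptions — only the function and type signatures come from this patch, and the real caller is presumably the message-queue subscribe code merged alongside this (not shown here, since this page is limited to weed/util).

```go
package main

import (
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
)

func main() {
	lb := log_buffer.NewLogBuffer("example", 2*time.Second,
		// flushFn: persist buf covering [startTime, stopTime] somewhere durable (placeholder)
		func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {},
		// ReadFromDiskFn: replay flushed entries older than what memory still holds (placeholder)
		func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (log_buffer.MessagePosition, bool, error) {
			return startPosition, false, nil
		},
		// notifyFn: wake up waiting subscribers (placeholder)
		func() {},
	)

	// Initial position: now; the batch index value here is a guess, not prescribed by the patch.
	pos := log_buffer.NewMessagePosition(time.Now().UnixNano(), -2)
	for {
		lastPos, isDone, err := lb.LoopProcessLogData("example-reader", pos, 0,
			func() bool { return true }, // waitForDataFn: keep waiting for more data
			func(entry *filer_pb.LogEntry) (bool, error) {
				// handle one message; returning true would stop the loop
				println("got entry at", entry.TsNs)
				return false, nil
			})
		pos = lastPos
		if err == log_buffer.ResumeFromDiskError {
			// Memory no longer covers pos; a real subscriber would re-read older data
			// from disk here, e.g. via the same function it registered as ReadFromDiskFn.
			continue
		}
		if err != nil || isDone {
			break
		}
	}
	lb.ShutdownLogBuffer()
}
```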
