// path: weed/util/log_buffer/sealed_buffer.go
// blob: 397dab1d4b98a81b0c1364b0324021eae3963378

package log_buffer

import (
	"fmt"
	"time"
)

// MemBuffer describes one in-memory buffer of log data: the bytes themselves
// plus the time range and offset range they cover. SealBuffer fills startTime
// and stopTime from its arguments of the same names.
type MemBuffer struct {
	buf         []byte // backing storage; newSealedBuffers allocates BufferSize bytes per slot
	size        int    // set from SealBuffer's pos argument; String reports it as the byte count
	startTime   time.Time
	stopTime    time.Time
	startOffset int64 // First offset in this buffer
	offset      int64 // Last offset in this buffer (endOffset)
}

// SealedBuffers holds a set of MemBuffer slots. SealBuffer shifts the slot
// contents toward index 0 and stores the newly sealed data in the last slot,
// so higher indexes hold more recently sealed data.
type SealedBuffers struct {
	buffers []*MemBuffer // allocated by newSealedBuffers; nothing in this file changes its length
}

// newSealedBuffers builds a SealedBuffers with size slots. Every slot is a
// separate MemBuffer whose buf is a freshly allocated slice of BufferSize
// bytes; all other fields are left at their zero values.
func newSealedBuffers(size int) *SealedBuffers {
	slots := make([]*MemBuffer, size)
	for idx := range slots {
		slots[idx] = &MemBuffer{buf: make([]byte, BufferSize)}
	}
	return &SealedBuffers{buffers: slots}
}

// SealBuffer records a just-filled buffer as the newest sealed slot and hands
// back the storage of the slot that was pushed out, so the caller can reuse it.
//
// Every slot's contents move one position toward index 0. The last slot then
// receives buf, pos (stored as size), startTime, stopTime, startOffset and
// endOffset (stored as offset). The returned slice is the buf that index 0
// held before the shift, i.e. the one the shift removed from the slots.
//
// If sbs has no slots nothing can be retained, so buf itself is returned.
func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, startOffset int64, endOffset int64) (newBuf []byte) {
	size := len(sbs.buffers)
	if size == 0 {
		return buf
	}

	// Take the evicted slice before shifting. The slots are pointers, so
	// reading sbs.buffers[0].buf after the loop would yield the slice that was
	// just moved into slot 0 (and is still sealed), not the evicted one.
	newBuf = sbs.buffers[0].buf

	// Copy contents rather than rotating pointers so each slot keeps its
	// identity and only the data it describes changes.
	for i := 0; i < size-1; i++ {
		*sbs.buffers[i] = *sbs.buffers[i+1]
	}

	last := sbs.buffers[size-1]
	last.buf = buf
	last.size = pos
	last.startTime = startTime
	last.stopTime = stopTime
	last.startOffset = startOffset
	last.offset = endOffset

	return newBuf
}

// locateByTs returns the byte position of the first record in mb whose
// timestamp, as reported by readTs, is strictly greater than lastReadTime.
//
// Only the first mb.size bytes are scanned: size is the number of bytes
// recorded for this buffer, while buf may be a larger slice whose remainder
// was never written for this buffer. If no record is newer than lastReadTime
// the end of that scanned region is returned.
func (mb *MemBuffer) locateByTs(lastReadTime time.Time) (pos int) {
	// Never scan past the backing slice, even if size is inconsistent.
	end := mb.size
	if end > len(mb.buf) {
		end = len(mb.buf)
	}

	lastReadTs := lastReadTime.UnixNano()
	for pos < end {
		size, t := readTs(mb.buf, pos)
		if t > lastReadTs {
			return pos
		}
		// Each record occupies size+4 bytes; presumably the extra 4 are a
		// length prefix — confirm against readTs.
		pos += size + 4
	}
	return end
}

// String renders the buffer as "[startTime,stopTime] bytes:size" for logs and
// debugging output.
func (mb *MemBuffer) String() string {
	from, to, used := mb.startTime, mb.stopTime, mb.size
	return fmt.Sprintf("[%v,%v] bytes:%d", from, to, used)
}