aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/filechunk_section.go
blob: 60c9195693f3bbe477c1b0e1d7800da5b2c93a6a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package filer

import (
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"sync"
)

// SectionSize is the number of bytes of file offset space covered by a single
// FileChunkSection (256 MiB).
const SectionSize = 256 << 20

// SectionIndex numbers the sections of a file; section i spans the file
// offsets [i*SectionSize, (i+1)*SectionSize).
type SectionIndex int64
// FileChunkSection holds the chunks of a file that fall into one
// SectionSize-wide window of the file's offset space, together with the
// lazily built structures used to read from that window.
type FileChunkSection struct {
	// sectionIndex selects the window: offsets
	// [sectionIndex*SectionSize, (sectionIndex+1)*SectionSize).
	sectionIndex     SectionIndex
	// chunks are the chunks added to this section; garbage chunks are
	// filtered out whenever visibleIntervals is (re)computed or updated.
	chunks           []*filer_pb.FileChunk
	// visibleIntervals is nil until setupForRead resolves it from chunks.
	visibleIntervals *IntervalList[*VisibleInterval]
	// chunkViews is nil until setupForRead derives it from visibleIntervals.
	chunkViews       *IntervalList[*ChunkView]
	// reader is nil until setupForRead creates it; it is closed and recreated
	// when visibleIntervals has to be rebuilt.
	reader           *ChunkReadAt
	// lock is held by addChunk, readDataAt, DataStartOffset and NextStopOffset
	// while they use the fields above.
	lock             sync.Mutex
}

// NewFileChunkSection returns an empty section for the given section index.
// The chunk list, visible intervals, chunk views and reader start out unset.
func NewFileChunkSection(si SectionIndex) *FileChunkSection {
	section := new(FileChunkSection)
	section.sectionIndex = si
	return section
}

// addChunk records chunk in this section while holding the section lock.
//
// The chunk's byte range is clipped to the section window before it is merged
// into the visible intervals and chunk views. Either structure is only updated
// if it has already been built; otherwise it is left unset. After the visible
// intervals are updated, garbage chunks are dropped from the section's chunk
// list. The returned error is always nil.
func (section *FileChunkSection) addChunk(chunk *filer_pb.FileChunk) error {
	section.lock.Lock()
	defer section.lock.Unlock()

	// Clip [chunk.Offset, chunk.Offset+chunk.Size) to the section window.
	index := int64(section.sectionIndex)
	start := max(index*SectionSize, chunk.Offset)
	stop := min((index+1)*SectionSize, chunk.Offset+int64(chunk.Size))

	section.chunks = append(section.chunks, chunk)

	if visibles := section.visibleIntervals; visibles != nil {
		MergeIntoVisibles(visibles, start, stop, chunk)
		// The second result (the garbage chunks) is intentionally discarded.
		section.chunks, _ = SeparateGarbageChunks(visibles, section.chunks)
	}

	if views := section.chunkViews; views != nil {
		MergeIntoChunkViews(views, start, stop, chunk)
	}

	return nil
}

// setupForRead lazily builds whatever is still missing for reading from this
// section: the visible intervals, the chunk views and the reader, in that
// order. It always refreshes the reader's fileSize.
//
// It does not take section.lock itself; the callers in this file invoke it
// with the lock already held.
func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) {
	sectionStart := int64(section.sectionIndex) * SectionSize
	sectionStop := (int64(section.sectionIndex) + 1) * SectionSize

	if section.visibleIntervals == nil {
		section.visibleIntervals = readResolvedChunks(section.chunks, sectionStart, sectionStop)
		section.chunks, _ = SeparateGarbageChunks(section.visibleIntervals, section.chunks)
		// Drop any existing reader so that a new one is created below.
		if old := section.reader; old != nil {
			_ = old.Close()
			section.reader = nil
		}
	}

	if section.chunkViews == nil {
		section.chunkViews = ViewFromVisibleIntervals(section.visibleIntervals, sectionStart, sectionStop)
	}

	if section.reader == nil {
		// The reader is created with the smaller of the section end and fileSize.
		readerLimit := min(sectionStop, fileSize)
		section.reader = NewChunkReaderAtFromClient(group.lookupFn, section.chunkViews, group.chunkCache, readerLimit)
	}
	section.reader.fileSize = fileSize
}

// readDataAt fills buff with data starting at the given file offset. It takes
// the section lock, makes sure the section is set up for reading, and then
// returns whatever the section reader's ReadAtWithTime reports.
func (section *FileChunkSection) readDataAt(group *ChunkGroup, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {
	section.lock.Lock()
	defer section.lock.Unlock()

	section.setupForRead(group, fileSize)

	n, tsNs, err = section.reader.ReadAtWithTime(buff, offset)
	return n, tsNs, err
}

// DataStartOffset returns the first file offset at or after offset that is
// covered by a visible interval of this section (SEEK_DATA semantics).
//
// If offset lies inside a visible interval, offset itself is returned. If it
// lies in a hole, the start of the next visible interval is returned. If no
// visible interval ends after offset, -1 is returned.
//
// The section lock is held for the duration of the call, and the section is
// set up for reading first so that the visible intervals exist.
func (section *FileChunkSection) DataStartOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
	section.lock.Lock()
	defer section.lock.Unlock()

	section.setupForRead(group, fileSize)

	for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
		visible := x.Value
		if visible.stop <= offset {
			// This interval ends at or before offset; keep looking.
			continue
		}
		// First interval ending after offset: data starts at offset when the
		// interval already covers it, otherwise at the interval's start.
		return max(offset, visible.start)
	}
	return -1
}

// NextStopOffset walks the section's visible intervals in list order and
// reports where the data that covers offset stops.
//
// Intervals ending at or before offset are skipped. From the first remaining
// interval on, offset is returned as soon as it lies before an interval's
// start; otherwise it is advanced to that interval's stop and the walk
// continues. If the list is exhausted, the (possibly advanced) offset is
// returned.
//
// NOTE(review): reading the result as "end of the contiguous data run" assumes
// the interval list is ordered by offset - confirm against IntervalList.
func (section *FileChunkSection) NextStopOffset(group *ChunkGroup, offset int64, fileSize int64) int64 {
	section.lock.Lock()
	defer section.lock.Unlock()

	section.setupForRead(group, fileSize)

	// Skip the leading intervals that end at or before offset.
	node := section.visibleIntervals.Front()
	for node != nil && node.Value.stop <= offset {
		node = node.Next
	}

	for ; node != nil; node = node.Next {
		interval := node.Value
		if offset < interval.start {
			// There is a gap between offset and this interval.
			return offset
		}
		// Here interval.start <= offset, so move to the interval's end if
		// offset is still inside it.
		if offset < interval.stop {
			offset = interval.stop
		}
	}
	return offset
}