aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/filechunk_section.go
blob: 76eb84c23146fbca7385aceaafb8fe51233eb2a4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package filer

import (
	"context"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

const SectionSize = 2 * 1024 * 1024 * 32 // 64MiB
type SectionIndex int64
type FileChunkSection struct {
	sectionIndex     SectionIndex
	chunks           []*filer_pb.FileChunk
	visibleIntervals *IntervalList[*VisibleInterval]
	chunkViews       *IntervalList[*ChunkView]
	reader           *ChunkReadAt
	lock             sync.RWMutex
	isPrepared       bool
}

func NewFileChunkSection(si SectionIndex) *FileChunkSection {
	return &FileChunkSection{
		sectionIndex: si,
	}
}

func (section *FileChunkSection) addChunk(chunk *filer_pb.FileChunk) error {
	section.lock.Lock()
	defer section.lock.Unlock()

	start, stop := max(int64(section.sectionIndex)*SectionSize, chunk.Offset), min(((int64(section.sectionIndex)+1)*SectionSize), chunk.Offset+int64(chunk.Size))

	section.chunks = append(section.chunks, chunk)

	if section.visibleIntervals == nil {
		section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
	} else {
		MergeIntoVisibles(section.visibleIntervals, start, stop, chunk)
		garbageFileIds := FindGarbageChunks(section.visibleIntervals, start, stop)
		removeGarbageChunks(section, garbageFileIds)
	}

	if section.chunkViews != nil {
		MergeIntoChunkViews(section.chunkViews, start, stop, chunk)
	}

	return nil
}

func removeGarbageChunks(section *FileChunkSection, garbageFileIds map[string]struct{}) {
	for i := 0; i < len(section.chunks); {
		t := section.chunks[i]
		length := len(section.chunks)
		if _, found := garbageFileIds[t.FileId]; found {
			if i < length-1 {
				section.chunks[i] = section.chunks[length-1]
			}
			section.chunks = section.chunks[:length-1]
		} else {
			i++
		}
	}
}

func (section *FileChunkSection) setupForRead(ctx context.Context, group *ChunkGroup, fileSize int64) {
	section.lock.Lock()
	defer section.lock.Unlock()

	if section.isPrepared {
		section.reader.fileSize = fileSize
		return
	}

	if section.visibleIntervals == nil {
		section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
		section.chunks, _ = SeparateGarbageChunks(section.visibleIntervals, section.chunks)
		if section.reader != nil {
			_ = section.reader.Close()
			section.reader = nil
		}
	}
	if section.chunkViews == nil {
		section.chunkViews = ViewFromVisibleIntervals(section.visibleIntervals, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
	}

	if section.reader == nil {
		section.reader = NewChunkReaderAtFromClient(ctx, group.readerCache, section.chunkViews, min(int64(section.sectionIndex+1)*SectionSize, fileSize))
	}

	section.isPrepared = true
	section.reader.fileSize = fileSize
}

func (section *FileChunkSection) readDataAt(ctx context.Context, group *ChunkGroup, fileSize int64, buff []byte, offset int64) (n int, tsNs int64, err error) {

	section.setupForRead(ctx, group, fileSize)
	section.lock.RLock()
	defer section.lock.RUnlock()

	return section.reader.ReadAtWithTime(ctx, buff, offset)
}

func (section *FileChunkSection) DataStartOffset(ctx context.Context, group *ChunkGroup, offset int64, fileSize int64) int64 {

	section.setupForRead(ctx, group, fileSize)
	section.lock.RLock()
	defer section.lock.RUnlock()

	for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
		visible := x.Value
		if visible.stop <= offset {
			continue
		}
		if offset < visible.start {
			return offset
		}
		return offset
	}
	return -1
}

func (section *FileChunkSection) NextStopOffset(ctx context.Context, group *ChunkGroup, offset int64, fileSize int64) int64 {

	section.setupForRead(ctx, group, fileSize)
	section.lock.RLock()
	defer section.lock.RUnlock()

	isAfterOffset := false
	for x := section.visibleIntervals.Front(); x != nil; x = x.Next {
		visible := x.Value
		if !isAfterOffset {
			if visible.stop <= offset {
				continue
			}
			isAfterOffset = true
		}
		if offset < visible.start {
			return offset
		}
		// now visible.start <= offset
		if offset < visible.stop {
			offset = visible.stop
		}
	}
	return offset
}