aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/reader_cache.go
blob: 66cbac1e3f3a3988f8c0260cc941739f17a72c4a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
package filer

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
	util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
	"github.com/seaweedfs/seaweedfs/weed/util/mem"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)

type ReaderCache struct {
	chunkCache     chunk_cache.ChunkCache
	lookupFileIdFn wdclient.LookupFileIdFunctionType
	sync.Mutex
	downloaders map[string]*SingleChunkCacher
	limit       int
}

type SingleChunkCacher struct {
	completedTimeNew int64
	sync.Mutex
	parent         *ReaderCache
	chunkFileId    string
	data           []byte
	err            error
	cipherKey      []byte
	isGzipped      bool
	chunkSize      int
	shouldCache    bool
	wg             sync.WaitGroup
	cacheStartedCh chan struct{}
	done           chan struct{} // signals when download is complete
}

func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
	return &ReaderCache{
		limit:          limit,
		chunkCache:     chunkCache,
		lookupFileIdFn: lookupFileIdFn,
		downloaders:    make(map[string]*SingleChunkCacher),
	}
}

// MaybeCache prefetches up to 'count' chunks ahead in parallel.
// This improves read throughput for sequential reads by keeping the
// network pipeline full with parallel chunk fetches.
func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView], count int) {
	if rc.lookupFileIdFn == nil {
		return
	}
	if count <= 0 {
		count = 1
	}

	rc.Lock()
	defer rc.Unlock()

	if len(rc.downloaders) >= rc.limit {
		return
	}

	cached := 0
	for x := chunkViews; x != nil && cached < count; x = x.Next {
		chunkView := x.Value
		if _, found := rc.downloaders[chunkView.FileId]; found {
			continue
		}
		if rc.chunkCache.IsInCache(chunkView.FileId, true) {
			glog.V(4).Infof("%s is in cache", chunkView.FileId)
			continue
		}

		if len(rc.downloaders) >= rc.limit {
			// abort when slots are filled
			return
		}

		// glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.ViewOffset)
		// cache this chunk if not yet
		shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= rc.chunkCache.GetMaxFilePartSizeInCache()
		cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), shouldCache)
		go cacher.startCaching()
		<-cacher.cacheStartedCh
		rc.downloaders[chunkView.FileId] = cacher
		cached++
	}

	return
}

func (rc *ReaderCache) ReadChunkAt(ctx context.Context, buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
	rc.Lock()

	if cacher, found := rc.downloaders[fileId]; found {
		rc.Unlock()
		n, err := cacher.readChunkAt(ctx, buffer, offset)
		if n > 0 || err != nil {
			return n, err
		}
		// If n=0 and err=nil, the cacher couldn't provide data for this offset.
		// Fall through to try chunkCache.
		rc.Lock()
	}
	if shouldCache || rc.lookupFileIdFn == nil {
		n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
		if n > 0 {
			rc.Unlock()
			return n, err
		}
	}

	// clean up old downloaders
	if len(rc.downloaders) >= rc.limit {
		oldestFid, oldestTime := "", time.Now().UnixNano()
		for fid, downloader := range rc.downloaders {
			completedTime := atomic.LoadInt64(&downloader.completedTimeNew)
			if completedTime > 0 && completedTime < oldestTime {
				oldestFid, oldestTime = fid, completedTime
			}
		}
		if oldestFid != "" {
			oldDownloader := rc.downloaders[oldestFid]
			delete(rc.downloaders, oldestFid)
			oldDownloader.destroy()
		}
	}

	// glog.V(4).Infof("cache1 %s", fileId)

	cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
	go cacher.startCaching()
	<-cacher.cacheStartedCh
	rc.downloaders[fileId] = cacher
	rc.Unlock()

	return cacher.readChunkAt(ctx, buffer, offset)
}

func (rc *ReaderCache) UnCache(fileId string) {
	rc.Lock()
	defer rc.Unlock()
	// glog.V(4).Infof("uncache %s", fileId)
	if downloader, found := rc.downloaders[fileId]; found {
		downloader.destroy()
		delete(rc.downloaders, fileId)
	}
}

func (rc *ReaderCache) destroy() {
	rc.Lock()
	defer rc.Unlock()

	for _, downloader := range rc.downloaders {
		downloader.destroy()
	}

}

func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
	return &SingleChunkCacher{
		parent:         parent,
		chunkFileId:    fileId,
		cipherKey:      cipherKey,
		isGzipped:      isGzipped,
		chunkSize:      chunkSize,
		shouldCache:    shouldCache,
		cacheStartedCh: make(chan struct{}),
		done:           make(chan struct{}),
	}
}

// startCaching downloads the chunk data in the background.
// It does NOT hold the lock during the HTTP download to allow concurrent readers
// to wait efficiently using the done channel.
func (s *SingleChunkCacher) startCaching() {
	s.wg.Add(1)
	defer s.wg.Done()
	defer close(s.done) // guarantee completion signal even on panic

	s.cacheStartedCh <- struct{}{} // signal that we've started

	// Note: We intentionally use context.Background() here, NOT a request-specific context.
	// The downloaded chunk is a shared resource - multiple concurrent readers may be waiting
	// for this same download to complete. If we used a request context and that request was
	// cancelled, it would abort the download and cause errors for all other waiting readers.
	// The download should always complete once started to serve all potential consumers.

	// Lookup file ID without holding the lock
	urlStrings, err := s.parent.lookupFileIdFn(context.Background(), s.chunkFileId)
	if err != nil {
		s.Lock()
		s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
		s.Unlock()
		return
	}

	// Allocate buffer and download without holding the lock
	// This allows multiple downloads to proceed in parallel
	data := mem.Allocate(s.chunkSize)
	_, fetchErr := util_http.RetriedFetchChunkData(context.Background(), data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId)

	// Now acquire lock to update state
	s.Lock()
	if fetchErr != nil {
		mem.Free(data)
		s.err = fetchErr
	} else {
		s.data = data
		if s.shouldCache {
			s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
		}
		atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
	}
	s.Unlock()
}

func (s *SingleChunkCacher) destroy() {
	// wait for all reads to finish before destroying the data
	s.wg.Wait()
	s.Lock()
	defer s.Unlock()

	if s.data != nil {
		mem.Free(s.data)
		s.data = nil
	}
}

// readChunkAt reads data from the cached chunk.
// It waits for the download to complete if it's still in progress.
// The ctx parameter allows the reader to cancel its wait (but the download continues
// for other readers - see comment in startCaching about shared resource semantics).
func (s *SingleChunkCacher) readChunkAt(ctx context.Context, buf []byte, offset int64) (int, error) {
	s.wg.Add(1)
	defer s.wg.Done()

	// Wait for download to complete, but allow reader cancellation.
	// Prioritize checking done first - if data is already available,
	// return it even if context is also cancelled.
	select {
	case <-s.done:
		// Download already completed, proceed immediately
	default:
		// Download not complete, wait for it or context cancellation
		select {
		case <-s.done:
			// Download completed
		case <-ctx.Done():
			// Reader cancelled while waiting - download continues for other readers
			return 0, ctx.Err()
		}
	}

	s.Lock()
	defer s.Unlock()

	if s.err != nil {
		return 0, s.err
	}

	if len(s.data) <= int(offset) {
		return 0, nil
	}

	return copy(buf, s.data[offset:]), nil
}