package wdclient

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sort"
	"strconv"
	"strings"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"golang.org/x/sync/singleflight"
)
// VolumeLocationProvider is the interface for looking up volume locations
// This allows different implementations (master subscription, filer queries, etc.)
type VolumeLocationProvider interface {
// LookupVolumeIds looks up volume locations for the given volume IDs
// Returns a map of volume ID to locations
LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error)
}
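// An illustrative sketch (not part of this package) of a provider backed by
// a fixed in-memory table, e.g. for tests; the type name staticProvider and
// its field are hypothetical:
//
//	type staticProvider struct {
//		locations map[string][]Location
//	}
//
//	func (p *staticProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
//		result := make(map[string][]Location, len(volumeIds))
//		for _, vid := range volumeIds {
//			if locs, ok := p.locations[vid]; ok {
//				result[vid] = locs
//			}
//		}
//		// Absent volumes are simply omitted from the returned map.
//		return result, nil
//	}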
// vidMapClient provides volume location caching with pluggable lookup
// It wraps the battle-tested vidMap with customizable volume lookup strategies
type vidMapClient struct {
vidMap *vidMap
vidMapLock sync.RWMutex
vidMapCacheSize int
provider VolumeLocationProvider
vidLookupGroup singleflight.Group
}
const (
// DefaultVidMapCacheSize is the default number of historical vidMap snapshots to keep
// This provides cache history when volumes move between servers
DefaultVidMapCacheSize = 5
)
// newVidMapClient creates a new client with the given provider and data center
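//
// A minimal usage sketch (provider is assumed to be any VolumeLocationProvider
// implementation; "dc1" is a placeholder data center name):
//
//	vc := newVidMapClient(provider, "dc1", DefaultVidMapCacheSize)
//	locs, err := vc.LookupVolumeIdsWithFallback(ctx, []string{"3"})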
func newVidMapClient(provider VolumeLocationProvider, dataCenter string, cacheSize int) *vidMapClient {
if cacheSize <= 0 {
cacheSize = DefaultVidMapCacheSize
}
return &vidMapClient{
vidMap: newVidMap(dataCenter),
vidMapCacheSize: cacheSize,
provider: provider,
}
}
// GetLookupFileIdFunction returns a function that can be used to lookup file IDs
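//
// Sketch of typical use (the file ID "3,01637037d6" is illustrative):
//
//	lookup := vc.GetLookupFileIdFunction()
//	urls, err := lookup(ctx, "3,01637037d6")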
func (vc *vidMapClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
return vc.LookupFileIdWithFallback
}
// LookupFileIdWithFallback looks up a file ID, checking cache first, then using provider
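//
// File IDs have the form "volumeId,needleIdCookie", e.g. "3,01637037d6",
// so the volume ID is the portion before the comma.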
func (vc *vidMapClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
// Try cache first - hold read lock during entire vidMap access to prevent swap during operation
vc.vidMapLock.RLock()
vm := vc.vidMap
dataCenter := vm.DataCenter
fullUrls, err = vm.LookupFileId(ctx, fileId)
vc.vidMapLock.RUnlock()
// Cache hit - return immediately
if err == nil && len(fullUrls) > 0 {
return
}
// Cache miss - extract volume ID from file ID (format: "volumeId,needle_id_cookie")
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid fileId %s", fileId)
}
volumeId := parts[0]
// Use shared lookup logic with batching and singleflight
vidLocations, err := vc.LookupVolumeIdsWithFallback(ctx, []string{volumeId})
// Check for partial results first (important for multi-volume batched lookups)
locations, found := vidLocations[volumeId]
if !found || len(locations) == 0 {
// Volume not found - return specific error with context from lookup if available
if err != nil {
return nil, fmt.Errorf("volume %s not found for fileId %s: %w", volumeId, fileId, err)
}
return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId)
}
// Volume found successfully - ignore any errors about other volumes
// (not relevant for single-volume lookup, but defensive for future batching)
// Build HTTP URLs from locations, preferring same data center
var sameDcUrls, otherDcUrls []string
for _, loc := range locations {
httpUrl := "http://" + loc.Url + "/" + fileId
if dataCenter != "" && dataCenter == loc.DataCenter {
sameDcUrls = append(sameDcUrls, httpUrl)
} else {
otherDcUrls = append(otherDcUrls, httpUrl)
}
}
// Shuffle to distribute load across volume servers
rand.Shuffle(len(sameDcUrls), func(i, j int) { sameDcUrls[i], sameDcUrls[j] = sameDcUrls[j], sameDcUrls[i] })
rand.Shuffle(len(otherDcUrls), func(i, j int) { otherDcUrls[i], otherDcUrls[j] = otherDcUrls[j], otherDcUrls[i] })
// Prefer same data center
fullUrls = append(sameDcUrls, otherDcUrls...)
return fullUrls, nil
}
// LookupVolumeIdsWithFallback looks up volume locations, querying provider if not in cache.
// Uses singleflight to coalesce concurrent requests for the same batch of volumes.
//
// IMPORTANT: This function may return PARTIAL results with a non-nil error.
// The result map contains successfully looked up volumes, while the error aggregates
// failures for volumes that couldn't be found or had lookup errors.
//
// Callers MUST check both the result map AND the error:
// - result != nil && err == nil: All volumes found successfully
// - result != nil && err != nil: Some volumes found, some failed (check both)
// - result == nil && err != nil: Complete failure (connection error, etc.)
//
// Example usage:
//
// locs, err := vc.LookupVolumeIdsWithFallback(ctx, []string{"1", "2", "999"})
// if len(locs) > 0 {
// // Process successfully found volumes
// }
// if err != nil {
// // Log/handle failed volumes
// }
func (vc *vidMapClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
result := make(map[string][]Location)
var needsLookup []string
var lookupErrors []error
// Check cache first and parse volume IDs once
vidStringToUint := make(map[string]uint32, len(volumeIds))
// Get stable pointer to vidMap with minimal lock hold time
vm := vc.getStableVidMap()
for _, vidString := range volumeIds {
vid, err := strconv.ParseUint(vidString, 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
}
vidStringToUint[vidString] = uint32(vid)
locations, found := vm.GetLocations(uint32(vid))
if found && len(locations) > 0 {
result[vidString] = locations
} else {
needsLookup = append(needsLookup, vidString)
}
}
if len(needsLookup) == 0 {
return result, nil
}
// Batch query all missing volumes using singleflight on the batch key
// Sort for stable key to coalesce identical batches
sort.Strings(needsLookup)
batchKey := strings.Join(needsLookup, ",")
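// e.g. concurrent callers with ["7", "3"] and ["3", "7"] both produce the
// key "3,7" and share one provider call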
sfResult, err, _ := vc.vidLookupGroup.Do(batchKey, func() (interface{}, error) {
// Double-check cache for volumes that might have been populated while waiting
stillNeedLookup := make([]string, 0, len(needsLookup))
batchResult := make(map[string][]Location)
// Get stable pointer with minimal lock hold time
vm := vc.getStableVidMap()
for _, vidString := range needsLookup {
vid := vidStringToUint[vidString] // Use pre-parsed value
if locations, found := vm.GetLocations(vid); found && len(locations) > 0 {
batchResult[vidString] = locations
} else {
stillNeedLookup = append(stillNeedLookup, vidString)
}
}
if len(stillNeedLookup) == 0 {
return batchResult, nil
}
// Query provider with batched volume IDs
glog.V(2).Infof("Looking up %d volumes from provider: %v", len(stillNeedLookup), stillNeedLookup)
providerResults, err := vc.provider.LookupVolumeIds(ctx, stillNeedLookup)
if err != nil {
return batchResult, fmt.Errorf("provider lookup failed: %w", err)
}
// Update cache with results
for vidString, locations := range providerResults {
vid, err := strconv.ParseUint(vidString, 10, 32)
if err != nil {
glog.Warningf("Failed to parse volume id '%s': %v", vidString, err)
continue
}
for _, loc := range locations {
vc.addLocation(uint32(vid), loc)
}
if len(locations) > 0 {
batchResult[vidString] = locations
}
}
return batchResult, nil
})
if err != nil {
lookupErrors = append(lookupErrors, err)
}
// Merge singleflight batch results
if batchLocations, ok := sfResult.(map[string][]Location); ok {
for vid, locs := range batchLocations {
result[vid] = locs
}
}
// Check for volumes that still weren't found
for _, vidString := range needsLookup {
if _, found := result[vidString]; !found {
lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString))
}
}
// Return aggregated errors
return result, errors.Join(lookupErrors...)
}
// getStableVidMap gets a stable pointer to the vidMap, releasing the lock immediately.
// WARNING: Use with caution. The returned vidMap pointer is stable (won't be garbage collected
// due to cache chain), but the vidMapClient.vidMap field may be swapped by resetVidMap().
// For operations that must use the current vidMap atomically, use withCurrentVidMap() instead.
func (vc *vidMapClient) getStableVidMap() *vidMap {
vc.vidMapLock.RLock()
vm := vc.vidMap
vc.vidMapLock.RUnlock()
return vm
}
// withCurrentVidMap executes a function with the current vidMap under a read lock.
// This guarantees the vidMap instance cannot be swapped during the function execution.
// Use this when you need atomic access to the current vidMap for multiple operations.
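//
// For example, to apply several updates atomically against one instance
// (vid, oldLoc, and newLoc are placeholders):
//
//	vc.withCurrentVidMap(func(vm *vidMap) {
//		vm.deleteLocation(vid, oldLoc)
//		vm.addLocation(vid, newLoc)
//	})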
func (vc *vidMapClient) withCurrentVidMap(f func(vm *vidMap)) {
vc.vidMapLock.RLock()
defer vc.vidMapLock.RUnlock()
f(vc.vidMap)
}
// Public methods for external access
// GetLocations safely retrieves volume locations
func (vc *vidMapClient) GetLocations(vid uint32) (locations []Location, found bool) {
return vc.getStableVidMap().GetLocations(vid)
}
// GetLocationsClone safely retrieves a clone of volume locations
func (vc *vidMapClient) GetLocationsClone(vid uint32) (locations []Location, found bool) {
return vc.getStableVidMap().GetLocationsClone(vid)
}
// GetVidLocations safely retrieves volume locations by string ID
func (vc *vidMapClient) GetVidLocations(vid string) (locations []Location, err error) {
return vc.getStableVidMap().GetVidLocations(vid)
}
// LookupFileId safely looks up URLs for a file ID
func (vc *vidMapClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
return vc.getStableVidMap().LookupFileId(ctx, fileId)
}
// LookupVolumeServerUrl safely looks up volume server URLs
func (vc *vidMapClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
return vc.getStableVidMap().LookupVolumeServerUrl(vid)
}
// GetDataCenter safely retrieves the data center
func (vc *vidMapClient) GetDataCenter() string {
return vc.getStableVidMap().DataCenter
}
// Thread-safe helpers for vidMap operations
// addLocation adds a volume location
func (vc *vidMapClient) addLocation(vid uint32, location Location) {
vc.withCurrentVidMap(func(vm *vidMap) {
vm.addLocation(vid, location)
})
}
// deleteLocation removes a volume location
func (vc *vidMapClient) deleteLocation(vid uint32, location Location) {
vc.withCurrentVidMap(func(vm *vidMap) {
vm.deleteLocation(vid, location)
})
}
// addEcLocation adds an EC volume location
func (vc *vidMapClient) addEcLocation(vid uint32, location Location) {
vc.withCurrentVidMap(func(vm *vidMap) {
vm.addEcLocation(vid, location)
})
}
// deleteEcLocation removes an EC volume location
func (vc *vidMapClient) deleteEcLocation(vid uint32, location Location) {
vc.withCurrentVidMap(func(vm *vidMap) {
vm.deleteEcLocation(vid, location)
})
}
// resetVidMap installs a fresh vidMap, keeping the previous one as a cached
// snapshot so that recently moved volumes can still be resolved, and trims
// the snapshot chain to vidMapCacheSize entries.
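//
// For example, with vidMapCacheSize = 2, repeated resets leave a chain of the
// current map plus two snapshots:
//
//	current -> snapshot1 -> snapshot2 -> nil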
func (vc *vidMapClient) resetVidMap() {
vc.vidMapLock.Lock()
defer vc.vidMapLock.Unlock()
// Preserve the existing vidMap in the cache chain
tail := vc.vidMap
nvm := newVidMap(tail.DataCenter)
nvm.cache.Store(tail)
vc.vidMap = nvm
// Trim cache chain to vidMapCacheSize
node := tail
for i := 0; i < vc.vidMapCacheSize-1; i++ {
if node.cache.Load() == nil {
return
}
node = node.cache.Load()
}
// node is guaranteed to be non-nil after the loop
node.cache.Store(nil)
}