Diffstat (limited to 'weed/wdclient')
-rw-r--r--	weed/wdclient/filer_client.go	404
-rw-r--r--	weed/wdclient/masterclient.go	546
-rw-r--r--	weed/wdclient/vidmap_client.go	347
3 files changed, 929 insertions, 368 deletions
diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go
new file mode 100644
index 000000000..f0dd5f2e6
--- /dev/null
+++ b/weed/wdclient/filer_client.go
@@ -0,0 +1,404 @@
+package wdclient
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "strings"
+ "sync/atomic"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+)
+
+// UrlPreference controls which URL to use for volume access
+type UrlPreference string
+
+const (
+ PreferUrl UrlPreference = "url" // Use private URL (default)
+ PreferPublicUrl UrlPreference = "publicUrl" // Use public URL
+)
+
+// filerHealth tracks the health status of a filer
+type filerHealth struct {
+ failureCount int32 // atomic: consecutive failures
+ lastFailureTimeNs int64 // atomic: last failure time in Unix nanoseconds
+}
+
+// FilerClient provides volume location services by querying a filer
+// It uses the shared vidMap cache for efficient lookups
+// Supports multiple filer addresses with automatic failover for high availability
+// Tracks filer health to avoid repeatedly trying known-unhealthy filers
+type FilerClient struct {
+ *vidMapClient
+ filerAddresses []pb.ServerAddress
+ filerIndex int32 // atomic: current filer index for round-robin
+ filerHealth []*filerHealth // health status per filer (same order as filerAddresses)
+ grpcDialOption grpc.DialOption
+ urlPreference UrlPreference
+ grpcTimeout time.Duration
+ cacheSize int // Number of historical vidMap snapshots to keep
+ clientId int32 // Unique client identifier for gRPC metadata
+ failureThreshold int32 // Circuit breaker: consecutive failures before circuit opens
+ resetTimeout time.Duration // Circuit breaker: time before re-checking unhealthy filer
+ maxRetries int // Retry: maximum retry attempts for transient failures
+ initialRetryWait time.Duration // Retry: initial wait time before first retry
+ retryBackoffFactor float64 // Retry: backoff multiplier for wait time
+}
+
+// filerVolumeProvider implements VolumeLocationProvider by querying filer
+// Supports multiple filer addresses with automatic failover
+type filerVolumeProvider struct {
+ filerClient *FilerClient
+}
+
+// FilerClientOption holds optional configuration for FilerClient
+type FilerClientOption struct {
+ GrpcTimeout time.Duration
+ UrlPreference UrlPreference
+ CacheSize int // Number of historical vidMap snapshots (0 = use default)
+ FailureThreshold int32 // Circuit breaker: consecutive failures before skipping filer (0 = use default of 3)
+ ResetTimeout time.Duration // Circuit breaker: time before re-checking unhealthy filer (0 = use default of 30s)
+ MaxRetries int // Retry: maximum retry attempts for transient failures (0 = use default of 3)
+ InitialRetryWait time.Duration // Retry: initial wait time before first retry (0 = use default of 1s)
+ RetryBackoffFactor float64 // Retry: backoff multiplier for wait time (0 = use default of 1.5)
+}
+
+// NewFilerClient creates a new client that queries filer(s) for volume locations
+// Supports multiple filer addresses for high availability with automatic failover
+// Uses sensible defaults: 5-second gRPC timeout, PreferUrl, DefaultVidMapCacheSize
+func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialOption, dataCenter string, opts ...*FilerClientOption) *FilerClient {
+ if len(filerAddresses) == 0 {
+ glog.Fatal("NewFilerClient requires at least one filer address")
+ }
+
+ // Apply defaults
+ grpcTimeout := 5 * time.Second
+ urlPref := PreferUrl
+ cacheSize := DefaultVidMapCacheSize
+ failureThreshold := int32(3) // Default: 3 consecutive failures before circuit opens
+ resetTimeout := 30 * time.Second // Default: 30 seconds before re-checking unhealthy filer
+ maxRetries := 3 // Default: 3 retry attempts for transient failures
+ initialRetryWait := time.Second // Default: 1 second initial retry wait
+ retryBackoffFactor := 1.5 // Default: 1.5x backoff multiplier
+
+ // Override with provided options
+ if len(opts) > 0 && opts[0] != nil {
+ opt := opts[0]
+ if opt.GrpcTimeout > 0 {
+ grpcTimeout = opt.GrpcTimeout
+ }
+ if opt.UrlPreference != "" {
+ urlPref = opt.UrlPreference
+ }
+ if opt.CacheSize > 0 {
+ cacheSize = opt.CacheSize
+ }
+ if opt.FailureThreshold > 0 {
+ failureThreshold = opt.FailureThreshold
+ }
+ if opt.ResetTimeout > 0 {
+ resetTimeout = opt.ResetTimeout
+ }
+ if opt.MaxRetries > 0 {
+ maxRetries = opt.MaxRetries
+ }
+ if opt.InitialRetryWait > 0 {
+ initialRetryWait = opt.InitialRetryWait
+ }
+ if opt.RetryBackoffFactor > 0 {
+ retryBackoffFactor = opt.RetryBackoffFactor
+ }
+ }
+
+ // Initialize health tracking for each filer
+ health := make([]*filerHealth, len(filerAddresses))
+ for i := range health {
+ health[i] = &filerHealth{}
+ }
+
+ fc := &FilerClient{
+ filerAddresses: filerAddresses,
+ filerIndex: 0,
+ filerHealth: health,
+ grpcDialOption: grpcDialOption,
+ urlPreference: urlPref,
+ grpcTimeout: grpcTimeout,
+ cacheSize: cacheSize,
+ clientId: rand.Int31(), // Random client ID for gRPC metadata tracking
+ failureThreshold: failureThreshold,
+ resetTimeout: resetTimeout,
+ maxRetries: maxRetries,
+ initialRetryWait: initialRetryWait,
+ retryBackoffFactor: retryBackoffFactor,
+ }
+
+ // Create provider that references this FilerClient for failover support
+ provider := &filerVolumeProvider{
+ filerClient: fc,
+ }
+
+ fc.vidMapClient = newVidMapClient(provider, dataCenter, cacheSize)
+
+ return fc
+}
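+
+// Example construction (an illustrative sketch; the addresses, credentials,
+// and option values below are hypothetical, not package defaults):
+//
+//	fc := wdclient.NewFilerClient(
+//		[]pb.ServerAddress{"filer1:8888", "filer2:8888"},
+//		grpc.WithTransportCredentials(insecure.NewCredentials()),
+//		"dc1",
+//		&wdclient.FilerClientOption{
+//			UrlPreference:    wdclient.PreferPublicUrl,
+//			FailureThreshold: 5,           // open circuit after 5 consecutive failures
+//			ResetTimeout:     time.Minute, // re-check an unhealthy filer after 1m
+//		},
+//	)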
+
+// GetLookupFileIdFunction returns a lookup function with URL preference handling
+func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
+ if fc.urlPreference == PreferUrl {
+ // Use the default implementation from vidMapClient
+ return fc.vidMapClient.GetLookupFileIdFunction()
+ }
+
+ // Custom implementation that prefers PublicUrl
+ return func(ctx context.Context, fileId string) (fullUrls []string, err error) {
+ // Parse file ID to extract volume ID
+ parts := strings.Split(fileId, ",")
+ if len(parts) != 2 {
+ return nil, fmt.Errorf("invalid fileId format: %s", fileId)
+ }
+ volumeIdStr := parts[0]
+
+ // First try the cache using LookupVolumeIdsWithFallback
+ vidLocations, err := fc.LookupVolumeIdsWithFallback(ctx, []string{volumeIdStr})
+
+ // Check for partial results first (important for multi-volume batched lookups)
+ locations, found := vidLocations[volumeIdStr]
+ if !found || len(locations) == 0 {
+ // Volume not found - return specific error with context from lookup if available
+ if err != nil {
+ return nil, fmt.Errorf("volume %s not found for fileId %s: %w", volumeIdStr, fileId, err)
+ }
+ return nil, fmt.Errorf("volume %s not found for fileId %s", volumeIdStr, fileId)
+ }
+
+ // Volume found successfully - ignore any errors about other volumes
+ // (not relevant for single-volume lookup, but defensive for future batching)
+
+ // Build URLs with publicUrl preference, and also prefer same DC
+ var sameDcUrls, otherDcUrls []string
+ dataCenter := fc.GetDataCenter()
+ for _, loc := range locations {
+ url := loc.PublicUrl
+ if url == "" {
+ url = loc.Url
+ }
+ httpUrl := "http://" + url + "/" + fileId
+ if dataCenter != "" && dataCenter == loc.DataCenter {
+ sameDcUrls = append(sameDcUrls, httpUrl)
+ } else {
+ otherDcUrls = append(otherDcUrls, httpUrl)
+ }
+ }
+ // Shuffle to distribute load across volume servers
+ rand.Shuffle(len(sameDcUrls), func(i, j int) { sameDcUrls[i], sameDcUrls[j] = sameDcUrls[j], sameDcUrls[i] })
+ rand.Shuffle(len(otherDcUrls), func(i, j int) { otherDcUrls[i], otherDcUrls[j] = otherDcUrls[j], otherDcUrls[i] })
+ // Prefer same data center
+ fullUrls = append(sameDcUrls, otherDcUrls...)
+ return fullUrls, nil
+ }
+}
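+
+// Example use of the returned lookup function (the fileId below is
+// hypothetical, and ctx is assumed to be in scope):
+//
+//	lookup := fc.GetLookupFileIdFunction()
+//	urls, err := lookup(ctx, "3,01637037d6")
+//	// On success, urls holds candidate "http://host/3,01637037d6" entries,
+//	// public URLs preferred, same-DC servers first, shuffled within each group.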
+
+// isRetryableGrpcError checks if a gRPC error is transient and should be retried
+//
+// Note on codes.Aborted: While Aborted can indicate application-level conflicts
+// (e.g., transaction failures), in the context of volume location lookups (which
+// are simple read-only operations with no transactions), Aborted is more likely
+// to indicate transient server issues during restart/recovery. We include it here
+// for volume lookups but log it for visibility in case misclassification occurs.
+func isRetryableGrpcError(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ // Check gRPC status code
+ st, ok := status.FromError(err)
+ if ok {
+ switch st.Code() {
+ case codes.Unavailable: // Server unavailable (temporary)
+ return true
+ case codes.DeadlineExceeded: // Request timeout
+ return true
+ case codes.ResourceExhausted: // Rate limited or overloaded
+ return true
+ case codes.Aborted:
+ // Aborted during read-only volume lookups is likely transient
+ // (e.g., filer restarting), but log for visibility
+ glog.V(1).Infof("Treating Aborted as retryable for volume lookup: %v", err)
+ return true
+ }
+ }
+
+ // Fallback to string matching for non-gRPC errors (e.g., network errors)
+ errStr := err.Error()
+ return strings.Contains(errStr, "transport") ||
+ strings.Contains(errStr, "connection") ||
+ strings.Contains(errStr, "timeout") ||
+ strings.Contains(errStr, "unavailable")
+}
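+
+// For example, under the classification above:
+//
+//	isRetryableGrpcError(status.Error(codes.Unavailable, "connection refused")) // true: retried with backoff
+//	isRetryableGrpcError(status.Error(codes.NotFound, "no such volume"))        // false: fails fast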
+
+// shouldSkipUnhealthyFiler checks if we should skip a filer based on recent failures
+// Circuit breaker pattern: skip filers with multiple recent consecutive failures
+func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
+ health := fc.filerHealth[index]
+ failureCount := atomic.LoadInt32(&health.failureCount)
+
+ // Check if failure count exceeds threshold
+ if failureCount < fc.failureThreshold {
+ return false
+ }
+
+ // Re-check unhealthy filers after reset timeout
+ lastFailureNs := atomic.LoadInt64(&health.lastFailureTimeNs)
+ if lastFailureNs == 0 {
+ return false // Never failed, shouldn't skip
+ }
+ lastFailureTime := time.Unix(0, lastFailureNs)
+ if time.Since(lastFailureTime) > fc.resetTimeout {
+ return false // Time to re-check
+ }
+
+ return true // Skip this unhealthy filer
+}
+
+// recordFilerSuccess resets failure tracking for a successful filer
+func (fc *FilerClient) recordFilerSuccess(index int32) {
+ health := fc.filerHealth[index]
+ atomic.StoreInt32(&health.failureCount, 0)
+}
+
+// recordFilerFailure increments failure count for an unhealthy filer
+func (fc *FilerClient) recordFilerFailure(index int32) {
+ health := fc.filerHealth[index]
+ atomic.AddInt32(&health.failureCount, 1)
+ atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano())
+}
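+
+// Behavior sketch with the defaults (failureThreshold=3, resetTimeout=30s):
+//
+//	fc.recordFilerFailure(0)
+//	fc.recordFilerFailure(0)
+//	fc.recordFilerFailure(0)
+//	fc.shouldSkipUnhealthyFiler(0) // true: threshold reached within resetTimeout
+//	// ...once 30s pass since the last failure, the filer is probed again:
+//	fc.shouldSkipUnhealthyFiler(0) // false: time to re-check
+//	fc.recordFilerSuccess(0)       // closes the circuit by zeroing failureCount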
+
+// LookupVolumeIds queries the filer for volume locations with automatic failover
+// Tries all configured filer addresses until one succeeds (high availability)
+// Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) with exponential backoff
+// Note: Unlike master's VolumeIdLocation, filer's Locations message doesn't currently have
+// an Error field. This implementation handles the current structure while being prepared
+// for future error reporting enhancements.
+func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
+ fc := p.filerClient
+ result := make(map[string][]Location)
+
+ // Retry transient failures with configurable backoff
+ var lastErr error
+ waitTime := fc.initialRetryWait
+ maxRetries := fc.maxRetries
+
+ for retry := 0; retry < maxRetries; retry++ {
+ // Try all filer addresses with round-robin starting from current index
+ // Skip known-unhealthy filers (circuit breaker pattern)
+ i := atomic.LoadInt32(&fc.filerIndex)
+ n := int32(len(fc.filerAddresses))
+
+ for x := int32(0); x < n; x++ {
+ // Circuit breaker: skip unhealthy filers
+ if fc.shouldSkipUnhealthyFiler(i) {
+ glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",
+ fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount))
+ i++
+ if i >= n {
+ i = 0
+ }
+ continue
+ }
+
+ filerAddress := fc.filerAddresses[i]
+
+ // Use anonymous function to ensure defer cancel() is called per iteration, not accumulated
+ err := func() error {
+ // Create a fresh timeout context for each filer attempt
+ // This ensures each retry gets the full grpcTimeout, not a diminishing deadline
+ timeoutCtx, cancel := context.WithTimeout(ctx, fc.grpcTimeout)
+ defer cancel() // Always clean up context, even on panic or early return
+
+ return pb.WithGrpcFilerClient(false, fc.clientId, filerAddress, fc.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.LookupVolume(timeoutCtx, &filer_pb.LookupVolumeRequest{
+ VolumeIds: volumeIds,
+ })
+ if err != nil {
+ return fmt.Errorf("filer.LookupVolume failed: %w", err)
+ }
+
+ // Process each volume in the response
+ for vid, locs := range resp.LocationsMap {
+ // Convert locations from protobuf to internal format
+ var locations []Location
+ for _, loc := range locs.Locations {
+ locations = append(locations, Location{
+ Url: loc.Url,
+ PublicUrl: loc.PublicUrl,
+ DataCenter: loc.DataCenter,
+ GrpcPort: int(loc.GrpcPort),
+ })
+ }
+
+ // Only add to result if we have locations
+ // Empty locations with no gRPC error means "not found" (volume doesn't exist)
+ if len(locations) > 0 {
+ result[vid] = locations
+ glog.V(4).Infof("FilerClient: volume %s found with %d location(s)", vid, len(locations))
+ } else {
+ glog.V(2).Infof("FilerClient: volume %s not found (no locations in response)", vid)
+ }
+ }
+
+ // Check for volumes that weren't in the response at all
+ // This could indicate a problem with the filer
+ for _, vid := range volumeIds {
+ if _, found := resp.LocationsMap[vid]; !found {
+ glog.V(1).Infof("FilerClient: volume %s missing from filer response", vid)
+ }
+ }
+
+ return nil
+ })
+ }()
+
+ if err != nil {
+ glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v", filerAddress, x+1, n, retry+1, maxRetries, err)
+ fc.recordFilerFailure(i)
+ lastErr = err
+ i++
+ if i >= n {
+ i = 0
+ }
+ continue
+ }
+
+ // Success - update the preferred filer index and reset health tracking
+ atomic.StoreInt32(&fc.filerIndex, i)
+ fc.recordFilerSuccess(i)
+ glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result))
+ return result, nil
+ }
+
+ // All filers failed on this attempt
+ // Check if the error is retryable (transient gRPC error)
+ if !isRetryableGrpcError(lastErr) {
+ // Non-retryable error (e.g., NotFound, PermissionDenied) - fail immediately
+ return nil, fmt.Errorf("all %d filer(s) failed with non-retryable error: %w", n, lastErr)
+ }
+
+ // Transient error - retry if we have attempts left
+ if retry < maxRetries-1 {
+ glog.V(1).Infof("FilerClient: all %d filer(s) failed with retryable error (attempt %d/%d), retrying in %v: %v",
+ n, retry+1, maxRetries, waitTime, lastErr)
+ time.Sleep(waitTime)
+ waitTime = time.Duration(float64(waitTime) * fc.retryBackoffFactor)
+ }
+ }
+
+ // All retries exhausted
+ return nil, fmt.Errorf("all %d filer(s) failed after %d attempts, last error: %w", len(fc.filerAddresses), maxRetries, lastErr)
+}
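+
+// Backoff sketch: with the defaults (maxRetries=3, initialRetryWait=1s,
+// retryBackoffFactor=1.5), a lookup whose filers all keep returning retryable
+// errors sleeps between rounds as follows before giving up:
+//
+//	wait := time.Second
+//	for retry := 0; retry < 3-1; retry++ {
+//		time.Sleep(wait) // 1s after the first round, 1.5s after the second
+//		wait = time.Duration(float64(wait) * 1.5)
+//	}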
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 320156294..89218a8c7 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -5,328 +5,143 @@ import (
"errors"
"fmt"
"math/rand"
- "sort"
"strconv"
"strings"
"sync"
"time"
- "golang.org/x/sync/singleflight"
-
- "github.com/seaweedfs/seaweedfs/weed/util/version"
-
- "github.com/seaweedfs/seaweedfs/weed/stats"
-
- "github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/version"
)
-type MasterClient struct {
- FilerGroup string
- clientType string
- clientHost pb.ServerAddress
- rack string
- currentMaster pb.ServerAddress
- currentMasterLock sync.RWMutex
- masters pb.ServerDiscovery
- grpcDialOption grpc.DialOption
-
- // vidMap stores volume location mappings
- // Protected by vidMapLock to prevent race conditions during pointer swaps in resetVidMap
- vidMap *vidMap
- vidMapLock sync.RWMutex
- vidMapCacheSize int
- OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
- OnPeerUpdateLock sync.RWMutex
-
- // Per-batch in-flight tracking to prevent duplicate lookups for the same set of volumes
- vidLookupGroup singleflight.Group
-}
-
-func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
- return &MasterClient{
- FilerGroup: filerGroup,
- clientType: clientType,
- clientHost: clientHost,
- rack: rack,
- masters: masters,
- grpcDialOption: grpcDialOption,
- vidMap: newVidMap(clientDataCenter),
- vidMapCacheSize: 5,
- }
-}
-
-func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) {
- mc.OnPeerUpdateLock.Lock()
- mc.OnPeerUpdate = onPeerUpdate
- mc.OnPeerUpdateLock.Unlock()
-}
-
-func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
- return mc.LookupFileIdWithFallback
+// masterVolumeProvider implements VolumeLocationProvider by querying the master
+// This is rarely called since the master pushes updates proactively via the KeepConnected stream
+type masterVolumeProvider struct {
+ masterClient *MasterClient
}
-func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
- // Try cache first using the fast path - grab both vidMap and dataCenter in one lock
- mc.vidMapLock.RLock()
- vm := mc.vidMap
- dataCenter := vm.DataCenter
- mc.vidMapLock.RUnlock()
-
- fullUrls, err = vm.LookupFileId(ctx, fileId)
- if err == nil && len(fullUrls) > 0 {
- return
- }
-
- // Extract volume ID from file ID (format: "volumeId,needle_id_cookie")
- parts := strings.Split(fileId, ",")
- if len(parts) != 2 {
- return nil, fmt.Errorf("invalid fileId %s", fileId)
- }
- volumeId := parts[0]
-
- // Use shared lookup logic with batching and singleflight
- vidLocations, err := mc.LookupVolumeIdsWithFallback(ctx, []string{volumeId})
- if err != nil {
- return nil, fmt.Errorf("LookupVolume %s failed: %v", fileId, err)
- }
-
- locations, found := vidLocations[volumeId]
- if !found || len(locations) == 0 {
- return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId)
- }
-
- // Build HTTP URLs from locations, preferring same data center
- var sameDcUrls, otherDcUrls []string
- for _, loc := range locations {
- httpUrl := "http://" + loc.Url + "/" + fileId
- if dataCenter != "" && dataCenter == loc.DataCenter {
- sameDcUrls = append(sameDcUrls, httpUrl)
- } else {
- otherDcUrls = append(otherDcUrls, httpUrl)
- }
- }
-
- // Prefer same data center
- fullUrls = append(sameDcUrls, otherDcUrls...)
- return fullUrls, nil
-}
-
-// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
-// Uses singleflight to coalesce concurrent requests for the same batch of volumes
-func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
+// LookupVolumeIds queries the master for volume locations (fallback when cache misses)
+// Returns partial results with aggregated errors for volumes that failed
+func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
result := make(map[string][]Location)
- var needsLookup []string
var lookupErrors []error
- // Check cache first and parse volume IDs once
- vidStringToUint := make(map[string]uint32, len(volumeIds))
+ glog.V(2).Infof("Looking up %d volumes from master: %v", len(volumeIds), volumeIds)
- // Get stable pointer to vidMap with minimal lock hold time
- vm := mc.getStableVidMap()
+ // Use a timeout for the master lookup to prevent indefinite blocking
+ timeoutCtx, cancel := context.WithTimeout(ctx, p.masterClient.grpcTimeout)
+ defer cancel()
- for _, vidString := range volumeIds {
- vid, err := strconv.ParseUint(vidString, 10, 32)
+ err := pb.WithMasterClient(false, p.masterClient.GetMaster(ctx), p.masterClient.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ resp, err := client.LookupVolume(timeoutCtx, &master_pb.LookupVolumeRequest{
+ VolumeOrFileIds: volumeIds,
+ })
if err != nil {
- return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
- }
- vidStringToUint[vidString] = uint32(vid)
-
- locations, found := vm.GetLocations(uint32(vid))
- if found && len(locations) > 0 {
- result[vidString] = locations
- } else {
- needsLookup = append(needsLookup, vidString)
+ return fmt.Errorf("master lookup failed: %v", err)
}
- }
-
- if len(needsLookup) == 0 {
- return result, nil
- }
- // Batch query all missing volumes using singleflight on the batch key
- // Sort for stable key to coalesce identical batches
- sort.Strings(needsLookup)
- batchKey := strings.Join(needsLookup, ",")
-
- sfResult, err, _ := mc.vidLookupGroup.Do(batchKey, func() (interface{}, error) {
- // Double-check cache for volumes that might have been populated while waiting
- stillNeedLookup := make([]string, 0, len(needsLookup))
- batchResult := make(map[string][]Location)
-
- // Get stable pointer with minimal lock hold time
- vm := mc.getStableVidMap()
-
- for _, vidString := range needsLookup {
- vid := vidStringToUint[vidString] // Use pre-parsed value
- if locations, found := vm.GetLocations(vid); found && len(locations) > 0 {
- batchResult[vidString] = locations
- } else {
- stillNeedLookup = append(stillNeedLookup, vidString)
+ for _, vidLoc := range resp.VolumeIdLocations {
+ // Preserve per-volume errors from master response
+ // These could indicate misconfiguration, volume deletion, etc.
+ if vidLoc.Error != "" {
+ lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: %s", vidLoc.VolumeOrFileId, vidLoc.Error))
+ glog.V(1).Infof("volume %s lookup error from master: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
+ continue
}
- }
-
- if len(stillNeedLookup) == 0 {
- return batchResult, nil
- }
-
- // Query master with batched volume IDs
- glog.V(2).Infof("Looking up %d volumes from master: %v", len(stillNeedLookup), stillNeedLookup)
- err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
- resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
- VolumeOrFileIds: stillNeedLookup,
- })
+ // Parse volume ID from response
+ parts := strings.Split(vidLoc.VolumeOrFileId, ",")
+ vidOnly := parts[0]
+ vid, err := strconv.ParseUint(vidOnly, 10, 32)
if err != nil {
- return fmt.Errorf("master lookup failed: %v", err)
+ lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: invalid volume ID format: %w", vidLoc.VolumeOrFileId, err))
+ glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
+ continue
}
- for _, vidLoc := range resp.VolumeIdLocations {
- if vidLoc.Error != "" {
- glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
- continue
- }
-
- // Parse volume ID from response
- parts := strings.Split(vidLoc.VolumeOrFileId, ",")
- vidOnly := parts[0]
- vid, err := strconv.ParseUint(vidOnly, 10, 32)
- if err != nil {
- glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
- continue
- }
-
- var locations []Location
- for _, masterLoc := range vidLoc.Locations {
- loc := Location{
- Url: masterLoc.Url,
- PublicUrl: masterLoc.PublicUrl,
- GrpcPort: int(masterLoc.GrpcPort),
- DataCenter: masterLoc.DataCenter,
- }
- mc.addLocation(uint32(vid), loc)
- locations = append(locations, loc)
- }
-
- if len(locations) > 0 {
- batchResult[vidOnly] = locations
+ var locations []Location
+ for _, masterLoc := range vidLoc.Locations {
+ loc := Location{
+ Url: masterLoc.Url,
+ PublicUrl: masterLoc.PublicUrl,
+ GrpcPort: int(masterLoc.GrpcPort),
+ DataCenter: masterLoc.DataCenter,
}
+ // Update cache with the location
+ p.masterClient.addLocation(uint32(vid), loc)
+ locations = append(locations, loc)
}
- return nil
- })
- if err != nil {
- return batchResult, err
+ if len(locations) > 0 {
+ result[vidOnly] = locations
+ }
}
- return batchResult, nil
+ return nil
})
if err != nil {
- lookupErrors = append(lookupErrors, err)
+ return nil, err
}
- // Merge singleflight batch results
- if batchLocations, ok := sfResult.(map[string][]Location); ok {
- for vid, locs := range batchLocations {
- result[vid] = locs
- }
- }
-
- // Check for volumes that still weren't found
- for _, vidString := range needsLookup {
- if _, found := result[vidString]; !found {
- lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString))
- }
+ // Return partial results with detailed errors
+ // Callers should check both result map and error
+ if len(lookupErrors) > 0 {
+ glog.V(2).Infof("MasterClient: looked up %d volumes, found %d, %d errors", len(volumeIds), len(result), len(lookupErrors))
+ return result, fmt.Errorf("master volume lookup errors: %w", errors.Join(lookupErrors...))
}
- // Return aggregated errors using errors.Join to preserve error types
- return result, errors.Join(lookupErrors...)
+ glog.V(3).Infof("MasterClient: looked up %d volumes, found %d", len(volumeIds), len(result))
+ return result, nil
}
-func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
- mc.currentMasterLock.RLock()
- defer mc.currentMasterLock.RUnlock()
- return mc.currentMaster
-}
+// MasterClient connects to master servers and maintains volume location cache
+// It receives real-time updates via KeepConnected streaming and uses vidMapClient for caching
+type MasterClient struct {
+ *vidMapClient // Embedded cache with shared logic
-func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
- mc.currentMasterLock.Lock()
- mc.currentMaster = master
- mc.currentMasterLock.Unlock()
+ FilerGroup string
+ clientType string
+ clientHost pb.ServerAddress
+ rack string
+ currentMaster pb.ServerAddress
+ currentMasterLock sync.RWMutex
+ masters pb.ServerDiscovery
+ grpcDialOption grpc.DialOption
+ grpcTimeout time.Duration // Timeout for gRPC calls to master
+ OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
+ OnPeerUpdateLock sync.RWMutex
}
-func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
- mc.WaitUntilConnected(ctx)
- return mc.getCurrentMaster()
-}
+func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
+ mc := &MasterClient{
+ FilerGroup: filerGroup,
+ clientType: clientType,
+ clientHost: clientHost,
+ rack: rack,
+ masters: masters,
+ grpcDialOption: grpcDialOption,
+ grpcTimeout: 5 * time.Second, // Default: 5 seconds for gRPC calls to master
+ }
-func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
- mc.WaitUntilConnected(ctx)
- return mc.masters.GetInstances()
-}
+ // Create provider that references this MasterClient
+ provider := &masterVolumeProvider{masterClient: mc}
-func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
- attempts := 0
- for {
- select {
- case <-ctx.Done():
- return
- default:
- currentMaster := mc.getCurrentMaster()
- if currentMaster != "" {
- return
- }
- attempts++
- if attempts%100 == 0 { // Log every 100 attempts (roughly every 20 seconds)
- glog.V(0).Infof("%s.%s WaitUntilConnected still waiting for master connection (attempt %d)...", mc.FilerGroup, mc.clientType, attempts)
- }
- time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
- }
- }
-}
+ // Initialize embedded vidMapClient with the provider and default cache size
+ mc.vidMapClient = newVidMapClient(provider, clientDataCenter, DefaultVidMapCacheSize)
-func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
- glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
- for {
- select {
- case <-ctx.Done():
- glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
- return
- default:
- mc.tryAllMasters(ctx)
- time.Sleep(time.Second)
- }
- }
+ return mc
}
-func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
- for _, master := range mc.masters.GetInstances() {
- if master == myMasterAddress {
- continue
- }
- if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
- defer cancel()
- resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
- if err != nil {
- return err
- }
- leader = resp.Leader
- return nil
- }); grpcErr != nil {
- glog.V(0).Infof("connect to %s: %v", master, grpcErr)
- }
- if leader != "" {
- glog.V(0).Infof("existing leader is %s", leader)
- return
- }
- }
- glog.V(0).Infof("No existing leader found!")
- return
+func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) {
+ mc.OnPeerUpdateLock.Lock()
+ mc.OnPeerUpdate = onPeerUpdate
+ mc.OnPeerUpdateLock.Unlock()
}
func (mc *MasterClient) tryAllMasters(ctx context.Context) {
@@ -393,6 +208,8 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
mc.resetVidMap()
mc.updateVidMap(resp)
} else {
+ // First message from master is not VolumeLocation (e.g., ClusterNodeUpdate)
+ // Still need to reset cache to ensure we don't use stale data from previous master
mc.resetVidMap()
}
mc.setCurrentMaster(master)
@@ -406,7 +223,8 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
}
if resp.VolumeLocation != nil {
- // maybe the leader is changed
+ // Check for leader change during the stream
+ // If master announces a new leader, reconnect to it
if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader {
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
@@ -415,7 +233,6 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
}
mc.updateVidMap(resp)
}
-
if resp.ClusterNodeUpdate != nil {
update := resp.ClusterNodeUpdate
mc.OnPeerUpdateLock.RLock()
@@ -442,7 +259,7 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
stats.MasterClientConnectCounter.WithLabelValues(stats.Failed).Inc()
glog.V(1).Infof("%s.%s masterClient failed to connect with master %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr)
}
- return
+ return nextHintedLeader
}
func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
@@ -494,110 +311,103 @@ func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAdd
})
}
-// getStableVidMap gets a stable pointer to the vidMap, releasing the lock immediately.
-// This is safe for read operations as the returned pointer is a stable snapshot,
-// and the underlying vidMap methods have their own internal locking.
-func (mc *MasterClient) getStableVidMap() *vidMap {
- mc.vidMapLock.RLock()
- vm := mc.vidMap
- mc.vidMapLock.RUnlock()
- return vm
-}
-
-// withCurrentVidMap executes a function with the current vidMap under a read lock.
-// This is for methods that modify vidMap's internal state, ensuring the pointer
-// is not swapped by resetVidMap during the operation. The actual map mutations
-// are protected by vidMap's internal mutex.
-func (mc *MasterClient) withCurrentVidMap(f func(vm *vidMap)) {
- mc.vidMapLock.RLock()
- defer mc.vidMapLock.RUnlock()
- f(mc.vidMap)
-}
-
-// Public methods for external packages to access vidMap safely
-
-// GetLocations safely retrieves volume locations
-func (mc *MasterClient) GetLocations(vid uint32) (locations []Location, found bool) {
- return mc.getStableVidMap().GetLocations(vid)
-}
-
-// GetLocationsClone safely retrieves a clone of volume locations
-func (mc *MasterClient) GetLocationsClone(vid uint32) (locations []Location, found bool) {
- return mc.getStableVidMap().GetLocationsClone(vid)
-}
-
-// GetVidLocations safely retrieves volume locations by string ID
-func (mc *MasterClient) GetVidLocations(vid string) (locations []Location, err error) {
- return mc.getStableVidMap().GetVidLocations(vid)
-}
-
-// LookupFileId safely looks up URLs for a file ID
-func (mc *MasterClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
- return mc.getStableVidMap().LookupFileId(ctx, fileId)
-}
-
-// LookupVolumeServerUrl safely looks up volume server URLs
-func (mc *MasterClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
- return mc.getStableVidMap().LookupVolumeServerUrl(vid)
+func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
+ mc.currentMasterLock.RLock()
+ defer mc.currentMasterLock.RUnlock()
+ return mc.currentMaster
}
-// GetDataCenter safely retrieves the data center
-func (mc *MasterClient) GetDataCenter() string {
- return mc.getStableVidMap().DataCenter
+func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
+ mc.currentMasterLock.Lock()
+ mc.currentMaster = master
+ mc.currentMasterLock.Unlock()
}
-// Thread-safe helpers for vidMap operations
-
-// addLocation adds a volume location
-func (mc *MasterClient) addLocation(vid uint32, location Location) {
- mc.withCurrentVidMap(func(vm *vidMap) {
- vm.addLocation(vid, location)
- })
+// GetMaster returns the current master address, blocking until connected.
+//
+// IMPORTANT: This method blocks until KeepConnectedToMaster successfully establishes
+// a connection to a master server. If KeepConnectedToMaster hasn't been started in a
+// background goroutine, this will block indefinitely (or until ctx is canceled).
+//
+// Typical initialization pattern:
+//
+// mc := wdclient.NewMasterClient(...)
+// go mc.KeepConnectedToMaster(ctx) // Start connection management
+// // ... later ...
+// master := mc.GetMaster(ctx) // Will block until connected
+//
+// If called before KeepConnectedToMaster establishes a connection, this may cause
+// unexpected timeouts in LookupVolumeIds and other operations that depend on it.
+func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
+ mc.WaitUntilConnected(ctx)
+ return mc.getCurrentMaster()
}
-// deleteLocation removes a volume location
-func (mc *MasterClient) deleteLocation(vid uint32, location Location) {
- mc.withCurrentVidMap(func(vm *vidMap) {
- vm.deleteLocation(vid, location)
- })
+// GetMasters returns all configured master addresses, blocking until connected.
+// See GetMaster() for important initialization contract details.
+func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
+ mc.WaitUntilConnected(ctx)
+ return mc.masters.GetInstances()
}
-// addEcLocation adds an EC volume location
-func (mc *MasterClient) addEcLocation(vid uint32, location Location) {
- mc.withCurrentVidMap(func(vm *vidMap) {
- vm.addEcLocation(vid, location)
- })
+// WaitUntilConnected blocks until a master connection is established or ctx is canceled.
+// This does NOT initiate connections - it only waits for KeepConnectedToMaster to succeed.
+func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
+ attempts := 0
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ currentMaster := mc.getCurrentMaster()
+ if currentMaster != "" {
+ return
+ }
+ attempts++
+ if attempts%100 == 0 { // Log every 100 attempts (roughly every 20 seconds)
+ glog.V(0).Infof("%s.%s WaitUntilConnected still waiting for master connection (attempt %d)...", mc.FilerGroup, mc.clientType, attempts)
+ }
+ time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
+ }
+ }
}
-// deleteEcLocation removes an EC volume location
-func (mc *MasterClient) deleteEcLocation(vid uint32, location Location) {
- mc.withCurrentVidMap(func(vm *vidMap) {
- vm.deleteEcLocation(vid, location)
- })
+func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
+ glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
+ for {
+ select {
+ case <-ctx.Done():
+ glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
+ return
+ default:
+ mc.tryAllMasters(ctx)
+ time.Sleep(time.Second)
+ }
+ }
}
-func (mc *MasterClient) resetVidMap() {
- mc.vidMapLock.Lock()
- defer mc.vidMapLock.Unlock()
-
- // Preserve the existing vidMap in the cache chain
- // No need to clone - the existing vidMap has its own mutex for thread safety
- tail := mc.vidMap
-
- nvm := newVidMap(tail.DataCenter)
- nvm.cache.Store(tail)
- mc.vidMap = nvm
-
- // Trim cache chain to vidMapCacheSize by traversing to the last node
- // that should remain and cutting the chain after it
- node := tail
- for i := 0; i < mc.vidMapCacheSize-1; i++ {
- if node.cache.Load() == nil {
+func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
+ for _, master := range mc.masters.GetInstances() {
+ if master == myMasterAddress {
+ continue
+ }
+ if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
+ defer cancel()
+ resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ leader = resp.Leader
+ return nil
+ }); grpcErr != nil {
+ glog.V(0).Infof("connect to %s: %v", master, grpcErr)
+ }
+ if leader != "" {
+ glog.V(0).Infof("existing leader is %s", leader)
return
}
- node = node.cache.Load()
- }
- if node != nil {
- node.cache.Store(nil)
}
+ glog.V(0).Infof("No existing leader found!")
+ return
}
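+
+// Example: a restarting master probing its peers for an existing leader
+// (the address below is hypothetical):
+//
+//	if leader := mc.FindLeaderFromOtherPeers(pb.ServerAddress("master1:9333")); leader != "" {
+//		glog.V(0).Infof("deferring to existing leader %s", leader)
+//	}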
diff --git a/weed/wdclient/vidmap_client.go b/weed/wdclient/vidmap_client.go
new file mode 100644
index 000000000..402eaf8c4
--- /dev/null
+++ b/weed/wdclient/vidmap_client.go
@@ -0,0 +1,347 @@
+package wdclient
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "math/rand"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+
+ "golang.org/x/sync/singleflight"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// VolumeLocationProvider is the interface for looking up volume locations
+// This allows different implementations (master subscription, filer queries, etc.)
+type VolumeLocationProvider interface {
+ // LookupVolumeIds looks up volume locations for the given volume IDs
+ // Returns a map of volume ID to locations
+ LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error)
+}
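+
+// Example: a minimal in-memory provider (an illustrative sketch, e.g. for
+// tests; staticProvider is hypothetical and not part of this package):
+//
+//	type staticProvider struct{ locations map[string][]Location }
+//
+//	func (s *staticProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
+//		result := make(map[string][]Location)
+//		for _, vid := range volumeIds {
+//			if locs, ok := s.locations[vid]; ok {
+//				result[vid] = locs
+//			}
+//		}
+//		return result, nil // IDs with no entry are simply omitted ("not found")
+//	}
+//
+//	vc := newVidMapClient(&staticProvider{locations: nil}, "dc1", DefaultVidMapCacheSize)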
+
+// vidMapClient provides volume location caching with pluggable lookup
+// It wraps the battle-tested vidMap with customizable volume lookup strategies
+type vidMapClient struct {
+ vidMap *vidMap
+ vidMapLock sync.RWMutex
+ vidMapCacheSize int
+ provider VolumeLocationProvider
+ vidLookupGroup singleflight.Group
+}
+
+const (
+ // DefaultVidMapCacheSize is the default number of historical vidMap snapshots to keep
+ // This provides cache history when volumes move between servers
+ DefaultVidMapCacheSize = 5
+)
+
+// newVidMapClient creates a new client with the given provider and data center
+func newVidMapClient(provider VolumeLocationProvider, dataCenter string, cacheSize int) *vidMapClient {
+ if cacheSize <= 0 {
+ cacheSize = DefaultVidMapCacheSize
+ }
+ return &vidMapClient{
+ vidMap: newVidMap(dataCenter),
+ vidMapCacheSize: cacheSize,
+ provider: provider,
+ }
+}
+
+// GetLookupFileIdFunction returns a function that can be used to lookup file IDs
+func (vc *vidMapClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
+ return vc.LookupFileIdWithFallback
+}
+
+// LookupFileIdWithFallback looks up a file ID, checking cache first, then using provider
+func (vc *vidMapClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
+ // Try cache first - hold read lock during entire vidMap access to prevent swap during operation
+ vc.vidMapLock.RLock()
+ vm := vc.vidMap
+ dataCenter := vm.DataCenter
+ fullUrls, err = vm.LookupFileId(ctx, fileId)
+ vc.vidMapLock.RUnlock()
+
+ // Cache hit - return immediately
+ if err == nil && len(fullUrls) > 0 {
+ return
+ }
+
+ // Cache miss - extract volume ID from file ID (format: "volumeId,needle_id_cookie")
+ parts := strings.Split(fileId, ",")
+ if len(parts) != 2 {
+ return nil, fmt.Errorf("invalid fileId %s", fileId)
+ }
+ volumeId := parts[0]
+
+ // Use shared lookup logic with batching and singleflight
+ vidLocations, err := vc.LookupVolumeIdsWithFallback(ctx, []string{volumeId})
+
+ // Check for partial results first (important for multi-volume batched lookups)
+ locations, found := vidLocations[volumeId]
+ if !found || len(locations) == 0 {
+ // Volume not found - return specific error with context from lookup if available
+ if err != nil {
+ return nil, fmt.Errorf("volume %s not found for fileId %s: %w", volumeId, fileId, err)
+ }
+ return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId)
+ }
+
+ // Volume found successfully - ignore any errors about other volumes
+ // (not relevant for single-volume lookup, but defensive for future batching)
+
+ // Build HTTP URLs from locations, preferring same data center
+ var sameDcUrls, otherDcUrls []string
+ for _, loc := range locations {
+ httpUrl := "http://" + loc.Url + "/" + fileId
+ if dataCenter != "" && dataCenter == loc.DataCenter {
+ sameDcUrls = append(sameDcUrls, httpUrl)
+ } else {
+ otherDcUrls = append(otherDcUrls, httpUrl)
+ }
+ }
+
+ // Shuffle to distribute load across volume servers
+ rand.Shuffle(len(sameDcUrls), func(i, j int) { sameDcUrls[i], sameDcUrls[j] = sameDcUrls[j], sameDcUrls[i] })
+ rand.Shuffle(len(otherDcUrls), func(i, j int) { otherDcUrls[i], otherDcUrls[j] = otherDcUrls[j], otherDcUrls[i] })
+
+ // Prefer same data center
+ fullUrls = append(sameDcUrls, otherDcUrls...)
+ return fullUrls, nil
+}
+
+// LookupVolumeIdsWithFallback looks up volume locations, querying provider if not in cache.
+// Uses singleflight to coalesce concurrent requests for the same batch of volumes.
+//
+// IMPORTANT: This function may return PARTIAL results with a non-nil error.
+// The result map contains successfully looked up volumes, while the error aggregates
+// failures for volumes that couldn't be found or had lookup errors.
+//
+// Callers MUST check both the result map AND the error:
+// - result != nil && err == nil: All volumes found successfully
+// - result != nil && err != nil: Some volumes found, some failed (check both)
+// - result == nil && err != nil: Complete failure (connection error, etc.)
+//
+// Example usage:
+//
+// locs, err := mc.LookupVolumeIdsWithFallback(ctx, []string{"1", "2", "999"})
+// if len(locs) > 0 {
+// // Process successfully found volumes
+// }
+// if err != nil {
+// // Log/handle failed volumes
+// }
+func (vc *vidMapClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
+ result := make(map[string][]Location)
+ var needsLookup []string
+ var lookupErrors []error
+
+ // Check cache first and parse volume IDs once
+ vidStringToUint := make(map[string]uint32, len(volumeIds))
+
+ // Get stable pointer to vidMap with minimal lock hold time
+ vm := vc.getStableVidMap()
+
+ for _, vidString := range volumeIds {
+ vid, err := strconv.ParseUint(vidString, 10, 32)
+ if err != nil {
+ return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
+ }
+ vidStringToUint[vidString] = uint32(vid)
+
+ locations, found := vm.GetLocations(uint32(vid))
+ if found && len(locations) > 0 {
+ result[vidString] = locations
+ } else {
+ needsLookup = append(needsLookup, vidString)
+ }
+ }
+
+ if len(needsLookup) == 0 {
+ return result, nil
+ }
+
+ // Batch query all missing volumes using singleflight on the batch key
+ // Sort for stable key to coalesce identical batches
+ sort.Strings(needsLookup)
+ batchKey := strings.Join(needsLookup, ",")
+
+ sfResult, err, _ := vc.vidLookupGroup.Do(batchKey, func() (interface{}, error) {
+ // Double-check cache for volumes that might have been populated while waiting
+ stillNeedLookup := make([]string, 0, len(needsLookup))
+ batchResult := make(map[string][]Location)
+
+ // Get stable pointer with minimal lock hold time
+ vm := vc.getStableVidMap()
+
+ for _, vidString := range needsLookup {
+ vid := vidStringToUint[vidString] // Use pre-parsed value
+ if locations, found := vm.GetLocations(vid); found && len(locations) > 0 {
+ batchResult[vidString] = locations
+ } else {
+ stillNeedLookup = append(stillNeedLookup, vidString)
+ }
+ }
+
+ if len(stillNeedLookup) == 0 {
+ return batchResult, nil
+ }
+
+ // Query provider with batched volume IDs
+ glog.V(2).Infof("Looking up %d volumes from provider: %v", len(stillNeedLookup), stillNeedLookup)
+
+ providerResults, err := vc.provider.LookupVolumeIds(ctx, stillNeedLookup)
+ if err != nil {
+ return batchResult, fmt.Errorf("provider lookup failed: %w", err)
+ }
+
+ // Update cache with results
+ for vidString, locations := range providerResults {
+ vid, err := strconv.ParseUint(vidString, 10, 32)
+ if err != nil {
+ glog.Warningf("Failed to parse volume id '%s': %v", vidString, err)
+ continue
+ }
+
+ for _, loc := range locations {
+ vc.addLocation(uint32(vid), loc)
+ }
+
+ if len(locations) > 0 {
+ batchResult[vidString] = locations
+ }
+ }
+
+ return batchResult, nil
+ })
+
+ if err != nil {
+ lookupErrors = append(lookupErrors, err)
+ }
+
+ // Merge singleflight batch results
+ if batchLocations, ok := sfResult.(map[string][]Location); ok {
+ for vid, locs := range batchLocations {
+ result[vid] = locs
+ }
+ }
+
+ // Check for volumes that still weren't found
+ for _, vidString := range needsLookup {
+ if _, found := result[vidString]; !found {
+ lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString))
+ }
+ }
+
+ // Return aggregated errors
+ return result, errors.Join(lookupErrors...)
+}
+
+// getStableVidMap gets a stable pointer to the vidMap, releasing the lock immediately.
+// WARNING: Use with caution. The returned vidMap pointer is stable (won't be garbage collected
+// due to cache chain), but the vidMapClient.vidMap field may be swapped by resetVidMap().
+// For operations that must use the current vidMap atomically, use withCurrentVidMap() instead.
+func (vc *vidMapClient) getStableVidMap() *vidMap {
+ vc.vidMapLock.RLock()
+ vm := vc.vidMap
+ vc.vidMapLock.RUnlock()
+ return vm
+}
+
+// withCurrentVidMap executes a function with the current vidMap under a read lock.
+// This guarantees the vidMap instance cannot be swapped during the function execution.
+// Use this when you need atomic access to the current vidMap for multiple operations.
+func (vc *vidMapClient) withCurrentVidMap(f func(vm *vidMap)) {
+ vc.vidMapLock.RLock()
+ defer vc.vidMapLock.RUnlock()
+ f(vc.vidMap)
+}
+
+// Public methods for external access
+
+// GetLocations safely retrieves volume locations
+func (vc *vidMapClient) GetLocations(vid uint32) (locations []Location, found bool) {
+ return vc.getStableVidMap().GetLocations(vid)
+}
+
+// GetLocationsClone safely retrieves a clone of volume locations
+func (vc *vidMapClient) GetLocationsClone(vid uint32) (locations []Location, found bool) {
+ return vc.getStableVidMap().GetLocationsClone(vid)
+}
+
+// GetVidLocations safely retrieves volume locations by string ID
+func (vc *vidMapClient) GetVidLocations(vid string) (locations []Location, err error) {
+ return vc.getStableVidMap().GetVidLocations(vid)
+}
+
+// LookupFileId safely looks up URLs for a file ID
+func (vc *vidMapClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
+ return vc.getStableVidMap().LookupFileId(ctx, fileId)
+}
+
+// LookupVolumeServerUrl safely looks up volume server URLs
+func (vc *vidMapClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
+ return vc.getStableVidMap().LookupVolumeServerUrl(vid)
+}
+
+// GetDataCenter safely retrieves the data center
+func (vc *vidMapClient) GetDataCenter() string {
+ return vc.getStableVidMap().DataCenter
+}
+
+// Thread-safe helpers for vidMap operations
+
+// addLocation adds a volume location
+func (vc *vidMapClient) addLocation(vid uint32, location Location) {
+ vc.withCurrentVidMap(func(vm *vidMap) {
+ vm.addLocation(vid, location)
+ })
+}
+
+// deleteLocation removes a volume location
+func (vc *vidMapClient) deleteLocation(vid uint32, location Location) {
+ vc.withCurrentVidMap(func(vm *vidMap) {
+ vm.deleteLocation(vid, location)
+ })
+}
+
+// addEcLocation adds an EC volume location
+func (vc *vidMapClient) addEcLocation(vid uint32, location Location) {
+ vc.withCurrentVidMap(func(vm *vidMap) {
+ vm.addEcLocation(vid, location)
+ })
+}
+
+// deleteEcLocation removes an EC volume location
+func (vc *vidMapClient) deleteEcLocation(vid uint32, location Location) {
+ vc.withCurrentVidMap(func(vm *vidMap) {
+ vm.deleteEcLocation(vid, location)
+ })
+}
+
+// resetVidMap resets the volume ID map
+func (vc *vidMapClient) resetVidMap() {
+ vc.vidMapLock.Lock()
+ defer vc.vidMapLock.Unlock()
+
+ // Preserve the existing vidMap in the cache chain
+ tail := vc.vidMap
+
+ nvm := newVidMap(tail.DataCenter)
+ nvm.cache.Store(tail)
+ vc.vidMap = nvm
+
+ // Trim cache chain to vidMapCacheSize
+ node := tail
+ for i := 0; i < vc.vidMapCacheSize-1; i++ {
+ if node.cache.Load() == nil {
+ return
+ }
+ node = node.cache.Load()
+ }
+ // node is guaranteed to be non-nil after the loop
+ node.cache.Store(nil)
+}
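+
+// Chain sketch: with vidMapCacheSize = 3, repeated resets keep at most three
+// historical snapshots behind the live map, unlinking the oldest beyond that:
+//
+//	vc := newVidMapClient(provider, "dc1", 3)
+//	vc.resetVidMap() // live -> snap1
+//	vc.resetVidMap() // live -> snap2 -> snap1
+//	vc.resetVidMap() // live -> snap3 -> snap2 -> snap1
+//	vc.resetVidMap() // live -> snap4 -> snap3 -> snap2 (snap1 unlinked)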