 test/s3/iam/s3_iam_framework.go     |  12
 weed/cluster/cluster.go             |   5
 weed/command/iam.go                 |   5
 weed/filer/filer.go                 |   4
 weed/filer/reader_at.go             |  47
 weed/iamapi/iamapi_server.go        |  43
 weed/mount/weedfs.go                |  27
 weed/s3api/s3api_object_handlers.go |  33
 weed/s3api/s3api_server.go          |   9
 weed/server/filer_server.go         |   2
 weed/wdclient/filer_client.go       | 404
 weed/wdclient/masterclient.go       | 546
 weed/wdclient/vidmap_client.go      | 347
 13 files changed, 1065 insertions(+), 419 deletions(-)
diff --git a/test/s3/iam/s3_iam_framework.go b/test/s3/iam/s3_iam_framework.go
index 178ae0763..c155b7358 100644
--- a/test/s3/iam/s3_iam_framework.go
+++ b/test/s3/iam/s3_iam_framework.go
@@ -705,12 +705,22 @@ func (f *S3IAMTestFramework) CreateBucketWithCleanup(s3Client *s3.S3, bucketName
f.t.Logf("Warning: Failed to delete existing bucket %s: %v", bucketName, deleteErr)
}
+ // Add a small delay to allow deletion to propagate
+ time.Sleep(100 * time.Millisecond)
+
// Now create it fresh
_, err = s3Client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
if err != nil {
- return fmt.Errorf("failed to recreate bucket after cleanup: %v", err)
+ // If it still says bucket exists after cleanup, it might be in an inconsistent state
+ // In this case, just use the existing bucket since we emptied it
+ if awsErr, ok := err.(awserr.Error); ok && (awsErr.Code() == "BucketAlreadyExists" || awsErr.Code() == "BucketAlreadyOwnedByYou") {
+ f.t.Logf("Bucket %s still exists after cleanup, reusing it", bucketName)
+ // Bucket exists and is empty, so we can proceed
+ } else {
+ return fmt.Errorf("failed to recreate bucket after cleanup: %v", err)
+ }
}
} else {
return err
diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go
index 52d32f697..638553b04 100644
--- a/weed/cluster/cluster.go
+++ b/weed/cluster/cluster.go
@@ -1,10 +1,11 @@
package cluster
import (
- "github.com/seaweedfs/seaweedfs/weed/pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"sync"
"time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
const (
diff --git a/weed/command/iam.go b/weed/command/iam.go
index c484ed18d..8f4ac878d 100644
--- a/weed/command/iam.go
+++ b/weed/command/iam.go
@@ -76,7 +76,7 @@ func (iamopt *IamOptions) startIamServer() bool {
masters := pb.ServerAddresses(*iamopt.masters).ToAddressMap()
router := mux.NewRouter().SkipClean(true)
- _, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{
+ iamApiServer, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{
Masters: masters,
Filer: filerAddress,
Port: *iamopt.port,
@@ -86,6 +86,9 @@ func (iamopt *IamOptions) startIamServer() bool {
if iamApiServer_err != nil {
glog.Fatalf("IAM API Server startup error: %v", iamApiServer_err)
}
+
+ // Ensure cleanup on shutdown
+ defer iamApiServer.Shutdown()
listenAddress := fmt.Sprintf(":%d", *iamopt.port)
iamApiListener, iamApiLocalListener, err := util.NewIpAndLocalListeners(*iamopt.ip, *iamopt.port, time.Duration(10)*time.Second)
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index d3d2de948..b68004a8b 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -175,10 +175,6 @@ func (fs *Filer) GetMaster(ctx context.Context) pb.ServerAddress {
return fs.MasterClient.GetMaster(ctx)
}
-func (fs *Filer) KeepMasterClientConnected(ctx context.Context) {
- fs.MasterClient.KeepConnectedToMaster(ctx)
-}
-
func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
return f.Store.BeginTransaction(ctx)
}
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 27d773f49..aeac9b34a 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -26,15 +26,42 @@ type ChunkReadAt struct {
var _ = io.ReaderAt(&ChunkReadAt{})
var _ = io.Closer(&ChunkReadAt{})
+// LookupFn creates a basic volume location lookup function with simple caching.
+//
+// Deprecated: Use wdclient.FilerClient instead. This function has several limitations compared to wdclient.FilerClient:
+// - Simple bounded cache (10k entries, no eviction policy or TTL for stale entries)
+// - No singleflight deduplication (concurrent requests for same volume will duplicate work)
+// - No cache history for volume moves (no fallback chain when volumes migrate)
+// - No high availability (single filer address, no automatic failover)
+//
+// For NEW code, especially mount operations, use wdclient.FilerClient instead:
+// filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts)
+// lookupFn := filerClient.GetLookupFileIdFunction()
+//
+// This provides:
+// - Bounded cache with configurable size
+// - Singleflight deduplication of concurrent lookups
+// - Cache history when volumes move
+// - Battle-tested vidMap with cache chain
+//
+// This function is kept for backward compatibility with existing code paths
+// (shell commands, streaming, etc.) but should be avoided in long-running processes
+// or multi-tenant deployments, where the fixed 10k-entry cache (no eviction or TTL)
+// either stops caching new volumes once full or keeps serving stale locations after volumes move.
+//
+// Maximum recommended cache entries: ~10,000 volumes per process.
+// Beyond this, consider migrating to wdclient.FilerClient.
func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionType {
vidCache := make(map[string]*filer_pb.Locations)
- var vicCacheLock sync.RWMutex
+ var vidCacheLock sync.RWMutex
+ cacheSize := 0
+ const maxCacheSize = 10000 // Simple bound to prevent unbounded growth
+
return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
vid := VolumeId(fileId)
- vicCacheLock.RLock()
+ vidCacheLock.RLock()
locations, found := vidCache[vid]
- vicCacheLock.RUnlock()
+ vidCacheLock.RUnlock()
if !found {
util.Retry("lookup volume "+vid, func() error {
@@ -51,9 +78,17 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
glog.V(0).InfofCtx(ctx, "failed to locate %s", fileId)
return fmt.Errorf("failed to locate %s", fileId)
}
- vicCacheLock.Lock()
- vidCache[vid] = locations
- vicCacheLock.Unlock()
+ vidCacheLock.Lock()
+ // Simple size limit to prevent unbounded growth
+ // For proper cache management, use wdclient.FilerClient instead
+ if cacheSize < maxCacheSize {
+ vidCache[vid] = locations
+ cacheSize++
+ } else if cacheSize == maxCacheSize {
+ glog.Warningf("filer.LookupFn cache reached limit of %d volumes, not caching new entries. Consider migrating to wdclient.FilerClient for bounded cache management.", maxCacheSize)
+ cacheSize++ // Only log once
+ }
+ vidCacheLock.Unlock()
return nil
})
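
Editor's note: as the deprecation comment above recommends, long-running callers can obtain the same lookup function from wdclient.FilerClient instead of filer.LookupFn. A minimal sketch of that migration, assuming the caller already has a filer address and gRPC dial option (the helper name exampleLookupViaFilerClient and the file ID are illustrative):

// Sketch only; assumed imports: context, google.golang.org/grpc, weed/pb, weed/wdclient.
func exampleLookupViaFilerClient(ctx context.Context, filer pb.ServerAddress, dial grpc.DialOption) ([]string, error) {
    // Bounded cache, singleflight deduplication, cache history and filer failover come built in.
    fc := wdclient.NewFilerClient([]pb.ServerAddress{filer}, dial, "")
    lookup := fc.GetLookupFileIdFunction()
    // Same signature as the function returned by filer.LookupFn.
    return lookup(ctx, "3,01637037d6") // fileId format: "volumeId,fileKey"
}
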
diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go
index cf507ee82..361d9bec9 100644
--- a/weed/iamapi/iamapi_server.go
+++ b/weed/iamapi/iamapi_server.go
@@ -12,6 +12,7 @@ import (
"github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
@@ -45,8 +46,11 @@ type IamServerOption struct {
}
type IamApiServer struct {
- s3ApiConfig IamS3ApiConfig
- iam *s3api.IdentityAccessManagement
+ s3ApiConfig IamS3ApiConfig
+ iam *s3api.IdentityAccessManagement
+ shutdownContext context.Context
+ shutdownCancel context.CancelFunc
+ masterClient *wdclient.MasterClient
}
var s3ApiConfigure IamS3ApiConfig
@@ -56,9 +60,21 @@ func NewIamApiServer(router *mux.Router, option *IamServerOption) (iamApiServer
}
func NewIamApiServerWithStore(router *mux.Router, option *IamServerOption, explicitStore string) (iamApiServer *IamApiServer, err error) {
+ masterClient := wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters))
+
+ // Create a cancellable context for the master client connection
+ // This allows graceful shutdown via Shutdown() method
+ shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
+
+ // Start KeepConnectedToMaster for volume location lookups
+ // IAM config files are typically small and inline, but if they ever have chunks,
+ // ReadEntry→StreamContent needs masterClient for volume lookups
+ glog.V(0).Infof("IAM API starting master client connection for volume location lookups")
+ go masterClient.KeepConnectedToMaster(shutdownCtx)
+
configure := &IamS3ApiConfigure{
option: option,
- masterClient: wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters)),
+ masterClient: masterClient,
}
s3ApiConfigure = configure
@@ -72,8 +88,11 @@ func NewIamApiServerWithStore(router *mux.Router, option *IamServerOption, expli
configure.credentialManager = iam.GetCredentialManager()
iamApiServer = &IamApiServer{
- s3ApiConfig: s3ApiConfigure,
- iam: iam,
+ s3ApiConfig: s3ApiConfigure,
+ iam: iam,
+ shutdownContext: shutdownCtx,
+ shutdownCancel: shutdownCancel,
+ masterClient: masterClient,
}
iamApiServer.registerRouter(router)
@@ -93,6 +112,20 @@ func (iama *IamApiServer) registerRouter(router *mux.Router) {
apiRouter.NotFoundHandler = http.HandlerFunc(s3err.NotFoundHandler)
}
+// Shutdown gracefully stops the IAM API server and releases resources.
+// It cancels the master client connection goroutine and closes gRPC connections.
+// This method is safe to call multiple times.
+//
+// Note: This method is called via defer in weed/command/iam.go for best-effort cleanup.
+// For proper graceful shutdown on SIGTERM/SIGINT, signal handling should be added to
+// the command layer to call this method before process exit.
+func (iama *IamApiServer) Shutdown() {
+ if iama.shutdownCancel != nil {
+ glog.V(0).Infof("IAM API server shutting down, stopping master client connection")
+ iama.shutdownCancel()
+ }
+}
+
func (iama *IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
return iama.GetS3ApiConfigurationFromCredentialManager(s3cfg)
}
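
Editor's note: the Shutdown comment notes that weed/command/iam.go currently relies on a defer for best-effort cleanup. A hedged sketch of the signal handling it suggests, using only the standard library (the helper name shutdownOnSignal is illustrative):

// Sketch only; assumed imports: os, os/signal, syscall, weed/iamapi.
func shutdownOnSignal(server *iamapi.IamApiServer) {
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigCh
        server.Shutdown() // cancels the master client goroutine and releases resources
        os.Exit(0)
    }()
}

In startIamServer this could be called as shutdownOnSignal(iamApiServer) right after the server is constructed, instead of (or in addition to) the defer.
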
diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go
index 95864ef00..21c54841a 100644
--- a/weed/mount/weedfs.go
+++ b/weed/mount/weedfs.go
@@ -97,9 +97,32 @@ type WFS struct {
fhLockTable *util.LockTable[FileHandleId]
rdmaClient *RDMAMountClient
FilerConf *filer.FilerConf
+ filerClient *wdclient.FilerClient // Cached volume location client
}
func NewSeaweedFileSystem(option *Option) *WFS {
+ // Only create FilerClient for direct volume access modes
+ // When VolumeServerAccess == "filerProxy", all reads go through filer, so no volume lookup needed
+ var filerClient *wdclient.FilerClient
+ if option.VolumeServerAccess != "filerProxy" {
+ // Create FilerClient for efficient volume location caching
+ // Pass all filer addresses for high availability with automatic failover
+ // Configure URL preference based on VolumeServerAccess option
+ var opts *wdclient.FilerClientOption
+ if option.VolumeServerAccess == "publicUrl" {
+ opts = &wdclient.FilerClientOption{
+ UrlPreference: wdclient.PreferPublicUrl,
+ }
+ }
+
+ filerClient = wdclient.NewFilerClient(
+ option.FilerAddresses, // Pass all filer addresses for HA
+ option.GrpcDialOption,
+ option.DataCenter,
+ opts,
+ )
+ }
+
wfs := &WFS{
RawFileSystem: fuse.NewDefaultRawFileSystem(),
option: option,
@@ -107,6 +130,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec),
fhMap: NewFileHandleToInode(),
dhMap: NewDirectoryHandleToInode(),
+ filerClient: filerClient, // nil for proxy mode, initialized for direct access
fhLockTable: util.NewLockTable[FileHandleId](),
}
@@ -253,7 +277,8 @@ func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
}
}
- return filer.LookupFn(wfs)
+ // Use the cached FilerClient for efficient lookups with singleflight and cache history
+ return wfs.filerClient.GetLookupFileIdFunction()
}
func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index dee5f60c8..cd0e82421 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -24,7 +24,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -994,36 +993,10 @@ var volumeServerHTTPClient = &http.Client{
}
// createLookupFileIdFunction creates a reusable lookup function for resolving volume URLs
+// Uses FilerClient's vidMap cache to eliminate per-chunk gRPC overhead
func (s3a *S3ApiServer) createLookupFileIdFunction() func(context.Context, string) ([]string, error) {
- return func(ctx context.Context, fileId string) ([]string, error) {
- var urls []string
- err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- vid := filer.VolumeId(fileId)
- resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
- VolumeIds: []string{vid},
- })
- if err != nil {
- return err
- }
- if locs, found := resp.LocationsMap[vid]; found {
- for _, loc := range locs.Locations {
- // Build complete URL with volume server address and fileId
- // The fileId parameter contains the full "volumeId,fileKey" identifier (e.g., "3,01637037d6")
- // This constructs URLs like: http://127.0.0.1:8080/3,01637037d6 (or https:// if configured)
- // NormalizeUrl ensures the proper scheme (http:// or https://) is used based on configuration
- normalizedUrl, err := util_http.NormalizeUrl(loc.Url)
- if err != nil {
- glog.Warningf("Failed to normalize URL for %s: %v", loc.Url, err)
- continue
- }
- urls = append(urls, normalizedUrl+"/"+fileId)
- }
- }
- return nil
- })
- glog.V(3).Infof("createLookupFileIdFunction: fileId=%s, resolved urls=%v", fileId, urls)
- return urls, err
- }
+ // Return the FilerClient's lookup function which uses the battle-tested vidMap cache
+ return s3a.filerClient.GetLookupFileIdFunction()
}
// streamFromVolumeServersWithSSE handles streaming with inline SSE decryption
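
Editor's note: the function returned by s3a.filerClient.GetLookupFileIdFunction() has the same shape as the removed inline closure, so callers of createLookupFileIdFunction are unaffected. A brief illustrative use (the file ID is the example from the removed comment):

// Sketch only: resolve a chunk's file ID to volume server URLs via the cached vidMap.
lookup := s3a.createLookupFileIdFunction()
urls, err := lookup(ctx, "3,01637037d6") // e.g. http://127.0.0.1:8080/3,01637037d6
if err != nil {
    glog.V(3).Infof("chunk lookup failed: %v", err)
}
_ = urls
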
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index b9c4eb3fc..992027fda 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -19,6 +19,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/iam/sts"
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -55,6 +56,7 @@ type S3ApiServer struct {
cb *CircuitBreaker
randomClientId int32
filerGuard *security.Guard
+ filerClient *wdclient.FilerClient
client util_http_client.HTTPClientInterface
bucketRegistry *BucketRegistry
credentialManager *credential.CredentialManager
@@ -91,11 +93,18 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
// Initialize bucket policy engine first
policyEngine := NewBucketPolicyEngine()
+ // Initialize FilerClient for volume location caching
+ // Uses the battle-tested vidMap with filer-based lookups
+ // S3 API typically connects to a single filer, but wrap in slice for consistency
+ filerClient := wdclient.NewFilerClient([]pb.ServerAddress{option.Filer}, option.GrpcDialOption, option.DataCenter)
+ glog.V(0).Infof("S3 API initialized FilerClient for volume location caching")
+
s3ApiServer = &S3ApiServer{
option: option,
iam: iam,
randomClientId: util.RandomInt32(),
filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
+ filerClient: filerClient,
cb: NewCircuitBreaker(option),
credentialManager: iam.credentialManager,
bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 79fb90742..3d08c0980 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -178,7 +178,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs.checkWithMaster()
go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
- go fs.filer.KeepMasterClientConnected(context.Background())
+ go fs.filer.MasterClient.KeepConnectedToMaster(context.Background())
fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
v.SetDefault("filer.options.buckets_folder", "/buckets")
diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go
new file mode 100644
index 000000000..f0dd5f2e6
--- /dev/null
+++ b/weed/wdclient/filer_client.go
@@ -0,0 +1,404 @@
+package wdclient
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "strings"
+ "sync/atomic"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+)
+
+// UrlPreference controls which URL to use for volume access
+type UrlPreference string
+
+const (
+ PreferUrl UrlPreference = "url" // Use private URL (default)
+ PreferPublicUrl UrlPreference = "publicUrl" // Use public URL
+)
+
+// filerHealth tracks the health status of a filer
+type filerHealth struct {
+ failureCount int32 // atomic: consecutive failures
+ lastFailureTimeNs int64 // atomic: last failure time in Unix nanoseconds
+}
+
+// FilerClient provides volume location services by querying a filer
+// It uses the shared vidMap cache for efficient lookups
+// Supports multiple filer addresses with automatic failover for high availability
+// Tracks filer health to avoid repeatedly trying known-unhealthy filers
+type FilerClient struct {
+ *vidMapClient
+ filerAddresses []pb.ServerAddress
+ filerIndex int32 // atomic: current filer index for round-robin
+ filerHealth []*filerHealth // health status per filer (same order as filerAddresses)
+ grpcDialOption grpc.DialOption
+ urlPreference UrlPreference
+ grpcTimeout time.Duration
+ cacheSize int // Number of historical vidMap snapshots to keep
+ clientId int32 // Unique client identifier for gRPC metadata
+ failureThreshold int32 // Circuit breaker: consecutive failures before circuit opens
+ resetTimeout time.Duration // Circuit breaker: time before re-checking unhealthy filer
+ maxRetries int // Retry: maximum retry attempts for transient failures
+ initialRetryWait time.Duration // Retry: initial wait time before first retry
+ retryBackoffFactor float64 // Retry: backoff multiplier for wait time
+}
+
+// filerVolumeProvider implements VolumeLocationProvider by querying filer
+// Supports multiple filer addresses with automatic failover
+type filerVolumeProvider struct {
+ filerClient *FilerClient
+}
+
+// FilerClientOption holds optional configuration for FilerClient
+type FilerClientOption struct {
+ GrpcTimeout time.Duration
+ UrlPreference UrlPreference
+ CacheSize int // Number of historical vidMap snapshots (0 = use default)
+ FailureThreshold int32 // Circuit breaker: consecutive failures before skipping filer (0 = use default of 3)
+ ResetTimeout time.Duration // Circuit breaker: time before re-checking unhealthy filer (0 = use default of 30s)
+ MaxRetries int // Retry: maximum retry attempts for transient failures (0 = use default of 3)
+ InitialRetryWait time.Duration // Retry: initial wait time before first retry (0 = use default of 1s)
+ RetryBackoffFactor float64 // Retry: backoff multiplier for wait time (0 = use default of 1.5)
+}
+
+// NewFilerClient creates a new client that queries filer(s) for volume locations
+// Supports multiple filer addresses for high availability with automatic failover
+// Uses sensible defaults: 5-second gRPC timeout, PreferUrl, DefaultVidMapCacheSize
+func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialOption, dataCenter string, opts ...*FilerClientOption) *FilerClient {
+ if len(filerAddresses) == 0 {
+ glog.Fatal("NewFilerClient requires at least one filer address")
+ }
+
+ // Apply defaults
+ grpcTimeout := 5 * time.Second
+ urlPref := PreferUrl
+ cacheSize := DefaultVidMapCacheSize
+ failureThreshold := int32(3) // Default: 3 consecutive failures before circuit opens
+ resetTimeout := 30 * time.Second // Default: 30 seconds before re-checking unhealthy filer
+ maxRetries := 3 // Default: 3 retry attempts for transient failures
+ initialRetryWait := time.Second // Default: 1 second initial retry wait
+ retryBackoffFactor := 1.5 // Default: 1.5x backoff multiplier
+
+ // Override with provided options
+ if len(opts) > 0 && opts[0] != nil {
+ opt := opts[0]
+ if opt.GrpcTimeout > 0 {
+ grpcTimeout = opt.GrpcTimeout
+ }
+ if opt.UrlPreference != "" {
+ urlPref = opt.UrlPreference
+ }
+ if opt.CacheSize > 0 {
+ cacheSize = opt.CacheSize
+ }
+ if opt.FailureThreshold > 0 {
+ failureThreshold = opt.FailureThreshold
+ }
+ if opt.ResetTimeout > 0 {
+ resetTimeout = opt.ResetTimeout
+ }
+ if opt.MaxRetries > 0 {
+ maxRetries = opt.MaxRetries
+ }
+ if opt.InitialRetryWait > 0 {
+ initialRetryWait = opt.InitialRetryWait
+ }
+ if opt.RetryBackoffFactor > 0 {
+ retryBackoffFactor = opt.RetryBackoffFactor
+ }
+ }
+
+ // Initialize health tracking for each filer
+ health := make([]*filerHealth, len(filerAddresses))
+ for i := range health {
+ health[i] = &filerHealth{}
+ }
+
+ fc := &FilerClient{
+ filerAddresses: filerAddresses,
+ filerIndex: 0,
+ filerHealth: health,
+ grpcDialOption: grpcDialOption,
+ urlPreference: urlPref,
+ grpcTimeout: grpcTimeout,
+ cacheSize: cacheSize,
+ clientId: rand.Int31(), // Random client ID for gRPC metadata tracking
+ failureThreshold: failureThreshold,
+ resetTimeout: resetTimeout,
+ maxRetries: maxRetries,
+ initialRetryWait: initialRetryWait,
+ retryBackoffFactor: retryBackoffFactor,
+ }
+
+ // Create provider that references this FilerClient for failover support
+ provider := &filerVolumeProvider{
+ filerClient: fc,
+ }
+
+ fc.vidMapClient = newVidMapClient(provider, dataCenter, cacheSize)
+
+ return fc
+}
+
+// GetLookupFileIdFunction returns a lookup function with URL preference handling
+func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
+ if fc.urlPreference == PreferUrl {
+ // Use the default implementation from vidMapClient
+ return fc.vidMapClient.GetLookupFileIdFunction()
+ }
+
+ // Custom implementation that prefers PublicUrl
+ return func(ctx context.Context, fileId string) (fullUrls []string, err error) {
+ // Parse file ID to extract volume ID
+ parts := strings.Split(fileId, ",")
+ if len(parts) != 2 {
+ return nil, fmt.Errorf("invalid fileId format: %s", fileId)
+ }
+ volumeIdStr := parts[0]
+
+ // First try the cache using LookupVolumeIdsWithFallback
+ vidLocations, err := fc.LookupVolumeIdsWithFallback(ctx, []string{volumeIdStr})
+
+ // Check for partial results first (important for multi-volume batched lookups)
+ locations, found := vidLocations[volumeIdStr]
+ if !found || len(locations) == 0 {
+ // Volume not found - return specific error with context from lookup if available
+ if err != nil {
+ return nil, fmt.Errorf("volume %s not found for fileId %s: %w", volumeIdStr, fileId, err)
+ }
+ return nil, fmt.Errorf("volume %s not found for fileId %s", volumeIdStr, fileId)
+ }
+
+ // Volume found successfully - ignore any errors about other volumes
+ // (not relevant for single-volume lookup, but defensive for future batching)
+
+ // Build URLs with publicUrl preference, and also prefer same DC
+ var sameDcUrls, otherDcUrls []string
+ dataCenter := fc.GetDataCenter()
+ for _, loc := range locations {
+ url := loc.PublicUrl
+ if url == "" {
+ url = loc.Url
+ }
+ httpUrl := "http://" + url + "/" + fileId
+ if dataCenter != "" && dataCenter == loc.DataCenter {
+ sameDcUrls = append(sameDcUrls, httpUrl)
+ } else {
+ otherDcUrls = append(otherDcUrls, httpUrl)
+ }
+ }
+ // Shuffle to distribute load across volume servers
+ rand.Shuffle(len(sameDcUrls), func(i, j int) { sameDcUrls[i], sameDcUrls[j] = sameDcUrls[j], sameDcUrls[i] })
+ rand.Shuffle(len(otherDcUrls), func(i, j int) { otherDcUrls[i], otherDcUrls[j] = otherDcUrls[j], otherDcUrls[i] })
+ // Prefer same data center
+ fullUrls = append(sameDcUrls, otherDcUrls...)
+ return fullUrls, nil
+ }
+}
+
+// isRetryableGrpcError checks if a gRPC error is transient and should be retried
+//
+// Note on codes.Aborted: While Aborted can indicate application-level conflicts
+// (e.g., transaction failures), in the context of volume location lookups (which
+// are simple read-only operations with no transactions), Aborted is more likely
+// to indicate transient server issues during restart/recovery. We include it here
+// for volume lookups but log it for visibility in case misclassification occurs.
+func isRetryableGrpcError(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ // Check gRPC status code
+ st, ok := status.FromError(err)
+ if ok {
+ switch st.Code() {
+ case codes.Unavailable: // Server unavailable (temporary)
+ return true
+ case codes.DeadlineExceeded: // Request timeout
+ return true
+ case codes.ResourceExhausted: // Rate limited or overloaded
+ return true
+ case codes.Aborted:
+ // Aborted during read-only volume lookups is likely transient
+ // (e.g., filer restarting), but log for visibility
+ glog.V(1).Infof("Treating Aborted as retryable for volume lookup: %v", err)
+ return true
+ }
+ }
+
+ // Fallback to string matching for non-gRPC errors (e.g., network errors)
+ errStr := err.Error()
+ return strings.Contains(errStr, "transport") ||
+ strings.Contains(errStr, "connection") ||
+ strings.Contains(errStr, "timeout") ||
+ strings.Contains(errStr, "unavailable")
+}
+
+// shouldSkipUnhealthyFiler checks if we should skip a filer based on recent failures
+// Circuit breaker pattern: skip filers with multiple recent consecutive failures
+func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
+ health := fc.filerHealth[index]
+ failureCount := atomic.LoadInt32(&health.failureCount)
+
+ // Check if failure count exceeds threshold
+ if failureCount < fc.failureThreshold {
+ return false
+ }
+
+ // Re-check unhealthy filers after reset timeout
+ lastFailureNs := atomic.LoadInt64(&health.lastFailureTimeNs)
+ if lastFailureNs == 0 {
+ return false // Never failed, shouldn't skip
+ }
+ lastFailureTime := time.Unix(0, lastFailureNs)
+ if time.Since(lastFailureTime) > fc.resetTimeout {
+ return false // Time to re-check
+ }
+
+ return true // Skip this unhealthy filer
+}
+
+// recordFilerSuccess resets failure tracking for a successful filer
+func (fc *FilerClient) recordFilerSuccess(index int32) {
+ health := fc.filerHealth[index]
+ atomic.StoreInt32(&health.failureCount, 0)
+}
+
+// recordFilerFailure increments failure count for an unhealthy filer
+func (fc *FilerClient) recordFilerFailure(index int32) {
+ health := fc.filerHealth[index]
+ atomic.AddInt32(&health.failureCount, 1)
+ atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano())
+}
+
+// LookupVolumeIds queries the filer for volume locations with automatic failover
+// Tries all configured filer addresses until one succeeds (high availability)
+// Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) with exponential backoff
+// Note: Unlike master's VolumeIdLocation, filer's Locations message doesn't currently have
+// an Error field. This implementation handles the current structure while being prepared
+// for future error reporting enhancements.
+func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
+ fc := p.filerClient
+ result := make(map[string][]Location)
+
+ // Retry transient failures with configurable backoff
+ var lastErr error
+ waitTime := fc.initialRetryWait
+ maxRetries := fc.maxRetries
+
+ for retry := 0; retry < maxRetries; retry++ {
+ // Try all filer addresses with round-robin starting from current index
+ // Skip known-unhealthy filers (circuit breaker pattern)
+ i := atomic.LoadInt32(&fc.filerIndex)
+ n := int32(len(fc.filerAddresses))
+
+ for x := int32(0); x < n; x++ {
+ // Circuit breaker: skip unhealthy filers
+ if fc.shouldSkipUnhealthyFiler(i) {
+ glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",
+ fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount))
+ i++
+ if i >= n {
+ i = 0
+ }
+ continue
+ }
+
+ filerAddress := fc.filerAddresses[i]
+
+ // Use anonymous function to ensure defer cancel() is called per iteration, not accumulated
+ err := func() error {
+ // Create a fresh timeout context for each filer attempt
+ // This ensures each retry gets the full grpcTimeout, not a diminishing deadline
+ timeoutCtx, cancel := context.WithTimeout(ctx, fc.grpcTimeout)
+ defer cancel() // Always clean up context, even on panic or early return
+
+ return pb.WithGrpcFilerClient(false, fc.clientId, filerAddress, fc.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.LookupVolume(timeoutCtx, &filer_pb.LookupVolumeRequest{
+ VolumeIds: volumeIds,
+ })
+ if err != nil {
+ return fmt.Errorf("filer.LookupVolume failed: %w", err)
+ }
+
+ // Process each volume in the response
+ for vid, locs := range resp.LocationsMap {
+ // Convert locations from protobuf to internal format
+ var locations []Location
+ for _, loc := range locs.Locations {
+ locations = append(locations, Location{
+ Url: loc.Url,
+ PublicUrl: loc.PublicUrl,
+ DataCenter: loc.DataCenter,
+ GrpcPort: int(loc.GrpcPort),
+ })
+ }
+
+ // Only add to result if we have locations
+ // Empty locations with no gRPC error means "not found" (volume doesn't exist)
+ if len(locations) > 0 {
+ result[vid] = locations
+ glog.V(4).Infof("FilerClient: volume %s found with %d location(s)", vid, len(locations))
+ } else {
+ glog.V(2).Infof("FilerClient: volume %s not found (no locations in response)", vid)
+ }
+ }
+
+ // Check for volumes that weren't in the response at all
+ // This could indicate a problem with the filer
+ for _, vid := range volumeIds {
+ if _, found := resp.LocationsMap[vid]; !found {
+ glog.V(1).Infof("FilerClient: volume %s missing from filer response", vid)
+ }
+ }
+
+ return nil
+ })
+ }()
+
+ if err != nil {
+ glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v", filerAddress, x+1, n, retry+1, maxRetries, err)
+ fc.recordFilerFailure(i)
+ lastErr = err
+ i++
+ if i >= n {
+ i = 0
+ }
+ continue
+ }
+
+ // Success - update the preferred filer index and reset health tracking
+ atomic.StoreInt32(&fc.filerIndex, i)
+ fc.recordFilerSuccess(i)
+ glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result))
+ return result, nil
+ }
+
+ // All filers failed on this attempt
+ // Check if the error is retryable (transient gRPC error)
+ if !isRetryableGrpcError(lastErr) {
+ // Non-retryable error (e.g., NotFound, PermissionDenied) - fail immediately
+ return nil, fmt.Errorf("all %d filer(s) failed with non-retryable error: %w", n, lastErr)
+ }
+
+ // Transient error - retry if we have attempts left
+ if retry < maxRetries-1 {
+ glog.V(1).Infof("FilerClient: all %d filer(s) failed with retryable error (attempt %d/%d), retrying in %v: %v",
+ n, retry+1, maxRetries, waitTime, lastErr)
+ time.Sleep(waitTime)
+ waitTime = time.Duration(float64(waitTime) * fc.retryBackoffFactor)
+ }
+ }
+
+ // All retries exhausted
+ return nil, fmt.Errorf("all %d filer(s) failed after %d attempts, last error: %w", len(fc.filerAddresses), maxRetries, lastErr)
+}
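
Editor's note: a hedged construction sketch showing how the FilerClientOption fields above can be tuned; the filer addresses, data center, and numeric values are illustrative, not recommendations:

// Sketch only; assumed imports: time, google.golang.org/grpc, weed/pb, weed/wdclient.
func exampleTunedFilerClient(dial grpc.DialOption) *wdclient.FilerClient {
    return wdclient.NewFilerClient(
        []pb.ServerAddress{"filer1:8888", "filer2:8888"}, // multiple filers for failover
        dial,
        "dc1", // prefer volume servers in this data center
        &wdclient.FilerClientOption{
            GrpcTimeout:        10 * time.Second,
            UrlPreference:      wdclient.PreferPublicUrl,
            FailureThreshold:   5,           // open circuit after 5 consecutive failures
            ResetTimeout:       time.Minute, // re-check an unhealthy filer after 1 minute
            MaxRetries:         4,
            InitialRetryWait:   500 * time.Millisecond,
            RetryBackoffFactor: 2.0,
        },
    )
}
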
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 320156294..89218a8c7 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -5,328 +5,143 @@ import (
"errors"
"fmt"
"math/rand"
- "sort"
"strconv"
"strings"
"sync"
"time"
- "golang.org/x/sync/singleflight"
-
- "github.com/seaweedfs/seaweedfs/weed/util/version"
-
- "github.com/seaweedfs/seaweedfs/weed/stats"
-
- "github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/version"
)
-type MasterClient struct {
- FilerGroup string
- clientType string
- clientHost pb.ServerAddress
- rack string
- currentMaster pb.ServerAddress
- currentMasterLock sync.RWMutex
- masters pb.ServerDiscovery
- grpcDialOption grpc.DialOption
-
- // vidMap stores volume location mappings
- // Protected by vidMapLock to prevent race conditions during pointer swaps in resetVidMap
- vidMap *vidMap
- vidMapLock sync.RWMutex
- vidMapCacheSize int
- OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
- OnPeerUpdateLock sync.RWMutex
-
- // Per-batch in-flight tracking to prevent duplicate lookups for the same set of volumes
- vidLookupGroup singleflight.Group
-}
-
-func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
- return &MasterClient{
- FilerGroup: filerGroup,
- clientType: clientType,
- clientHost: clientHost,
- rack: rack,
- masters: masters,
- grpcDialOption: grpcDialOption,
- vidMap: newVidMap(clientDataCenter),
- vidMapCacheSize: 5,
- }
-}
-
-func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) {
- mc.OnPeerUpdateLock.Lock()
- mc.OnPeerUpdate = onPeerUpdate
- mc.OnPeerUpdateLock.Unlock()
-}
-
-func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
- return mc.LookupFileIdWithFallback
+// masterVolumeProvider implements VolumeLocationProvider by querying master
+// This is rarely called since master pushes updates proactively via KeepConnected stream
+type masterVolumeProvider struct {
+ masterClient *MasterClient
}
-func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
- // Try cache first using the fast path - grab both vidMap and dataCenter in one lock
- mc.vidMapLock.RLock()
- vm := mc.vidMap
- dataCenter := vm.DataCenter
- mc.vidMapLock.RUnlock()
-
- fullUrls, err = vm.LookupFileId(ctx, fileId)
- if err == nil && len(fullUrls) > 0 {
- return
- }
-
- // Extract volume ID from file ID (format: "volumeId,needle_id_cookie")
- parts := strings.Split(fileId, ",")
- if len(parts) != 2 {
- return nil, fmt.Errorf("invalid fileId %s", fileId)
- }
- volumeId := parts[0]
-
- // Use shared lookup logic with batching and singleflight
- vidLocations, err := mc.LookupVolumeIdsWithFallback(ctx, []string{volumeId})
- if err != nil {
- return nil, fmt.Errorf("LookupVolume %s failed: %v", fileId, err)
- }
-
- locations, found := vidLocations[volumeId]
- if !found || len(locations) == 0 {
- return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId)
- }
-
- // Build HTTP URLs from locations, preferring same data center
- var sameDcUrls, otherDcUrls []string
- for _, loc := range locations {
- httpUrl := "http://" + loc.Url + "/" + fileId
- if dataCenter != "" && dataCenter == loc.DataCenter {
- sameDcUrls = append(sameDcUrls, httpUrl)
- } else {
- otherDcUrls = append(otherDcUrls, httpUrl)
- }
- }
-
- // Prefer same data center
- fullUrls = append(sameDcUrls, otherDcUrls...)
- return fullUrls, nil
-}
-
-// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
-// Uses singleflight to coalesce concurrent requests for the same batch of volumes
-func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
+// LookupVolumeIds queries the master for volume locations (fallback when cache misses)
+// Returns partial results with aggregated errors for volumes that failed
+func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
result := make(map[string][]Location)
- var needsLookup []string
var lookupErrors []error
- // Check cache first and parse volume IDs once
- vidStringToUint := make(map[string]uint32, len(volumeIds))
+ glog.V(2).Infof("Looking up %d volumes from master: %v", len(volumeIds), volumeIds)
- // Get stable pointer to vidMap with minimal lock hold time
- vm := mc.getStableVidMap()
+ // Use a timeout for the master lookup to prevent indefinite blocking
+ timeoutCtx, cancel := context.WithTimeout(ctx, p.masterClient.grpcTimeout)
+ defer cancel()
- for _, vidString := range volumeIds {
- vid, err := strconv.ParseUint(vidString, 10, 32)
+ err := pb.WithMasterClient(false, p.masterClient.GetMaster(ctx), p.masterClient.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ resp, err := client.LookupVolume(timeoutCtx, &master_pb.LookupVolumeRequest{
+ VolumeOrFileIds: volumeIds,
+ })
if err != nil {
- return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
- }
- vidStringToUint[vidString] = uint32(vid)
-
- locations, found := vm.GetLocations(uint32(vid))
- if found && len(locations) > 0 {
- result[vidString] = locations
- } else {
- needsLookup = append(needsLookup, vidString)
+ return fmt.Errorf("master lookup failed: %v", err)
}
- }
-
- if len(needsLookup) == 0 {
- return result, nil
- }
- // Batch query all missing volumes using singleflight on the batch key
- // Sort for stable key to coalesce identical batches
- sort.Strings(needsLookup)
- batchKey := strings.Join(needsLookup, ",")
-
- sfResult, err, _ := mc.vidLookupGroup.Do(batchKey, func() (interface{}, error) {
- // Double-check cache for volumes that might have been populated while waiting
- stillNeedLookup := make([]string, 0, len(needsLookup))
- batchResult := make(map[string][]Location)
-
- // Get stable pointer with minimal lock hold time
- vm := mc.getStableVidMap()
-
- for _, vidString := range needsLookup {
- vid := vidStringToUint[vidString] // Use pre-parsed value
- if locations, found := vm.GetLocations(vid); found && len(locations) > 0 {
- batchResult[vidString] = locations
- } else {
- stillNeedLookup = append(stillNeedLookup, vidString)
+ for _, vidLoc := range resp.VolumeIdLocations {
+ // Preserve per-volume errors from master response
+ // These could indicate misconfiguration, volume deletion, etc.
+ if vidLoc.Error != "" {
+ lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: %s", vidLoc.VolumeOrFileId, vidLoc.Error))
+ glog.V(1).Infof("volume %s lookup error from master: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
+ continue
}
- }
-
- if len(stillNeedLookup) == 0 {
- return batchResult, nil
- }
-
- // Query master with batched volume IDs
- glog.V(2).Infof("Looking up %d volumes from master: %v", len(stillNeedLookup), stillNeedLookup)
- err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
- resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
- VolumeOrFileIds: stillNeedLookup,
- })
+ // Parse volume ID from response
+ parts := strings.Split(vidLoc.VolumeOrFileId, ",")
+ vidOnly := parts[0]
+ vid, err := strconv.ParseUint(vidOnly, 10, 32)
if err != nil {
- return fmt.Errorf("master lookup failed: %v", err)
+ lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: invalid volume ID format: %w", vidLoc.VolumeOrFileId, err))
+ glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
+ continue
}
- for _, vidLoc := range resp.VolumeIdLocations {
- if vidLoc.Error != "" {
- glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
- continue
- }
-
- // Parse volume ID from response
- parts := strings.Split(vidLoc.VolumeOrFileId, ",")
- vidOnly := parts[0]
- vid, err := strconv.ParseUint(vidOnly, 10, 32)
- if err != nil {
- glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
- continue
- }
-
- var locations []Location
- for _, masterLoc := range vidLoc.Locations {
- loc := Location{
- Url: masterLoc.Url,
- PublicUrl: masterLoc.PublicUrl,
- GrpcPort: int(masterLoc.GrpcPort),
- DataCenter: masterLoc.DataCenter,
- }
- mc.addLocation(uint32(vid), loc)
- locations = append(locations, loc)
- }
-
- if len(locations) > 0 {
- batchResult[vidOnly] = locations
+ var locations []Location
+ for _, masterLoc := range vidLoc.Locations {
+ loc := Location{
+ Url: masterLoc.Url,
+ PublicUrl: masterLoc.PublicUrl,
+ GrpcPort: int(masterLoc.GrpcPort),
+ DataCenter: masterLoc.DataCenter,
}
+ // Update cache with the location
+ p.masterClient.addLocation(uint32(vid), loc)
+ locations = append(locations, loc)
}
- return nil
- })
- if err != nil {
- return batchResult, err
+ if len(locations) > 0 {
+ result[vidOnly] = locations
+ }
}
- return batchResult, nil
+ return nil
})
if err != nil {
- lookupErrors = append(lookupErrors, err)
+ return nil, err
}
- // Merge singleflight batch results
- if batchLocations, ok := sfResult.(map[string][]Location); ok {
- for vid, locs := range batchLocations {
- result[vid] = locs
- }
- }
-
- // Check for volumes that still weren't found
- for _, vidString := range needsLookup {
- if _, found := result[vidString]; !found {
- lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString))
- }
+ // Return partial results with detailed errors
+ // Callers should check both result map and error
+ if len(lookupErrors) > 0 {
+ glog.V(2).Infof("MasterClient: looked up %d volumes, found %d, %d errors", len(volumeIds), len(result), len(lookupErrors))
+ return result, fmt.Errorf("master volume lookup errors: %w", errors.Join(lookupErrors...))
}
- // Return aggregated errors using errors.Join to preserve error types
- return result, errors.Join(lookupErrors...)
+ glog.V(3).Infof("MasterClient: looked up %d volumes, found %d", len(volumeIds), len(result))
+ return result, nil
}
-func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
- mc.currentMasterLock.RLock()
- defer mc.currentMasterLock.RUnlock()
- return mc.currentMaster
-}
+// MasterClient connects to master servers and maintains volume location cache
+// It receives real-time updates via KeepConnected streaming and uses vidMapClient for caching
+type MasterClient struct {
+ *vidMapClient // Embedded cache with shared logic
-func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
- mc.currentMasterLock.Lock()
- mc.currentMaster = master
- mc.currentMasterLock.Unlock()
+ FilerGroup string
+ clientType string
+ clientHost pb.ServerAddress
+ rack string
+ currentMaster pb.ServerAddress
+ currentMasterLock sync.RWMutex
+ masters pb.ServerDiscovery
+ grpcDialOption grpc.DialOption
+ grpcTimeout time.Duration // Timeout for gRPC calls to master
+ OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
+ OnPeerUpdateLock sync.RWMutex
}
-func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
- mc.WaitUntilConnected(ctx)
- return mc.getCurrentMaster()
-}
+func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
+ mc := &MasterClient{
+ FilerGroup: filerGroup,
+ clientType: clientType,
+ clientHost: clientHost,
+ rack: rack,
+ masters: masters,
+ grpcDialOption: grpcDialOption,
+ grpcTimeout: 5 * time.Second, // Default: 5 seconds for gRPC calls to master
+ }
-func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
- mc.WaitUntilConnected(ctx)
- return mc.masters.GetInstances()
-}
+ // Create provider that references this MasterClient
+ provider := &masterVolumeProvider{masterClient: mc}
-func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
- attempts := 0
- for {
- select {
- case <-ctx.Done():
- return
- default:
- currentMaster := mc.getCurrentMaster()
- if currentMaster != "" {
- return
- }
- attempts++
- if attempts%100 == 0 { // Log every 100 attempts (roughly every 20 seconds)
- glog.V(0).Infof("%s.%s WaitUntilConnected still waiting for master connection (attempt %d)...", mc.FilerGroup, mc.clientType, attempts)
- }
- time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
- }
- }
-}
+ // Initialize embedded vidMapClient with the provider and default cache size
+ mc.vidMapClient = newVidMapClient(provider, clientDataCenter, DefaultVidMapCacheSize)
-func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
- glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
- for {
- select {
- case <-ctx.Done():
- glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
- return
- default:
- mc.tryAllMasters(ctx)
- time.Sleep(time.Second)
- }
- }
+ return mc
}
-func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
- for _, master := range mc.masters.GetInstances() {
- if master == myMasterAddress {
- continue
- }
- if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
- ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
- defer cancel()
- resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
- if err != nil {
- return err
- }
- leader = resp.Leader
- return nil
- }); grpcErr != nil {
- glog.V(0).Infof("connect to %s: %v", master, grpcErr)
- }
- if leader != "" {
- glog.V(0).Infof("existing leader is %s", leader)
- return
- }
- }
- glog.V(0).Infof("No existing leader found!")
- return
+func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) {
+ mc.OnPeerUpdateLock.Lock()
+ mc.OnPeerUpdate = onPeerUpdate
+ mc.OnPeerUpdateLock.Unlock()
}
func (mc *MasterClient) tryAllMasters(ctx context.Context) {
@@ -393,6 +208,8 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
mc.resetVidMap()
mc.updateVidMap(resp)
} else {
+ // First message from master is not VolumeLocation (e.g., ClusterNodeUpdate)
+ // Still need to reset cache to ensure we don't use stale data from previous master
mc.resetVidMap()
}
mc.setCurrentMaster(master)
@@ -406,7 +223,8 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
}
if resp.VolumeLocation != nil {
- // maybe the leader is changed
+ // Check for leader change during the stream
+ // If master announces a new leader, reconnect to it
if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader {
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
@@ -415,7 +233,6 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
}
mc.updateVidMap(resp)
}
-
if resp.ClusterNodeUpdate != nil {
update := resp.ClusterNodeUpdate
mc.OnPeerUpdateLock.RLock()
@@ -442,7 +259,7 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
stats.MasterClientConnectCounter.WithLabelValues(stats.Failed).Inc()
glog.V(1).Infof("%s.%s masterClient failed to connect with master %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr)
}
- return
+ return nextHintedLeader
}
func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
@@ -494,110 +311,103 @@ func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAdd
})
}
-// getStableVidMap gets a stable pointer to the vidMap, releasing the lock immediately.
-// This is safe for read operations as the returned pointer is a stable snapshot,
-// and the underlying vidMap methods have their own internal locking.
-func (mc *MasterClient) getStableVidMap() *vidMap {
- mc.vidMapLock.RLock()
- vm := mc.vidMap
- mc.vidMapLock.RUnlock()
- return vm
-}
-
-// withCurrentVidMap executes a function with the current vidMap under a read lock.
-// This is for methods that modify vidMap's internal state, ensuring the pointer
-// is not swapped by resetVidMap during the operation. The actual map mutations
-// are protected by vidMap's internal mutex.
-func (mc *MasterClient) withCurrentVidMap(f func(vm *vidMap)) {
- mc.vidMapLock.RLock()
- defer mc.vidMapLock.RUnlock()
- f(mc.vidMap)
-}
-
-// Public methods for external packages to access vidMap safely
-
-// GetLocations safely retrieves volume locations
-func (mc *MasterClient) GetLocations(vid uint32) (locations []Location, found bool) {
- return mc.getStableVidMap().GetLocations(vid)
-}
-
-// GetLocationsClone safely retrieves a clone of volume locations
-func (mc *MasterClient) GetLocationsClone(vid uint32) (locations []Location, found bool) {
- return mc.getStableVidMap().GetLocationsClone(vid)
-}
-
-// GetVidLocations safely retrieves volume locations by string ID
-func (mc *MasterClient) GetVidLocations(vid string) (locations []Location, err error) {
- return mc.getStableVidMap().GetVidLocations(vid)
-}
-
-// LookupFileId safely looks up URLs for a file ID
-func (mc *MasterClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
- return mc.getStableVidMap().LookupFileId(ctx, fileId)
-}
-
-// LookupVolumeServerUrl safely looks up volume server URLs
-func (mc *MasterClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
- return mc.getStableVidMap().LookupVolumeServerUrl(vid)
+func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
+ mc.currentMasterLock.RLock()
+ defer mc.currentMasterLock.RUnlock()
+ return mc.currentMaster
}
-// GetDataCenter safely retrieves the data center
-func (mc *MasterClient) GetDataCenter() string {
- return mc.getStableVidMap().DataCenter
+func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
+ mc.currentMasterLock.Lock()
+ mc.currentMaster = master
+ mc.currentMasterLock.Unlock()
}
-// Thread-safe helpers for vidMap operations
-
-// addLocation adds a volume location
-func (mc *MasterClient) addLocation(vid uint32, location Location) {
- mc.withCurrentVidMap(func(vm *vidMap) {
- vm.addLocation(vid, location)
- })
+// GetMaster returns the current master address, blocking until connected.
+//
+// IMPORTANT: This method blocks until KeepConnectedToMaster successfully establishes
+// a connection to a master server. If KeepConnectedToMaster hasn't been started in a
+// background goroutine, this will block indefinitely (or until ctx is canceled).
+//
+// Typical initialization pattern:
+//
+// mc := wdclient.NewMasterClient(...)
+// go mc.KeepConnectedToMaster(ctx) // Start connection management
+// // ... later ...
+// master := mc.GetMaster(ctx) // Will block until connected
+//
+// If called before KeepConnectedToMaster establishes a connection, this may cause
+// unexpected timeouts in LookupVolumeIds and other operations that depend on it.
+func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
+ mc.WaitUntilConnected(ctx)
+ return mc.getCurrentMaster()
}
-// deleteLocation removes a volume location
-func (mc *MasterClient) deleteLocation(vid uint32, location Location) {
- mc.withCurrentVidMap(func(vm *vidMap) {
- vm.deleteLocation(vid, location)
- })
+// GetMasters returns all configured master addresses, blocking until connected.
+// See GetMaster() for important initialization contract details.
+func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
+ mc.WaitUntilConnected(ctx)
+ return mc.masters.GetInstances()
}
-// addEcLocation adds an EC volume location
-func (mc *MasterClient) addEcLocation(vid uint32, location Location) {
- mc.withCurrentVidMap(func(vm *vidMap) {
- vm.addEcLocation(vid, location)
- })
+// WaitUntilConnected blocks until a master connection is established or ctx is canceled.
+// This does NOT initiate connections - it only waits for KeepConnectedToMaster to succeed.
+func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
+ attempts := 0
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ currentMaster := mc.getCurrentMaster()
+ if currentMaster != "" {
+ return
+ }
+ attempts++
+ if attempts%100 == 0 { // Log every 100 attempts (roughly every 20 seconds)
+ glog.V(0).Infof("%s.%s WaitUntilConnected still waiting for master connection (attempt %d)...", mc.FilerGroup, mc.clientType, attempts)
+ }
+ time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
+ }
+ }
}
-// deleteEcLocation removes an EC volume location
-func (mc *MasterClient) deleteEcLocation(vid uint32, location Location) {
- mc.withCurrentVidMap(func(vm *vidMap) {
- vm.deleteEcLocation(vid, location)
- })
+func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
+ glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
+ for {
+ select {
+ case <-ctx.Done():
+ glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
+ return
+ default:
+ mc.tryAllMasters(ctx)
+ time.Sleep(time.Second)
+ }
+ }
}
-func (mc *MasterClient) resetVidMap() {
- mc.vidMapLock.Lock()
- defer mc.vidMapLock.Unlock()
-
- // Preserve the existing vidMap in the cache chain
- // No need to clone - the existing vidMap has its own mutex for thread safety
- tail := mc.vidMap
-
- nvm := newVidMap(tail.DataCenter)
- nvm.cache.Store(tail)
- mc.vidMap = nvm
-
- // Trim cache chain to vidMapCacheSize by traversing to the last node
- // that should remain and cutting the chain after it
- node := tail
- for i := 0; i < mc.vidMapCacheSize-1; i++ {
- if node.cache.Load() == nil {
+func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
+ for _, master := range mc.masters.GetInstances() {
+ if master == myMasterAddress {
+ continue
+ }
+ if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
+ defer cancel()
+ resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return err
+ }
+ leader = resp.Leader
+ return nil
+ }); grpcErr != nil {
+ glog.V(0).Infof("connect to %s: %v", master, grpcErr)
+ }
+ if leader != "" {
+ glog.V(0).Infof("existing leader is %s", leader)
return
}
- node = node.cache.Load()
- }
- if node != nil {
- node.cache.Store(nil)
}
+ glog.V(0).Infof("No existing leader found!")
+ return
}
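
Editor's note: the GetMaster documentation above spells out the initialization contract. A compact sketch of the expected wiring, with the service discovery and dial option assumed to come from configuration (the helper name and data center are illustrative):

// Sketch only; assumed imports: context, google.golang.org/grpc, weed/pb, weed/wdclient.
func exampleMasterClient(ctx context.Context, dial grpc.DialOption, masters pb.ServerDiscovery) ([]string, error) {
    mc := wdclient.NewMasterClient(dial, "", "example-client", "", "dc1", "", masters)
    go mc.KeepConnectedToMaster(ctx) // must be running, or GetMaster and lookups block

    lookup := mc.GetLookupFileIdFunction() // provided by the embedded vidMapClient
    return lookup(ctx, "3,01637037d6")
}
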
diff --git a/weed/wdclient/vidmap_client.go b/weed/wdclient/vidmap_client.go
new file mode 100644
index 000000000..402eaf8c4
--- /dev/null
+++ b/weed/wdclient/vidmap_client.go
@@ -0,0 +1,347 @@
+package wdclient
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "math/rand"
+ "sort"
+ "strconv"
+ "strings"
+ "sync"
+
+ "golang.org/x/sync/singleflight"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+)
+
+// VolumeLocationProvider is the interface for looking up volume locations
+// This allows different implementations (master subscription, filer queries, etc.)
+type VolumeLocationProvider interface {
+ // LookupVolumeIds looks up volume locations for the given volume IDs
+ // Returns a map of volume ID to locations
+ LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error)
+}
+
+// vidMapClient provides volume location caching with pluggable lookup
+// It wraps the battle-tested vidMap with customizable volume lookup strategies
+type vidMapClient struct {
+ vidMap *vidMap
+ vidMapLock sync.RWMutex
+ vidMapCacheSize int
+ provider VolumeLocationProvider
+ vidLookupGroup singleflight.Group
+}
+
+const (
+ // DefaultVidMapCacheSize is the default number of historical vidMap snapshots to keep
+ // This provides cache history when volumes move between servers
+ DefaultVidMapCacheSize = 5
+)
+
+// newVidMapClient creates a new client with the given provider and data center
+func newVidMapClient(provider VolumeLocationProvider, dataCenter string, cacheSize int) *vidMapClient {
+ if cacheSize <= 0 {
+ cacheSize = DefaultVidMapCacheSize
+ }
+ return &vidMapClient{
+ vidMap: newVidMap(dataCenter),
+ vidMapCacheSize: cacheSize,
+ provider: provider,
+ }
+}
+
+// GetLookupFileIdFunction returns a function that can be used to lookup file IDs
+func (vc *vidMapClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
+ return vc.LookupFileIdWithFallback
+}
+
+// LookupFileIdWithFallback looks up a file ID, checking cache first, then using provider
+func (vc *vidMapClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
+ // Try cache first - hold read lock during entire vidMap access to prevent swap during operation
+ vc.vidMapLock.RLock()
+ vm := vc.vidMap
+ dataCenter := vm.DataCenter
+ fullUrls, err = vm.LookupFileId(ctx, fileId)
+ vc.vidMapLock.RUnlock()
+
+ // Cache hit - return immediately
+ if err == nil && len(fullUrls) > 0 {
+ return
+ }
+
+ // Cache miss - extract volume ID from file ID (format: "volumeId,needle_id_cookie")
+ parts := strings.Split(fileId, ",")
+ if len(parts) != 2 {
+ return nil, fmt.Errorf("invalid fileId %s", fileId)
+ }
+ volumeId := parts[0]
+
+ // Use shared lookup logic with batching and singleflight
+ vidLocations, err := vc.LookupVolumeIdsWithFallback(ctx, []string{volumeId})
+
+ // Check for partial results first (important for multi-volume batched lookups)
+ locations, found := vidLocations[volumeId]
+ if !found || len(locations) == 0 {
+ // Volume not found - return specific error with context from lookup if available
+ if err != nil {
+ return nil, fmt.Errorf("volume %s not found for fileId %s: %w", volumeId, fileId, err)
+ }
+ return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId)
+ }
+
+	// Volume found successfully - any error from the batched lookup refers to other volumes
+	// and is intentionally ignored for this single-volume request (defensive: the lookup may
+	// return partial results together with a non-nil error)
+
+ // Build HTTP URLs from locations, preferring same data center
+ var sameDcUrls, otherDcUrls []string
+ for _, loc := range locations {
+ httpUrl := "http://" + loc.Url + "/" + fileId
+ if dataCenter != "" && dataCenter == loc.DataCenter {
+ sameDcUrls = append(sameDcUrls, httpUrl)
+ } else {
+ otherDcUrls = append(otherDcUrls, httpUrl)
+ }
+ }
+
+ // Shuffle to distribute load across volume servers
+ rand.Shuffle(len(sameDcUrls), func(i, j int) { sameDcUrls[i], sameDcUrls[j] = sameDcUrls[j], sameDcUrls[i] })
+ rand.Shuffle(len(otherDcUrls), func(i, j int) { otherDcUrls[i], otherDcUrls[j] = otherDcUrls[j], otherDcUrls[i] })
+
+ // Prefer same data center
+ fullUrls = append(sameDcUrls, otherDcUrls...)
+ return fullUrls, nil
+}
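A usage sketch for the fallback path, with a made-up fileId; readers that only need fileId-to-URL resolution can hold the function returned by GetLookupFileIdFunction:

// Hypothetical helper in package wdclient.
func exampleFileIdLookup(vc *vidMapClient) {
	lookup := vc.GetLookupFileIdFunction()
	// "1,0123456789abcdef" is a made-up fileId on volume 1
	if urls, err := lookup(context.Background(), "1,0123456789abcdef"); err == nil && len(urls) > 0 {
		glog.V(0).Infof("reading from %s (same-data-center servers are listed first)", urls[0])
	}
}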
+
+// LookupVolumeIdsWithFallback looks up volume locations, querying provider if not in cache.
+// Uses singleflight to coalesce concurrent requests for the same batch of volumes.
+//
+// IMPORTANT: This function may return PARTIAL results with a non-nil error.
+// The result map contains successfully looked up volumes, while the error aggregates
+// failures for volumes that couldn't be found or had lookup errors.
+//
+// Callers MUST check both the result map AND the error:
+// - result != nil && err == nil: All volumes found successfully
+// - result != nil && err != nil: Some volumes found, some failed (check both)
+// - result == nil && err != nil: Complete failure (connection error, etc.)
+//
+// Example usage:
+//
+// locs, err := mc.LookupVolumeIdsWithFallback(ctx, []string{"1", "2", "999"})
+// if len(locs) > 0 {
+// // Process successfully found volumes
+// }
+// if err != nil {
+// // Log/handle failed volumes
+// }
+func (vc *vidMapClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
+ result := make(map[string][]Location)
+ var needsLookup []string
+ var lookupErrors []error
+
+ // Check cache first and parse volume IDs once
+ vidStringToUint := make(map[string]uint32, len(volumeIds))
+
+ // Get stable pointer to vidMap with minimal lock hold time
+ vm := vc.getStableVidMap()
+
+ for _, vidString := range volumeIds {
+ vid, err := strconv.ParseUint(vidString, 10, 32)
+ if err != nil {
+ return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
+ }
+ vidStringToUint[vidString] = uint32(vid)
+
+ locations, found := vm.GetLocations(uint32(vid))
+ if found && len(locations) > 0 {
+ result[vidString] = locations
+ } else {
+ needsLookup = append(needsLookup, vidString)
+ }
+ }
+
+ if len(needsLookup) == 0 {
+ return result, nil
+ }
+
+ // Batch query all missing volumes using singleflight on the batch key
+ // Sort for stable key to coalesce identical batches
+ sort.Strings(needsLookup)
+ batchKey := strings.Join(needsLookup, ",")
+
+ sfResult, err, _ := vc.vidLookupGroup.Do(batchKey, func() (interface{}, error) {
+ // Double-check cache for volumes that might have been populated while waiting
+ stillNeedLookup := make([]string, 0, len(needsLookup))
+ batchResult := make(map[string][]Location)
+
+ // Get stable pointer with minimal lock hold time
+ vm := vc.getStableVidMap()
+
+ for _, vidString := range needsLookup {
+ vid := vidStringToUint[vidString] // Use pre-parsed value
+ if locations, found := vm.GetLocations(vid); found && len(locations) > 0 {
+ batchResult[vidString] = locations
+ } else {
+ stillNeedLookup = append(stillNeedLookup, vidString)
+ }
+ }
+
+ if len(stillNeedLookup) == 0 {
+ return batchResult, nil
+ }
+
+ // Query provider with batched volume IDs
+ glog.V(2).Infof("Looking up %d volumes from provider: %v", len(stillNeedLookup), stillNeedLookup)
+
+ providerResults, err := vc.provider.LookupVolumeIds(ctx, stillNeedLookup)
+ if err != nil {
+ return batchResult, fmt.Errorf("provider lookup failed: %v", err)
+ }
+
+ // Update cache with results
+ for vidString, locations := range providerResults {
+ vid, err := strconv.ParseUint(vidString, 10, 32)
+ if err != nil {
+ glog.Warningf("Failed to parse volume id '%s': %v", vidString, err)
+ continue
+ }
+
+ for _, loc := range locations {
+ vc.addLocation(uint32(vid), loc)
+ }
+
+ if len(locations) > 0 {
+ batchResult[vidString] = locations
+ }
+ }
+
+ return batchResult, nil
+ })
+
+ if err != nil {
+ lookupErrors = append(lookupErrors, err)
+ }
+
+ // Merge singleflight batch results
+ if batchLocations, ok := sfResult.(map[string][]Location); ok {
+ for vid, locs := range batchLocations {
+ result[vid] = locs
+ }
+ }
+
+ // Check for volumes that still weren't found
+ for _, vidString := range needsLookup {
+ if _, found := result[vidString]; !found {
+ lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString))
+ }
+ }
+
+ // Return aggregated errors
+ return result, errors.Join(lookupErrors...)
+}
+
+// getStableVidMap gets a stable pointer to the vidMap, releasing the lock immediately.
+// WARNING: Use with caution. The returned vidMap pointer is stable (won't be garbage collected
+// due to cache chain), but the vidMapClient.vidMap field may be swapped by resetVidMap().
+// For operations that must use the current vidMap atomically, use withCurrentVidMap() instead.
+func (vc *vidMapClient) getStableVidMap() *vidMap {
+ vc.vidMapLock.RLock()
+ vm := vc.vidMap
+ vc.vidMapLock.RUnlock()
+ return vm
+}
+
+// withCurrentVidMap executes a function with the current vidMap under a read lock.
+// This guarantees the vidMap instance cannot be swapped during the function execution.
+// Use this when you need atomic access to the current vidMap for multiple operations.
+func (vc *vidMapClient) withCurrentVidMap(f func(vm *vidMap)) {
+ vc.vidMapLock.RLock()
+ defer vc.vidMapLock.RUnlock()
+ f(vc.vidMap)
+}
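A sketch of the intended split between the two accessors, with a hypothetical volume id and locations: single reads may take the stable pointer, while multi-step mutations that must not interleave with resetVidMap go through withCurrentVidMap:

// Hypothetical helper in package wdclient.
func exampleVidMapAccess(vc *vidMapClient) {
	// Single read: a momentary snapshot of the cache is enough.
	if locations, found := vc.getStableVidMap().GetLocations(42); found {
		glog.V(0).Infof("volume 42: %v", locations)
	}

	// Dependent updates: pin one vidMap instance so resetVidMap cannot swap it mid-sequence.
	oldLoc := Location{Url: "10.0.0.1:8080"} // made-up
	newLoc := Location{Url: "10.0.0.2:8080"} // made-up
	vc.withCurrentVidMap(func(vm *vidMap) {
		vm.deleteLocation(42, oldLoc)
		vm.addLocation(42, newLoc)
	})
}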
+
+// Public methods for external access
+
+// GetLocations safely retrieves volume locations
+func (vc *vidMapClient) GetLocations(vid uint32) (locations []Location, found bool) {
+ return vc.getStableVidMap().GetLocations(vid)
+}
+
+// GetLocationsClone safely retrieves a clone of volume locations
+func (vc *vidMapClient) GetLocationsClone(vid uint32) (locations []Location, found bool) {
+ return vc.getStableVidMap().GetLocationsClone(vid)
+}
+
+// GetVidLocations safely retrieves volume locations by string ID
+func (vc *vidMapClient) GetVidLocations(vid string) (locations []Location, err error) {
+ return vc.getStableVidMap().GetVidLocations(vid)
+}
+
+// LookupFileId safely looks up URLs for a file ID
+func (vc *vidMapClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
+ return vc.getStableVidMap().LookupFileId(ctx, fileId)
+}
+
+// LookupVolumeServerUrl safely looks up volume server URLs
+func (vc *vidMapClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
+ return vc.getStableVidMap().LookupVolumeServerUrl(vid)
+}
+
+// GetDataCenter safely retrieves the data center
+func (vc *vidMapClient) GetDataCenter() string {
+ return vc.getStableVidMap().DataCenter
+}
+
+// Thread-safe helpers for vidMap operations
+
+// addLocation adds a volume location
+func (vc *vidMapClient) addLocation(vid uint32, location Location) {
+ vc.withCurrentVidMap(func(vm *vidMap) {
+ vm.addLocation(vid, location)
+ })
+}
+
+// deleteLocation removes a volume location
+func (vc *vidMapClient) deleteLocation(vid uint32, location Location) {
+ vc.withCurrentVidMap(func(vm *vidMap) {
+ vm.deleteLocation(vid, location)
+ })
+}
+
+// addEcLocation adds an EC volume location
+func (vc *vidMapClient) addEcLocation(vid uint32, location Location) {
+ vc.withCurrentVidMap(func(vm *vidMap) {
+ vm.addEcLocation(vid, location)
+ })
+}
+
+// deleteEcLocation removes an EC volume location
+func (vc *vidMapClient) deleteEcLocation(vid uint32, location Location) {
+ vc.withCurrentVidMap(func(vm *vidMap) {
+ vm.deleteEcLocation(vid, location)
+ })
+}
+
+// resetVidMap resets the volume ID map
+func (vc *vidMapClient) resetVidMap() {
+ vc.vidMapLock.Lock()
+ defer vc.vidMapLock.Unlock()
+
+ // Preserve the existing vidMap in the cache chain
+ tail := vc.vidMap
+
+ nvm := newVidMap(tail.DataCenter)
+ nvm.cache.Store(tail)
+ vc.vidMap = nvm
+
+	// Trim the cache chain to vidMapCacheSize by walking to the last snapshot
+	// that should remain and cutting the chain after it
+ node := tail
+ for i := 0; i < vc.vidMapCacheSize-1; i++ {
+ if node.cache.Load() == nil {
+ return
+ }
+ node = node.cache.Load()
+ }
+	// node is guaranteed non-nil here: the loop returns early whenever the next link is nil
+ node.cache.Store(nil)
+}
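A behavioral sketch of the cache chain, assuming vidMap lookups fall through to the cached previous snapshot (as in vid_map.go, which is outside this diff); the volume id and URL are made up:

// Hypothetical helper in package wdclient.
func exampleCacheChain(vc *vidMapClient) {
	vc.addLocation(7, Location{Url: "127.0.0.1:8080"}) // made-up entry
	vc.resetVidMap()                                   // new head; the old map becomes the first cached snapshot
	if _, found := vc.GetLocations(7); found {
		glog.V(0).Infof("volume 7 is still resolvable from the cache chain")
	}
	// After enough resets without re-learning volume 7 (the chain keeps roughly
	// vidMapCacheSize historical snapshots), the entry is trimmed away.
}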