Diffstat (limited to 'weed/wdclient/filer_client.go')
-rw-r--r--  weed/wdclient/filer_client.go  404
1 file changed, 404 insertions, 0 deletions
diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go
new file mode 100644
index 000000000..f0dd5f2e6
--- /dev/null
+++ b/weed/wdclient/filer_client.go
@@ -0,0 +1,404 @@
+package wdclient
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "strings"
+ "sync/atomic"
+ "time"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+)
+
+// UrlPreference controls which URL to use for volume access
+type UrlPreference string
+
+const (
+ PreferUrl UrlPreference = "url" // Use private URL (default)
+ PreferPublicUrl UrlPreference = "publicUrl" // Use public URL
+)
+
+// filerHealth tracks the health status of a filer
+type filerHealth struct {
+ failureCount int32 // atomic: consecutive failures
+ lastFailureTimeNs int64 // atomic: last failure time in Unix nanoseconds
+}
+
+// FilerClient provides volume location services by querying a filer
+// It uses the shared vidMap cache for efficient lookups
+// Supports multiple filer addresses with automatic failover for high availability
+// Tracks filer health to avoid repeatedly trying known-unhealthy filers
+type FilerClient struct {
+ *vidMapClient
+ filerAddresses []pb.ServerAddress
+ filerIndex int32 // atomic: current filer index for round-robin
+ filerHealth []*filerHealth // health status per filer (same order as filerAddresses)
+ grpcDialOption grpc.DialOption
+ urlPreference UrlPreference
+ grpcTimeout time.Duration
+ cacheSize int // Number of historical vidMap snapshots to keep
+ clientId int32 // Unique client identifier for gRPC metadata
+ failureThreshold int32 // Circuit breaker: consecutive failures before circuit opens
+ resetTimeout time.Duration // Circuit breaker: time before re-checking unhealthy filer
+ maxRetries int // Retry: maximum retry attempts for transient failures
+ initialRetryWait time.Duration // Retry: initial wait time before first retry
+ retryBackoffFactor float64 // Retry: backoff multiplier for wait time
+}
+
+// filerVolumeProvider implements VolumeLocationProvider by querying filer
+// Supports multiple filer addresses with automatic failover
+type filerVolumeProvider struct {
+ filerClient *FilerClient
+}
+
+// FilerClientOption holds optional configuration for FilerClient
+type FilerClientOption struct {
+ GrpcTimeout time.Duration
+ UrlPreference UrlPreference
+ CacheSize int // Number of historical vidMap snapshots (0 = use default)
+ FailureThreshold int32 // Circuit breaker: consecutive failures before skipping filer (0 = use default of 3)
+ ResetTimeout time.Duration // Circuit breaker: time before re-checking unhealthy filer (0 = use default of 30s)
+ MaxRetries int // Retry: maximum retry attempts for transient failures (0 = use default of 3)
+ InitialRetryWait time.Duration // Retry: initial wait time before first retry (0 = use default of 1s)
+ RetryBackoffFactor float64 // Retry: backoff multiplier for wait time (0 = use default of 1.5)
+}
+
+// NewFilerClient creates a new client that queries filer(s) for volume locations
+// Supports multiple filer addresses for high availability with automatic failover
+// Uses sensible defaults: 5-second gRPC timeout, PreferUrl, DefaultVidMapCacheSize
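+//
+// A minimal construction sketch (the address, dial option, data center, and
+// option values below are placeholders, not required settings):
+//
+//	fc := wdclient.NewFilerClient(
+//		[]pb.ServerAddress{"localhost:8888"},
+//		grpc.WithTransportCredentials(insecure.NewCredentials()),
+//		"dc1",
+//		&wdclient.FilerClientOption{GrpcTimeout: 10 * time.Second},
+//	)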
+func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialOption, dataCenter string, opts ...*FilerClientOption) *FilerClient {
+ if len(filerAddresses) == 0 {
+ glog.Fatal("NewFilerClient requires at least one filer address")
+ }
+
+ // Apply defaults
+ grpcTimeout := 5 * time.Second
+ urlPref := PreferUrl
+ cacheSize := DefaultVidMapCacheSize
+ failureThreshold := int32(3) // Default: 3 consecutive failures before circuit opens
+ resetTimeout := 30 * time.Second // Default: 30 seconds before re-checking unhealthy filer
+ maxRetries := 3 // Default: 3 retry attempts for transient failures
+ initialRetryWait := time.Second // Default: 1 second initial retry wait
+ retryBackoffFactor := 1.5 // Default: 1.5x backoff multiplier
+
+ // Override with provided options
+ if len(opts) > 0 && opts[0] != nil {
+ opt := opts[0]
+ if opt.GrpcTimeout > 0 {
+ grpcTimeout = opt.GrpcTimeout
+ }
+ if opt.UrlPreference != "" {
+ urlPref = opt.UrlPreference
+ }
+ if opt.CacheSize > 0 {
+ cacheSize = opt.CacheSize
+ }
+ if opt.FailureThreshold > 0 {
+ failureThreshold = opt.FailureThreshold
+ }
+ if opt.ResetTimeout > 0 {
+ resetTimeout = opt.ResetTimeout
+ }
+ if opt.MaxRetries > 0 {
+ maxRetries = opt.MaxRetries
+ }
+ if opt.InitialRetryWait > 0 {
+ initialRetryWait = opt.InitialRetryWait
+ }
+ if opt.RetryBackoffFactor > 0 {
+ retryBackoffFactor = opt.RetryBackoffFactor
+ }
+ }
+
+ // Initialize health tracking for each filer
+ health := make([]*filerHealth, len(filerAddresses))
+ for i := range health {
+ health[i] = &filerHealth{}
+ }
+
+ fc := &FilerClient{
+ filerAddresses: filerAddresses,
+ filerIndex: 0,
+ filerHealth: health,
+ grpcDialOption: grpcDialOption,
+ urlPreference: urlPref,
+ grpcTimeout: grpcTimeout,
+ cacheSize: cacheSize,
+ clientId: rand.Int31(), // Random client ID for gRPC metadata tracking
+ failureThreshold: failureThreshold,
+ resetTimeout: resetTimeout,
+ maxRetries: maxRetries,
+ initialRetryWait: initialRetryWait,
+ retryBackoffFactor: retryBackoffFactor,
+ }
+
+ // Create provider that references this FilerClient for failover support
+ provider := &filerVolumeProvider{
+ filerClient: fc,
+ }
+
+ fc.vidMapClient = newVidMapClient(provider, dataCenter, cacheSize)
+
+ return fc
+}
+
+// GetLookupFileIdFunction returns a lookup function with URL preference handling
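+//
+// Usage sketch (the fileId below is hypothetical; the expected format is
+// "volumeId,fileKey"):
+//
+//	lookup := fc.GetLookupFileIdFunction()
+//	urls, err := lookup(ctx, "3,01637037d6")
+//
+// With PreferPublicUrl, each URL is built from the location's PublicUrl,
+// falling back to Url when PublicUrl is empty; same-DC URLs are listed first.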
+func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
+ if fc.urlPreference == PreferUrl {
+ // Use the default implementation from vidMapClient
+ return fc.vidMapClient.GetLookupFileIdFunction()
+ }
+
+ // Custom implementation that prefers PublicUrl
+ return func(ctx context.Context, fileId string) (fullUrls []string, err error) {
+ // Parse file ID to extract volume ID
+ parts := strings.Split(fileId, ",")
+ if len(parts) != 2 {
+ return nil, fmt.Errorf("invalid fileId format: %s", fileId)
+ }
+ volumeIdStr := parts[0]
+
+ // First try the cache using LookupVolumeIdsWithFallback
+ vidLocations, err := fc.LookupVolumeIdsWithFallback(ctx, []string{volumeIdStr})
+
+ // Check for partial results first (important for multi-volume batched lookups)
+ locations, found := vidLocations[volumeIdStr]
+ if !found || len(locations) == 0 {
+ // Volume not found - return specific error with context from lookup if available
+ if err != nil {
+ return nil, fmt.Errorf("volume %s not found for fileId %s: %w", volumeIdStr, fileId, err)
+ }
+ return nil, fmt.Errorf("volume %s not found for fileId %s", volumeIdStr, fileId)
+ }
+
+ // Volume found successfully - ignore any errors about other volumes
+ // (not relevant for single-volume lookup, but defensive for future batching)
+
+ // Build URLs with publicUrl preference, and also prefer same DC
+ var sameDcUrls, otherDcUrls []string
+ dataCenter := fc.GetDataCenter()
+ for _, loc := range locations {
+ url := loc.PublicUrl
+ if url == "" {
+ url = loc.Url
+ }
+ httpUrl := "http://" + url + "/" + fileId
+ if dataCenter != "" && dataCenter == loc.DataCenter {
+ sameDcUrls = append(sameDcUrls, httpUrl)
+ } else {
+ otherDcUrls = append(otherDcUrls, httpUrl)
+ }
+ }
+ // Shuffle to distribute load across volume servers
+ rand.Shuffle(len(sameDcUrls), func(i, j int) { sameDcUrls[i], sameDcUrls[j] = sameDcUrls[j], sameDcUrls[i] })
+ rand.Shuffle(len(otherDcUrls), func(i, j int) { otherDcUrls[i], otherDcUrls[j] = otherDcUrls[j], otherDcUrls[i] })
+ // Prefer same data center
+ fullUrls = append(sameDcUrls, otherDcUrls...)
+ return fullUrls, nil
+ }
+}
+
+// isRetryableGrpcError checks if a gRPC error is transient and should be retried
+//
+// Note on codes.Aborted: While Aborted can indicate application-level conflicts
+// (e.g., transaction failures), in the context of volume location lookups (which
+// are simple read-only operations with no transactions), Aborted is more likely
+// to indicate transient server issues during restart/recovery. We include it here
+// for volume lookups but log it for visibility in case misclassification occurs.
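+//
+// Classification sketch (hypothetical errors): status.Error(codes.Unavailable,
+// "connection refused") and status.Error(codes.DeadlineExceeded, "timeout")
+// are treated as retryable, while status.Error(codes.NotFound, "volume not
+// found") is not.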
+func isRetryableGrpcError(err error) bool {
+ if err == nil {
+ return false
+ }
+
+ // Check gRPC status code
+ st, ok := status.FromError(err)
+ if ok {
+ switch st.Code() {
+ case codes.Unavailable: // Server unavailable (temporary)
+ return true
+ case codes.DeadlineExceeded: // Request timeout
+ return true
+ case codes.ResourceExhausted: // Rate limited or overloaded
+ return true
+ case codes.Aborted:
+ // Aborted during read-only volume lookups is likely transient
+ // (e.g., filer restarting), but log for visibility
+ glog.V(1).Infof("Treating Aborted as retryable for volume lookup: %v", err)
+ return true
+ }
+ }
+
+ // Fallback to string matching for non-gRPC errors (e.g., network errors)
+ errStr := err.Error()
+ return strings.Contains(errStr, "transport") ||
+ strings.Contains(errStr, "connection") ||
+ strings.Contains(errStr, "timeout") ||
+ strings.Contains(errStr, "unavailable")
+}
+
+// shouldSkipUnhealthyFiler checks if we should skip a filer based on recent failures
+// Circuit breaker pattern: skip filers with multiple recent consecutive failures
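+//
+// For example, with the defaults (failureThreshold 3, resetTimeout 30s) a filer
+// is skipped after its 3rd consecutive failure, probed again once 30s have
+// passed since its last recorded failure, and re-admitted by a single
+// successful request (which resets the failure count).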
+func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
+ health := fc.filerHealth[index]
+ failureCount := atomic.LoadInt32(&health.failureCount)
+
+ // Check if failure count exceeds threshold
+ if failureCount < fc.failureThreshold {
+ return false
+ }
+
+ // Re-check unhealthy filers after reset timeout
+ lastFailureNs := atomic.LoadInt64(&health.lastFailureTimeNs)
+ if lastFailureNs == 0 {
+ return false // Never failed, shouldn't skip
+ }
+ lastFailureTime := time.Unix(0, lastFailureNs)
+ if time.Since(lastFailureTime) > fc.resetTimeout {
+ return false // Time to re-check
+ }
+
+ return true // Skip this unhealthy filer
+}
+
+// recordFilerSuccess resets failure tracking for a successful filer
+func (fc *FilerClient) recordFilerSuccess(index int32) {
+ health := fc.filerHealth[index]
+ atomic.StoreInt32(&health.failureCount, 0)
+}
+
+// recordFilerFailure increments the failure count and records the failure time for a filer
+func (fc *FilerClient) recordFilerFailure(index int32) {
+ health := fc.filerHealth[index]
+ atomic.AddInt32(&health.failureCount, 1)
+ atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano())
+}
+
+// LookupVolumeIds queries the filer for volume locations with automatic failover
+// Tries all configured filer addresses until one succeeds (high availability)
+// Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) with exponential backoff
+// Note: Unlike master's VolumeIdLocation, filer's Locations message doesn't currently have
+// an Error field. This implementation handles the current structure while being prepared
+// for future error reporting enhancements.
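+//
+// For example, with the defaults (3 attempts, 1s initial wait, 1.5x backoff) a
+// lookup in which every filer keeps returning retryable errors sleeps roughly
+// 1s and then 1.5s between rounds before reporting the last error.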
+func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
+ fc := p.filerClient
+ result := make(map[string][]Location)
+
+ // Retry transient failures with configurable backoff
+ var lastErr error
+ waitTime := fc.initialRetryWait
+ maxRetries := fc.maxRetries
+
+ for retry := 0; retry < maxRetries; retry++ {
+ // Try all filer addresses with round-robin starting from current index
+ // Skip known-unhealthy filers (circuit breaker pattern)
+ i := atomic.LoadInt32(&fc.filerIndex)
+ n := int32(len(fc.filerAddresses))
+
+ for x := int32(0); x < n; x++ {
+ // Circuit breaker: skip unhealthy filers
+ if fc.shouldSkipUnhealthyFiler(i) {
+ glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",
+ fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount))
+ i++
+ if i >= n {
+ i = 0
+ }
+ continue
+ }
+
+ filerAddress := fc.filerAddresses[i]
+
+ // Use anonymous function to ensure defer cancel() is called per iteration, not accumulated
+ err := func() error {
+ // Create a fresh timeout context for each filer attempt
+ // This ensures each retry gets the full grpcTimeout, not a diminishing deadline
+ timeoutCtx, cancel := context.WithTimeout(ctx, fc.grpcTimeout)
+ defer cancel() // Always clean up context, even on panic or early return
+
+ return pb.WithGrpcFilerClient(false, fc.clientId, filerAddress, fc.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ resp, err := client.LookupVolume(timeoutCtx, &filer_pb.LookupVolumeRequest{
+ VolumeIds: volumeIds,
+ })
+ if err != nil {
+ return fmt.Errorf("filer.LookupVolume failed: %w", err)
+ }
+
+ // Process each volume in the response
+ for vid, locs := range resp.LocationsMap {
+ // Convert locations from protobuf to internal format
+ var locations []Location
+ for _, loc := range locs.Locations {
+ locations = append(locations, Location{
+ Url: loc.Url,
+ PublicUrl: loc.PublicUrl,
+ DataCenter: loc.DataCenter,
+ GrpcPort: int(loc.GrpcPort),
+ })
+ }
+
+ // Only add to result if we have locations
+ // Empty locations with no gRPC error means "not found" (volume doesn't exist)
+ if len(locations) > 0 {
+ result[vid] = locations
+ glog.V(4).Infof("FilerClient: volume %s found with %d location(s)", vid, len(locations))
+ } else {
+ glog.V(2).Infof("FilerClient: volume %s not found (no locations in response)", vid)
+ }
+ }
+
+ // Check for volumes that weren't in the response at all
+ // This could indicate a problem with the filer
+ for _, vid := range volumeIds {
+ if _, found := resp.LocationsMap[vid]; !found {
+ glog.V(1).Infof("FilerClient: volume %s missing from filer response", vid)
+ }
+ }
+
+ return nil
+ })
+ }()
+
+ if err != nil {
+ glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v", filerAddress, x+1, n, retry+1, maxRetries, err)
+ fc.recordFilerFailure(i)
+ lastErr = err
+ i++
+ if i >= n {
+ i = 0
+ }
+ continue
+ }
+
+ // Success - update the preferred filer index and reset health tracking
+ atomic.StoreInt32(&fc.filerIndex, i)
+ fc.recordFilerSuccess(i)
+ glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result))
+ return result, nil
+ }
+
+ // All filers failed or were skipped on this attempt
+ if lastErr == nil {
+ // Every filer was skipped by the circuit breaker; treat this round as
+ // retryable so the reset timeout can re-admit a filer on a later attempt
+ lastErr = fmt.Errorf("all %d filer(s) are temporarily marked unhealthy", n)
+ } else if !isRetryableGrpcError(lastErr) {
+ // Non-retryable error (e.g., NotFound, PermissionDenied) - fail immediately
+ return nil, fmt.Errorf("all %d filer(s) failed with non-retryable error: %w", n, lastErr)
+ }
+
+ // Transient error - retry if we have attempts left
+ if retry < maxRetries-1 {
+ glog.V(1).Infof("FilerClient: all %d filer(s) failed with retryable error (attempt %d/%d), retrying in %v: %v",
+ n, retry+1, maxRetries, waitTime, lastErr)
+ time.Sleep(waitTime)
+ waitTime = time.Duration(float64(waitTime) * fc.retryBackoffFactor)
+ }
+ }
+
+ // All retries exhausted
+ return nil, fmt.Errorf("all %d filer(s) failed after %d attempts, last error: %w", len(fc.filerAddresses), maxRetries, lastErr)
+}