Diffstat (limited to 'weed/wdclient/filer_client.go')
| -rw-r--r-- | weed/wdclient/filer_client.go | 320 |
1 file changed, 307 insertions, 13 deletions
diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go
index f0dd5f2e6..2222575d6 100644
--- a/weed/wdclient/filer_client.go
+++ b/weed/wdclient/filer_client.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"math/rand"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -12,6 +13,7 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
+	"github.com/seaweedfs/seaweedfs/weed/cluster"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -35,9 +37,11 @@ type filerHealth struct {
 // It uses the shared vidMap cache for efficient lookups
 // Supports multiple filer addresses with automatic failover for high availability
 // Tracks filer health to avoid repeatedly trying known-unhealthy filers
+// Can discover additional filers from master server when configured with filer group
 type FilerClient struct {
 	*vidMapClient
 	filerAddresses []pb.ServerAddress
+	filerAddressesMu sync.RWMutex // Protects filerAddresses and filerHealth
 	filerIndex     int32          // atomic: current filer index for round-robin
 	filerHealth    []*filerHealth // health status per filer (same order as filerAddresses)
 	grpcDialOption grpc.DialOption
@@ -50,6 +54,13 @@ type FilerClient struct {
 	maxRetries         int           // Retry: maximum retry attempts for transient failures
 	initialRetryWait   time.Duration // Retry: initial wait time before first retry
 	retryBackoffFactor float64       // Retry: backoff multiplier for wait time
+
+	// Filer discovery fields
+	masterClient       *MasterClient // Optional: for discovering filers in the same group
+	filerGroup         string        // Optional: filer group for discovery
+	discoveryInterval  time.Duration // How often to refresh filer list from master
+	stopDiscovery      chan struct{} // Signal to stop discovery goroutine
+	closeDiscoveryOnce sync.Once     // Ensures discovery channel is closed at most once
 }
 
 // filerVolumeProvider implements VolumeLocationProvider by querying filer
@@ -68,6 +79,11 @@ type FilerClientOption struct {
 	MaxRetries         int           // Retry: maximum retry attempts for transient failures (0 = use default of 3)
 	InitialRetryWait   time.Duration // Retry: initial wait time before first retry (0 = use default of 1s)
 	RetryBackoffFactor float64       // Retry: backoff multiplier for wait time (0 = use default of 1.5)
+
+	// Filer discovery options
+	MasterClient      *MasterClient // Optional: enables filer discovery from master
+	FilerGroup        string        // Optional: filer group name for discovery (required if MasterClient is set)
+	DiscoveryInterval time.Duration // Optional: how often to refresh filer list (0 = use default of 5 minutes)
 }
 
 // NewFilerClient creates a new client that queries filer(s) for volume locations
@@ -87,6 +103,9 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 	maxRetries := 3                 // Default: 3 retry attempts for transient failures
 	initialRetryWait := time.Second // Default: 1 second initial retry wait
 	retryBackoffFactor := 1.5       // Default: 1.5x backoff multiplier
+	var masterClient *MasterClient
+	var filerGroup string
+	discoveryInterval := 5 * time.Minute // Default: refresh every 5 minutes
 
 	// Override with provided options
 	if len(opts) > 0 && opts[0] != nil {
@@ -115,6 +134,13 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 		if opt.RetryBackoffFactor > 0 {
 			retryBackoffFactor = opt.RetryBackoffFactor
 		}
+		if opt.MasterClient != nil {
+			masterClient = opt.MasterClient
+			filerGroup = opt.FilerGroup
+			if opt.DiscoveryInterval > 0 {
+				discoveryInterval = opt.DiscoveryInterval
+			}
+		}
 	}
 
 	// Initialize health tracking for each filer
@@ -137,6 +163,17 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 		maxRetries:         maxRetries,
 		initialRetryWait:   initialRetryWait,
 		retryBackoffFactor: retryBackoffFactor,
+		masterClient:       masterClient,
+		filerGroup:         filerGroup,
+		discoveryInterval:  discoveryInterval,
 	}
 
+	// Start filer discovery if master client is configured
+	// Empty filerGroup is valid (represents default group)
+	if masterClient != nil {
+		fc.stopDiscovery = make(chan struct{})
+		go fc.discoverFilers()
+		glog.V(0).Infof("FilerClient: started filer discovery for group '%s' (refresh interval: %v)", filerGroup, discoveryInterval)
+	}
+
 	// Create provider that references this FilerClient for failover support
@@ -149,6 +186,204 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
 	return fc
 }
 
+// GetCurrentFiler returns the currently active filer address
+// This is the filer that was last successfully used or the one indicated by round-robin
+// Returns empty string if no filers are configured
+func (fc *FilerClient) GetCurrentFiler() pb.ServerAddress {
+	fc.filerAddressesMu.RLock()
+	defer fc.filerAddressesMu.RUnlock()
+
+	if len(fc.filerAddresses) == 0 {
+		return ""
+	}
+
+	// Get current index (atomically updated on successful operations)
+	index := atomic.LoadInt32(&fc.filerIndex)
+	if index >= int32(len(fc.filerAddresses)) {
+		index = 0
+	}
+
+	return fc.filerAddresses[index]
+}
+
+// GetAllFilers returns a snapshot of all filer addresses
+// Returns a copy to avoid concurrent modification issues
+func (fc *FilerClient) GetAllFilers() []pb.ServerAddress {
+	fc.filerAddressesMu.RLock()
+	defer fc.filerAddressesMu.RUnlock()
+
+	// Return a copy to avoid concurrent modification
+	filers := make([]pb.ServerAddress, len(fc.filerAddresses))
+	copy(filers, fc.filerAddresses)
+	return filers
+}
+
+// SetCurrentFiler updates the current filer index to the specified address
+// This is useful after successful failover to prefer the healthy filer for future requests
+func (fc *FilerClient) SetCurrentFiler(addr pb.ServerAddress) {
+	fc.filerAddressesMu.RLock()
+	defer fc.filerAddressesMu.RUnlock()
+
+	// Find the index of the specified filer address
+	for i, filer := range fc.filerAddresses {
+		if filer == addr {
+			atomic.StoreInt32(&fc.filerIndex, int32(i))
+			return
+		}
+	}
+	// If address not found, leave index unchanged
+}
+
+// ShouldSkipUnhealthyFiler checks if a filer address should be skipped based on health tracking
+// Returns true if the filer has exceeded failure threshold and reset timeout hasn't elapsed
+func (fc *FilerClient) ShouldSkipUnhealthyFiler(addr pb.ServerAddress) bool {
+	fc.filerAddressesMu.RLock()
+	defer fc.filerAddressesMu.RUnlock()
+
+	// Find the health for this filer address
+	for i, filer := range fc.filerAddresses {
+		if filer == addr {
+			if i < len(fc.filerHealth) {
+				return fc.shouldSkipUnhealthyFilerWithHealth(fc.filerHealth[i])
+			}
+			return false
+		}
+	}
+	// If address not found, don't skip it
+	return false
+}
+
+// RecordFilerSuccess resets failure tracking for a successful filer
+func (fc *FilerClient) RecordFilerSuccess(addr pb.ServerAddress) {
+	fc.filerAddressesMu.RLock()
+	defer fc.filerAddressesMu.RUnlock()
+
+	// Find the health for this filer address
+	for i, filer := range fc.filerAddresses {
+		if filer == addr {
+			if i < len(fc.filerHealth) {
+				fc.recordFilerSuccessWithHealth(fc.filerHealth[i])
+			}
+			return
+		}
+	}
+}
+
+// RecordFilerFailure increments failure count for an unhealthy filer
+func (fc *FilerClient) RecordFilerFailure(addr pb.ServerAddress) {
+	fc.filerAddressesMu.RLock()
+	defer fc.filerAddressesMu.RUnlock()
+
+	// Find the health for this filer address
+	for i, filer := range fc.filerAddresses {
+		if filer == addr {
+			if i < len(fc.filerHealth) {
+				fc.recordFilerFailureWithHealth(fc.filerHealth[i])
+			}
+			return
+		}
+	}
+}
+
+// Close stops the filer discovery goroutine if running
+// Safe to call multiple times (idempotent)
+func (fc *FilerClient) Close() {
+	if fc.stopDiscovery != nil {
+		fc.closeDiscoveryOnce.Do(func() {
+			close(fc.stopDiscovery)
+		})
+	}
+}
+
+// discoverFilers periodically queries the master to discover filers in the same group
+// and updates the filer list. This runs in a background goroutine.
+func (fc *FilerClient) discoverFilers() {
+	defer func() {
+		if r := recover(); r != nil {
+			glog.Errorf("FilerClient: panic in filer discovery goroutine for group '%s': %v", fc.filerGroup, r)
+		}
+	}()
+
+	// Do an initial discovery
+	fc.refreshFilerList()
+
+	ticker := time.NewTicker(fc.discoveryInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			fc.refreshFilerList()
+		case <-fc.stopDiscovery:
+			glog.V(0).Infof("FilerClient: stopping filer discovery for group '%s'", fc.filerGroup)
+			return
+		}
+	}
+}
+
+// refreshFilerList queries the master for the current list of filers and updates the local list
+func (fc *FilerClient) refreshFilerList() {
+	if fc.masterClient == nil {
+		return
+	}
+
+	// Get current master address
+	currentMaster := fc.masterClient.GetMaster(context.Background())
+	if currentMaster == "" {
+		glog.V(1).Infof("FilerClient: no master available for filer discovery")
+		return
+	}
+
+	// Query master for filers in our group
+	updates := cluster.ListExistingPeerUpdates(currentMaster, fc.grpcDialOption, fc.filerGroup, cluster.FilerType)
+
+	if len(updates) == 0 {
+		glog.V(2).Infof("FilerClient: no filers found in group '%s'", fc.filerGroup)
+		return
+	}
+
+	// Build new filer address list
+	discoveredFilers := make(map[pb.ServerAddress]bool)
+	for _, update := range updates {
+		if update.Address != "" {
+			discoveredFilers[pb.ServerAddress(update.Address)] = true
+		}
+	}
+
+	// Thread-safe update of filer list
+	fc.filerAddressesMu.Lock()
+	defer fc.filerAddressesMu.Unlock()
+
+	// Build a map of existing filers for efficient O(1) lookup
+	existingFilers := make(map[pb.ServerAddress]struct{}, len(fc.filerAddresses))
+	for _, f := range fc.filerAddresses {
+		existingFilers[f] = struct{}{}
+	}
+
+	// Find new filers - O(N+M) instead of O(N*M)
+	var newFilers []pb.ServerAddress
+	for addr := range discoveredFilers {
+		if _, found := existingFilers[addr]; !found {
+			newFilers = append(newFilers, addr)
+		}
+	}
+
+	// Add new filers
+	if len(newFilers) > 0 {
+		glog.V(0).Infof("FilerClient: discovered %d new filer(s) in group '%s': %v", len(newFilers), fc.filerGroup, newFilers)
+		fc.filerAddresses = append(fc.filerAddresses, newFilers...)
+
+		// Initialize health tracking for new filers
+		for range newFilers {
+			fc.filerHealth = append(fc.filerHealth, &filerHealth{})
+		}
+	}
+
+	// Optionally, remove filers that are no longer in the cluster
+	// For now, we keep all filers and rely on health checks to avoid dead ones
+	// This prevents removing filers that might be temporarily unavailable
+}
+
 // GetLookupFileIdFunction returns a lookup function with URL preference handling
 func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
 	if fc.urlPreference == PreferUrl {
@@ -245,8 +480,9 @@ func isRetryableGrpcError(err error) bool {
 // shouldSkipUnhealthyFiler checks if we should skip a filer based on recent failures
 // Circuit breaker pattern: skip filers with multiple recent consecutive failures
-func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
-	health := fc.filerHealth[index]
+// shouldSkipUnhealthyFilerWithHealth checks if a filer should be skipped based on health
+// Uses atomic operations only - safe to call without locks
+func (fc *FilerClient) shouldSkipUnhealthyFilerWithHealth(health *filerHealth) bool {
 	failureCount := atomic.LoadInt32(&health.failureCount)
 
 	// Check if failure count exceeds threshold
@@ -267,17 +503,53 @@ func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
 	return true // Skip this unhealthy filer
 }
 
+// Deprecated: Use shouldSkipUnhealthyFilerWithHealth instead
+// This function is kept for backward compatibility but requires array access
+// Note: This function is now thread-safe.
+func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
+	fc.filerAddressesMu.RLock()
+	if index >= int32(len(fc.filerHealth)) {
+		fc.filerAddressesMu.RUnlock()
+		return true // Invalid index - skip
+	}
+	health := fc.filerHealth[index]
+	fc.filerAddressesMu.RUnlock()
+	return fc.shouldSkipUnhealthyFilerWithHealth(health)
+}
+
+// recordFilerSuccessWithHealth resets failure tracking for a successful filer
+func (fc *FilerClient) recordFilerSuccessWithHealth(health *filerHealth) {
+	atomic.StoreInt32(&health.failureCount, 0)
+}
+
 // recordFilerSuccess resets failure tracking for a successful filer
 func (fc *FilerClient) recordFilerSuccess(index int32) {
+	fc.filerAddressesMu.RLock()
+	if index >= int32(len(fc.filerHealth)) {
+		fc.filerAddressesMu.RUnlock()
+		return // Invalid index
+	}
 	health := fc.filerHealth[index]
-	atomic.StoreInt32(&health.failureCount, 0)
+	fc.filerAddressesMu.RUnlock()
+	fc.recordFilerSuccessWithHealth(health)
+}
+
+// recordFilerFailureWithHealth increments failure count for an unhealthy filer
+func (fc *FilerClient) recordFilerFailureWithHealth(health *filerHealth) {
+	atomic.AddInt32(&health.failureCount, 1)
+	atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano())
 }
 
 // recordFilerFailure increments failure count for an unhealthy filer
 func (fc *FilerClient) recordFilerFailure(index int32) {
+	fc.filerAddressesMu.RLock()
+	if index >= int32(len(fc.filerHealth)) {
+		fc.filerAddressesMu.RUnlock()
+		return // Invalid index
+	}
 	health := fc.filerHealth[index]
-	atomic.AddInt32(&health.failureCount, 1)
-	atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano())
+	fc.filerAddressesMu.RUnlock()
+	fc.recordFilerFailureWithHealth(health)
 }
 
 // LookupVolumeIds queries the filer for volume locations with automatic failover
@@ -299,13 +571,34 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 	// Try all filer addresses with round-robin starting from current index
 	// Skip known-unhealthy filers (circuit breaker pattern)
 	i := atomic.LoadInt32(&fc.filerIndex)
+
+	// Get filer count with read lock
+	fc.filerAddressesMu.RLock()
 	n := int32(len(fc.filerAddresses))
+	fc.filerAddressesMu.RUnlock()
 
 	for x := int32(0); x < n; x++ {
-		// Circuit breaker: skip unhealthy filers
-		if fc.shouldSkipUnhealthyFiler(i) {
+		// Get current filer address and health with read lock
+		fc.filerAddressesMu.RLock()
+		if len(fc.filerAddresses) == 0 {
+			fc.filerAddressesMu.RUnlock()
+			lastErr = fmt.Errorf("no filers available")
+			break
+		}
+		if i >= int32(len(fc.filerAddresses)) {
+			// Filer list changed, reset index
+			i = 0
+		}
+
+		// Get health pointer while holding lock
+		health := fc.filerHealth[i]
+		filerAddress := fc.filerAddresses[i]
+		fc.filerAddressesMu.RUnlock()
+
+		// Circuit breaker: skip unhealthy filers (no lock needed - uses atomics)
+		if fc.shouldSkipUnhealthyFilerWithHealth(health) {
 			glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",
-				fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount))
+				filerAddress, atomic.LoadInt32(&health.failureCount))
 			i++
 			if i >= n {
 				i = 0
@@ -313,8 +606,6 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 			continue
 		}
 
-		filerAddress := fc.filerAddresses[i]
-
 		// Use anonymous function to ensure defer cancel() is called per iteration, not accumulated
 		err := func() error {
 			// Create a fresh timeout context for each filer attempt
@@ -367,7 +658,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 		if err != nil {
 			glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v",
 				filerAddress, x+1, n, retry+1, maxRetries, err)
-			fc.recordFilerFailure(i)
+			fc.recordFilerFailureWithHealth(health)
 			lastErr = err
 			i++
 			if i >= n {
@@ -378,7 +669,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 
 		// Success - update the preferred filer index and reset health tracking
 		atomic.StoreInt32(&fc.filerIndex, i)
-		fc.recordFilerSuccess(i)
+		fc.recordFilerSuccessWithHealth(health)
 		glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result))
 		return result, nil
 	}
@@ -400,5 +691,8 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
 	}
 
 	// All retries exhausted
-	return nil, fmt.Errorf("all %d filer(s) failed after %d attempts, last error: %w", len(fc.filerAddresses), maxRetries, lastErr)
+	fc.filerAddressesMu.RLock()
+	totalFilers := len(fc.filerAddresses)
+	fc.filerAddressesMu.RUnlock()
+	return nil, fmt.Errorf("all %d filer(s) failed after %d attempts, last error: %w", totalFilers, maxRetries, lastErr)
 }
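
Editor's note: the sketch below (not part of the commit) shows the intended lifecycle of the new discovery feature. The helper names runWithDiscovery and discoveryOption are hypothetical, and the NewFilerClient call itself is elided because its full parameter list is truncated in the hunk headers above; only the option fields and methods added by this diff are used.

package example

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)

// discoveryOption fills the three new FilerClientOption fields; the interval
// here is illustrative (zero would fall back to the commit's 5-minute default).
func discoveryOption(mc *wdclient.MasterClient) *wdclient.FilerClientOption {
	return &wdclient.FilerClientOption{
		MasterClient:      mc,              // non-nil enables the discovery goroutine
		FilerGroup:        "",              // empty string is valid: the default group
		DiscoveryInterval: 2 * time.Minute, // how often to refresh the filer list
	}
}

// runWithDiscovery assumes fc was built with discoveryOption applied.
func runWithDiscovery(fc *wdclient.FilerClient) {
	// Stop the background discovery goroutine when done.
	// Close() is idempotent: closeDiscoveryOnce guards the channel close.
	defer fc.Close()

	// The filer set can grow in the background, so take snapshots via
	// GetAllFilers() rather than caching indices into the slice.
	for _, addr := range fc.GetAllFilers() {
		fmt.Println("known filer:", addr)
	}
	fmt.Println("current filer:", fc.GetCurrentFiler())
}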
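The commit also exports the health-tracking API (ShouldSkipUnhealthyFiler, RecordFilerSuccess, RecordFilerFailure, SetCurrentFiler). One plausible caller-side use, sketched under the assumption that the caller runs its own per-filer operation; tryEachFiler is a hypothetical helper, not from the commit:

package example

import (
	"errors"

	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)

// tryEachFiler composes the newly exported methods into a failover loop:
// skip filers the circuit breaker marks unhealthy, record each outcome,
// and pin the first healthy filer for subsequent round-robin starts.
func tryEachFiler(fc *wdclient.FilerClient, do func(pb.ServerAddress) error) error {
	var lastErr error
	for _, addr := range fc.GetAllFilers() { // snapshot: safe against discovery updates
		if fc.ShouldSkipUnhealthyFiler(addr) {
			continue // circuit breaker is open for this filer
		}
		if err := do(addr); err != nil {
			fc.RecordFilerFailure(addr) // bumps failureCount and lastFailureTimeNs
			lastErr = err
			continue
		}
		fc.RecordFilerSuccess(addr) // resets failureCount to zero
		fc.SetCurrentFiler(addr)    // prefer this filer for future requests
		return nil
	}
	if lastErr == nil {
		lastErr = errors.New("no healthy filers available")
	}
	return lastErr
}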
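A design note on the *WithHealth refactor: filerHealth records are touched only through atomics, so LookupVolumeIds can capture the *filerHealth pointer under the RWMutex, release the lock, and keep using the pointer even if discovery appends to the slices meanwhile. A condensed model of that pattern follows; the threshold and reset window are assumptions for illustration, since the commit's actual constants fall outside the visible hunks:

package example

import (
	"sync/atomic"
	"time"
)

// health mirrors the diff's filerHealth fields.
type health struct {
	failureCount      int32
	lastFailureTimeNs int64
}

const (
	failureThreshold = 3                // assumed consecutive-failure threshold
	resetTimeout     = 30 * time.Second // assumed circuit-breaker reset window
)

// shouldSkip is lock-free: both fields are read atomically, so the caller
// only needs a stable *health pointer, not the slice lock.
func shouldSkip(h *health) bool {
	if atomic.LoadInt32(&h.failureCount) < failureThreshold {
		return false // under the threshold: try this filer
	}
	last := time.Unix(0, atomic.LoadInt64(&h.lastFailureTimeNs))
	if time.Since(last) > resetTimeout {
		return false // reset window elapsed: give the filer another chance
	}
	return true // circuit open: skip this filer
}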
