Diffstat (limited to 'weed/wdclient/filer_client.go')
 weed/wdclient/filer_client.go | 320 +++++++++++++++++++++++++++++++++++++---
 1 file changed, 307 insertions(+), 13 deletions(-)
diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go
index f0dd5f2e6..2222575d6 100644
--- a/weed/wdclient/filer_client.go
+++ b/weed/wdclient/filer_client.go
@@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"strings"
+ "sync"
"sync/atomic"
"time"
@@ -12,6 +13,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -35,9 +37,11 @@ type filerHealth struct {
// It uses the shared vidMap cache for efficient lookups
// Supports multiple filer addresses with automatic failover for high availability
// Tracks filer health to avoid repeatedly trying known-unhealthy filers
+// Can discover additional filers from master server when configured with filer group
type FilerClient struct {
*vidMapClient
filerAddresses []pb.ServerAddress
+ filerAddressesMu sync.RWMutex // Protects filerAddresses and filerHealth
filerIndex int32 // atomic: current filer index for round-robin
filerHealth []*filerHealth // health status per filer (same order as filerAddresses)
grpcDialOption grpc.DialOption
@@ -50,6 +54,13 @@ type FilerClient struct {
maxRetries int // Retry: maximum retry attempts for transient failures
initialRetryWait time.Duration // Retry: initial wait time before first retry
retryBackoffFactor float64 // Retry: backoff multiplier for wait time
+
+ // Filer discovery fields
+ masterClient *MasterClient // Optional: for discovering filers in the same group
+ filerGroup string // Optional: filer group for discovery
+ discoveryInterval time.Duration // How often to refresh filer list from master
+ stopDiscovery chan struct{} // Signal to stop discovery goroutine
+ closeDiscoveryOnce sync.Once // Ensures discovery channel is closed at most once
}
// filerVolumeProvider implements VolumeLocationProvider by querying filer
@@ -68,6 +79,11 @@ type FilerClientOption struct {
MaxRetries int // Retry: maximum retry attempts for transient failures (0 = use default of 3)
InitialRetryWait time.Duration // Retry: initial wait time before first retry (0 = use default of 1s)
RetryBackoffFactor float64 // Retry: backoff multiplier for wait time (0 = use default of 1.5)
+
+ // Filer discovery options
+ MasterClient *MasterClient // Optional: enables filer discovery from master
+ FilerGroup string // Optional: filer group name for discovery (required if MasterClient is set)
+ DiscoveryInterval time.Duration // Optional: how often to refresh filer list (0 = use default of 5 minutes)
}
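
Putting the new options together, a minimal construction sketch. The addresses, dial option, and master client below are hypothetical, and the positional parameters of NewFilerClient are truncated in this diff, so the exact call shape is illustrative rather than definitive:

    // Sketch: enabling filer discovery (hypothetical setup values).
    fc := wdclient.NewFilerClient(
        []pb.ServerAddress{"filer1:8888"}, // seed filer(s); more may be discovered
        grpcDialOption,                    // hypothetical grpc.DialOption
        &wdclient.FilerClientOption{
            MasterClient:      masterClient,    // enables background discovery
            FilerGroup:        "",              // empty string is the default group
            DiscoveryInterval: 2 * time.Minute, // 0 falls back to the 5-minute default
        },
    )
    defer fc.Close() // stops the discovery goroutine; safe to call more than once
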
// NewFilerClient creates a new client that queries filer(s) for volume locations
@@ -87,6 +103,9 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
maxRetries := 3 // Default: 3 retry attempts for transient failures
initialRetryWait := time.Second // Default: 1 second initial retry wait
retryBackoffFactor := 1.5 // Default: 1.5x backoff multiplier
+ var masterClient *MasterClient
+ var filerGroup string
+ discoveryInterval := 5 * time.Minute // Default: refresh every 5 minutes
// Override with provided options
if len(opts) > 0 && opts[0] != nil {
@@ -115,6 +134,13 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
if opt.RetryBackoffFactor > 0 {
retryBackoffFactor = opt.RetryBackoffFactor
}
+ if opt.MasterClient != nil {
+ masterClient = opt.MasterClient
+ filerGroup = opt.FilerGroup
+ if opt.DiscoveryInterval > 0 {
+ discoveryInterval = opt.DiscoveryInterval
+ }
+ }
}
// Initialize health tracking for each filer
@@ -137,6 +163,17 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
maxRetries: maxRetries,
initialRetryWait: initialRetryWait,
retryBackoffFactor: retryBackoffFactor,
+ masterClient: masterClient,
+ filerGroup: filerGroup,
+ discoveryInterval: discoveryInterval,
+ }
+
+ // Start filer discovery if master client is configured
+ // Empty filerGroup is valid (represents default group)
+ if masterClient != nil {
+ fc.stopDiscovery = make(chan struct{})
+ go fc.discoverFilers()
+ glog.V(0).Infof("FilerClient: started filer discovery for group '%s' (refresh interval: %v)", filerGroup, discoveryInterval)
}
// Create provider that references this FilerClient for failover support
@@ -149,6 +186,204 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
return fc
}
+// GetCurrentFiler returns the currently active filer address
+// This is the filer that was last successfully used or the one indicated by round-robin
+// Returns empty string if no filers are configured
+func (fc *FilerClient) GetCurrentFiler() pb.ServerAddress {
+ fc.filerAddressesMu.RLock()
+ defer fc.filerAddressesMu.RUnlock()
+
+ if len(fc.filerAddresses) == 0 {
+ return ""
+ }
+
+ // Get current index (atomically updated on successful operations)
+ index := atomic.LoadInt32(&fc.filerIndex)
+ if index >= int32(len(fc.filerAddresses)) {
+ index = 0
+ }
+
+ return fc.filerAddresses[index]
+}
+
+// GetAllFilers returns a snapshot of all filer addresses
+// Returns a copy to avoid concurrent modification issues
+func (fc *FilerClient) GetAllFilers() []pb.ServerAddress {
+ fc.filerAddressesMu.RLock()
+ defer fc.filerAddressesMu.RUnlock()
+
+ // Return a copy to avoid concurrent modification
+ filers := make([]pb.ServerAddress, len(fc.filerAddresses))
+ copy(filers, fc.filerAddresses)
+ return filers
+}
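+
+Both accessors return snapshot state, so callers can inspect the client's view of the cluster without holding any lock themselves. A brief usage sketch, assuming fc is a configured *FilerClient:
+
+    current := fc.GetCurrentFiler() // "" if no filers are configured
+    all := fc.GetAllFilers()        // a copy; safe to range over while discovery mutates the list
+    glog.V(1).Infof("current filer %s of %d known: %v", current, len(all), all)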
+
+// SetCurrentFiler updates the current filer index to the specified address
+// This is useful after successful failover to prefer the healthy filer for future requests
+func (fc *FilerClient) SetCurrentFiler(addr pb.ServerAddress) {
+ fc.filerAddressesMu.RLock()
+ defer fc.filerAddressesMu.RUnlock()
+
+ // Find the index of the specified filer address
+ for i, filer := range fc.filerAddresses {
+ if filer == addr {
+ atomic.StoreInt32(&fc.filerIndex, int32(i))
+ return
+ }
+ }
+ // If address not found, leave index unchanged
+}
+
+// ShouldSkipUnhealthyFiler checks if a filer address should be skipped based on health tracking
+// Returns true if the filer has exceeded failure threshold and reset timeout hasn't elapsed
+func (fc *FilerClient) ShouldSkipUnhealthyFiler(addr pb.ServerAddress) bool {
+ fc.filerAddressesMu.RLock()
+ defer fc.filerAddressesMu.RUnlock()
+
+ // Find the health for this filer address
+ for i, filer := range fc.filerAddresses {
+ if filer == addr {
+ if i < len(fc.filerHealth) {
+ return fc.shouldSkipUnhealthyFilerWithHealth(fc.filerHealth[i])
+ }
+ return false
+ }
+ }
+ // If address not found, don't skip it
+ return false
+}
+
+// RecordFilerSuccess resets failure tracking for a successful filer
+func (fc *FilerClient) RecordFilerSuccess(addr pb.ServerAddress) {
+ fc.filerAddressesMu.RLock()
+ defer fc.filerAddressesMu.RUnlock()
+
+ // Find the health for this filer address
+ for i, filer := range fc.filerAddresses {
+ if filer == addr {
+ if i < len(fc.filerHealth) {
+ fc.recordFilerSuccessWithHealth(fc.filerHealth[i])
+ }
+ return
+ }
+ }
+}
+
+// RecordFilerFailure increments failure count for an unhealthy filer
+func (fc *FilerClient) RecordFilerFailure(addr pb.ServerAddress) {
+ fc.filerAddressesMu.RLock()
+ defer fc.filerAddressesMu.RUnlock()
+
+ // Find the health for this filer address
+ for i, filer := range fc.filerAddresses {
+ if filer == addr {
+ if i < len(fc.filerHealth) {
+ fc.recordFilerFailureWithHealth(fc.filerHealth[i])
+ }
+ return
+ }
+ }
+}
+
+// Close stops the filer discovery goroutine if running
+// Safe to call multiple times (idempotent)
+func (fc *FilerClient) Close() {
+ if fc.stopDiscovery != nil {
+ fc.closeDiscoveryOnce.Do(func() {
+ close(fc.stopDiscovery)
+ })
+ }
+}
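+
+Close relies on the standard close-once idiom: the channel carries the shutdown signal, and sync.Once prevents the panic Go raises when closing an already-closed channel. A self-contained sketch of the same pattern, using only the standard library:
+
+    package main
+
+    import (
+        "fmt"
+        "sync"
+    )
+
+    // stopper demonstrates the close-once idiom used by FilerClient.Close:
+    // the channel signals shutdown, and sync.Once makes Close idempotent.
+    type stopper struct {
+        stop chan struct{}
+        once sync.Once
+    }
+
+    func (s *stopper) Close() {
+        s.once.Do(func() { close(s.stop) })
+    }
+
+    func main() {
+        s := &stopper{stop: make(chan struct{})}
+        s.Close()
+        s.Close() // safe: no panic on the second call
+        _, open := <-s.stop
+        fmt.Println("channel open?", open) // false: shutdown signaled
+    }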
+
+// discoverFilers periodically queries the master to discover filers in the same group
+// and updates the filer list. This runs in a background goroutine.
+func (fc *FilerClient) discoverFilers() {
+ defer func() {
+ if r := recover(); r != nil {
+ glog.Errorf("FilerClient: panic in filer discovery goroutine for group '%s': %v", fc.filerGroup, r)
+ }
+ }()
+
+ // Do an initial discovery
+ fc.refreshFilerList()
+
+ ticker := time.NewTicker(fc.discoveryInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ fc.refreshFilerList()
+ case <-fc.stopDiscovery:
+ glog.V(0).Infof("FilerClient: stopping filer discovery for group '%s'", fc.filerGroup)
+ return
+ }
+ }
+}
+
+// refreshFilerList queries the master for the current list of filers and updates the local list
+func (fc *FilerClient) refreshFilerList() {
+ if fc.masterClient == nil {
+ return
+ }
+
+ // Get current master address
+ currentMaster := fc.masterClient.GetMaster(context.Background())
+ if currentMaster == "" {
+ glog.V(1).Infof("FilerClient: no master available for filer discovery")
+ return
+ }
+
+ // Query master for filers in our group
+ updates := cluster.ListExistingPeerUpdates(currentMaster, fc.grpcDialOption, fc.filerGroup, cluster.FilerType)
+
+ if len(updates) == 0 {
+ glog.V(2).Infof("FilerClient: no filers found in group '%s'", fc.filerGroup)
+ return
+ }
+
+ // Build new filer address list
+ discoveredFilers := make(map[pb.ServerAddress]bool)
+ for _, update := range updates {
+ if update.Address != "" {
+ discoveredFilers[pb.ServerAddress(update.Address)] = true
+ }
+ }
+
+ // Thread-safe update of filer list
+ fc.filerAddressesMu.Lock()
+ defer fc.filerAddressesMu.Unlock()
+
+ // Build a map of existing filers for efficient O(1) lookup
+ existingFilers := make(map[pb.ServerAddress]struct{}, len(fc.filerAddresses))
+ for _, f := range fc.filerAddresses {
+ existingFilers[f] = struct{}{}
+ }
+
+ // Find new filers - O(N+M) instead of O(N*M)
+ var newFilers []pb.ServerAddress
+ for addr := range discoveredFilers {
+ if _, found := existingFilers[addr]; !found {
+ newFilers = append(newFilers, addr)
+ }
+ }
+
+ // Add new filers
+ if len(newFilers) > 0 {
+ glog.V(0).Infof("FilerClient: discovered %d new filer(s) in group '%s': %v", len(newFilers), fc.filerGroup, newFilers)
+ fc.filerAddresses = append(fc.filerAddresses, newFilers...)
+
+ // Initialize health tracking for new filers
+ for range newFilers {
+ fc.filerHealth = append(fc.filerHealth, &filerHealth{})
+ }
+ }
+
+ // Optionally, remove filers that are no longer in the cluster
+ // For now, we keep all filers and rely on health checks to avoid dead ones
+ // This prevents removing filers that might be temporarily unavailable
+}
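+
+The merge above is a plain map-based set difference, which is what makes it O(N+M); the same technique in isolation, generic over strings here for brevity:
+
+    package main
+
+    import "fmt"
+
+    // newEntries returns the members of discovered that are not in existing,
+    // using a map for O(N+M) total work instead of a nested O(N*M) scan --
+    // the same merge strategy refreshFilerList applies to filer addresses.
+    func newEntries(existing, discovered []string) []string {
+        seen := make(map[string]struct{}, len(existing))
+        for _, e := range existing {
+            seen[e] = struct{}{}
+        }
+        var fresh []string
+        for _, d := range discovered {
+            if _, ok := seen[d]; !ok {
+                fresh = append(fresh, d)
+            }
+        }
+        return fresh
+    }
+
+    func main() {
+        fmt.Println(newEntries(
+            []string{"filer1:8888"},
+            []string{"filer1:8888", "filer2:8888"},
+        )) // [filer2:8888]
+    }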
+
// GetLookupFileIdFunction returns a lookup function with URL preference handling
func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
if fc.urlPreference == PreferUrl {
@@ -245,8 +480,9 @@ func isRetryableGrpcError(err error) bool {
// shouldSkipUnhealthyFiler checks if we should skip a filer based on recent failures
// Circuit breaker pattern: skip filers with multiple recent consecutive failures
-func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
- health := fc.filerHealth[index]
+// shouldSkipUnhealthyFilerWithHealth checks if a filer should be skipped based on health
+// Uses atomic operations only - safe to call without locks
+func (fc *FilerClient) shouldSkipUnhealthyFilerWithHealth(health *filerHealth) bool {
failureCount := atomic.LoadInt32(&health.failureCount)
// Check if failure count exceeds threshold
@@ -267,17 +503,53 @@ func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
return true // Skip this unhealthy filer
}
+// Deprecated: Use shouldSkipUnhealthyFilerWithHealth instead
+// This function is kept for backward compatibility but requires array access
+// Note: This function is now thread-safe.
+func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
+ fc.filerAddressesMu.RLock()
+ if index >= int32(len(fc.filerHealth)) {
+ fc.filerAddressesMu.RUnlock()
+ return true // Invalid index - skip
+ }
+ health := fc.filerHealth[index]
+ fc.filerAddressesMu.RUnlock()
+ return fc.shouldSkipUnhealthyFilerWithHealth(health)
+}
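+
+The failure threshold and reset timeout are elided from this hunk, so the constants below are assumptions, not the file's actual values; this is a minimal sketch of the check they gate, built on the filerHealth fields shown above:
+
+    package main
+
+    import (
+        "fmt"
+        "sync/atomic"
+        "time"
+    )
+
+    // filerHealth mirrors the two fields this file tracks per filer.
+    type filerHealth struct {
+        failureCount      int32
+        lastFailureTimeNs int64
+    }
+
+    const (
+        failureThreshold = 3                 // hypothetical: real constant elided from this hunk
+        resetTimeout     = 30 * time.Second  // hypothetical: real constant elided from this hunk
+    )
+
+    // shouldSkip sketches the circuit-breaker check: skip a filer once its
+    // consecutive failures pass the threshold, until a cool-down elapses.
+    func shouldSkip(h *filerHealth) bool {
+        if atomic.LoadInt32(&h.failureCount) < failureThreshold {
+            return false // under the threshold: keep trying this filer
+        }
+        lastNs := atomic.LoadInt64(&h.lastFailureTimeNs)
+        if time.Since(time.Unix(0, lastNs)) > resetTimeout {
+            return false // cool-down elapsed: give the filer another chance
+        }
+        return true // circuit open: skip this unhealthy filer
+    }
+
+    func main() {
+        h := &filerHealth{}
+        for i := 0; i < 4; i++ {
+            atomic.AddInt32(&h.failureCount, 1)
+            atomic.StoreInt64(&h.lastFailureTimeNs, time.Now().UnixNano())
+        }
+        fmt.Println(shouldSkip(h)) // true: 4 recent consecutive failures
+    }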
+
+// recordFilerSuccessWithHealth resets failure tracking for a successful filer
+func (fc *FilerClient) recordFilerSuccessWithHealth(health *filerHealth) {
+ atomic.StoreInt32(&health.failureCount, 0)
+}
+
// recordFilerSuccess resets failure tracking for a successful filer
func (fc *FilerClient) recordFilerSuccess(index int32) {
+ fc.filerAddressesMu.RLock()
+ if index >= int32(len(fc.filerHealth)) {
+ fc.filerAddressesMu.RUnlock()
+ return // Invalid index
+ }
health := fc.filerHealth[index]
- atomic.StoreInt32(&health.failureCount, 0)
+ fc.filerAddressesMu.RUnlock()
+ fc.recordFilerSuccessWithHealth(health)
+}
+
+// recordFilerFailureWithHealth increments failure count for an unhealthy filer
+func (fc *FilerClient) recordFilerFailureWithHealth(health *filerHealth) {
+ atomic.AddInt32(&health.failureCount, 1)
+ atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano())
}
// recordFilerFailure increments failure count for an unhealthy filer
func (fc *FilerClient) recordFilerFailure(index int32) {
+ fc.filerAddressesMu.RLock()
+ if index >= int32(len(fc.filerHealth)) {
+ fc.filerAddressesMu.RUnlock()
+ return // Invalid index
+ }
health := fc.filerHealth[index]
- atomic.AddInt32(&health.failureCount, 1)
- atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano())
+ fc.filerAddressesMu.RUnlock()
+ fc.recordFilerFailureWithHealth(health)
}
// LookupVolumeIds queries the filer for volume locations with automatic failover
@@ -299,13 +571,34 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
// Try all filer addresses with round-robin starting from current index
// Skip known-unhealthy filers (circuit breaker pattern)
i := atomic.LoadInt32(&fc.filerIndex)
+
+ // Get filer count with read lock
+ fc.filerAddressesMu.RLock()
n := int32(len(fc.filerAddresses))
+ fc.filerAddressesMu.RUnlock()
for x := int32(0); x < n; x++ {
- // Circuit breaker: skip unhealthy filers
- if fc.shouldSkipUnhealthyFiler(i) {
+ // Get current filer address and health with read lock
+ fc.filerAddressesMu.RLock()
+ if len(fc.filerAddresses) == 0 {
+ fc.filerAddressesMu.RUnlock()
+ lastErr = fmt.Errorf("no filers available")
+ break
+ }
+ if i >= int32(len(fc.filerAddresses)) {
+ // Filer list changed, reset index
+ i = 0
+ }
+
+ // Get health pointer while holding lock
+ health := fc.filerHealth[i]
+ filerAddress := fc.filerAddresses[i]
+ fc.filerAddressesMu.RUnlock()
+
+ // Circuit breaker: skip unhealthy filers (no lock needed - uses atomics)
+ if fc.shouldSkipUnhealthyFilerWithHealth(health) {
glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",
- fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount))
+ filerAddress, atomic.LoadInt32(&health.failureCount))
i++
if i >= n {
i = 0
@@ -313,8 +606,6 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
continue
}
- filerAddress := fc.filerAddresses[i]
-
// Use anonymous function to ensure defer cancel() is called per iteration, not accumulated
err := func() error {
// Create a fresh timeout context for each filer attempt
@@ -367,7 +658,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
if err != nil {
glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v", filerAddress, x+1, n, retry+1, maxRetries, err)
- fc.recordFilerFailure(i)
+ fc.recordFilerFailureWithHealth(health)
lastErr = err
i++
if i >= n {
@@ -378,7 +669,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
// Success - update the preferred filer index and reset health tracking
atomic.StoreInt32(&fc.filerIndex, i)
- fc.recordFilerSuccess(i)
+ fc.recordFilerSuccessWithHealth(health)
glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result))
return result, nil
}
@@ -400,5 +691,8 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
}
// All retries exhausted
- return nil, fmt.Errorf("all %d filer(s) failed after %d attempts, last error: %w", len(fc.filerAddresses), maxRetries, lastErr)
+ fc.filerAddressesMu.RLock()
+ totalFilers := len(fc.filerAddresses)
+ fc.filerAddressesMu.RUnlock()
+ return nil, fmt.Errorf("all %d filer(s) failed after %d attempts, last error: %w", totalFilers, maxRetries, lastErr)
}