 weed/wdclient/filer_client.go | 54 +++++++++++++++++++++++++++---------------------------
 1 file changed, 27 insertions(+), 27 deletions(-)
diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go
index 2222575d6..f3fd7e753 100644
--- a/weed/wdclient/filer_client.go
+++ b/weed/wdclient/filer_client.go
@@ -29,8 +29,8 @@ const (
// filerHealth tracks the health status of a filer
type filerHealth struct {
- failureCount int32 // atomic: consecutive failures
lastFailureTimeNs int64 // atomic: last failure time in Unix nanoseconds
+ failureCount int32 // atomic: consecutive failures
}
// FilerClient provides volume location services by querying a filer
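
The field reorder above moves the 64-bit field ahead of the 32-bit one. The hunk itself does not state why, but this is the usual fix for sync/atomic on 32-bit platforms, where 64-bit operands must be 8-byte aligned and only the first word of an allocated struct is guaranteed to be. A minimal standalone sketch (not part of the patch) of the resulting layout and the atomic accesses the comments describe:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Same layout as the patched struct: the int64 comes first so that
// atomic.LoadInt64/StoreInt64 on it cannot panic on 386/arm builds.
type filerHealth struct {
	lastFailureTimeNs int64 // atomic: last failure time in Unix nanoseconds
	failureCount      int32 // atomic: consecutive failures
}

func main() {
	h := &filerHealth{}
	atomic.StoreInt64(&h.lastFailureTimeNs, time.Now().UnixNano())
	n := atomic.AddInt32(&h.failureCount, 1)
	fmt.Println("consecutive failures:", n)
}
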
@@ -54,7 +54,7 @@ type FilerClient struct {
maxRetries int // Retry: maximum retry attempts for transient failures
initialRetryWait time.Duration // Retry: initial wait time before first retry
retryBackoffFactor float64 // Retry: backoff multiplier for wait time
-
+
// Filer discovery fields
masterClient *MasterClient // Optional: for discovering filers in the same group
filerGroup string // Optional: filer group for discovery
@@ -79,7 +79,7 @@ type FilerClientOption struct {
MaxRetries int // Retry: maximum retry attempts for transient failures (0 = use default of 3)
InitialRetryWait time.Duration // Retry: initial wait time before first retry (0 = use default of 1s)
RetryBackoffFactor float64 // Retry: backoff multiplier for wait time (0 = use default of 1.5)
-
+
// Filer discovery options
MasterClient *MasterClient // Optional: enables filer discovery from master
FilerGroup string // Optional: filer group name for discovery (required if MasterClient is set)
@@ -192,17 +192,17 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO
func (fc *FilerClient) GetCurrentFiler() pb.ServerAddress {
fc.filerAddressesMu.RLock()
defer fc.filerAddressesMu.RUnlock()
-
+
if len(fc.filerAddresses) == 0 {
return ""
}
-
+
// Get current index (atomically updated on successful operations)
index := atomic.LoadInt32(&fc.filerIndex)
if index >= int32(len(fc.filerAddresses)) {
index = 0
}
-
+
return fc.filerAddresses[index]
}
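
GetCurrentFiler reads the shared index atomically, re-checks it against the (possibly discovery-grown) address list under the read lock, and returns the empty address when no filers are configured. A caller-side sketch against the exported API; pickFiler is a hypothetical helper, not part of the package, and the import paths assume the upstream module path github.com/seaweedfs/seaweedfs:

package example

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)

// pickFiler is illustrative only: callers must treat the empty address
// returned by GetCurrentFiler as "no filer available" before dialing.
func pickFiler(fc *wdclient.FilerClient) (pb.ServerAddress, error) {
	addr := fc.GetCurrentFiler()
	if addr == "" {
		return "", fmt.Errorf("filer client has no filer addresses")
	}
	return addr, nil
}
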
@@ -211,7 +211,7 @@ func (fc *FilerClient) GetCurrentFiler() pb.ServerAddress {
func (fc *FilerClient) GetAllFilers() []pb.ServerAddress {
fc.filerAddressesMu.RLock()
defer fc.filerAddressesMu.RUnlock()
-
+
// Return a copy to avoid concurrent modification
filers := make([]pb.ServerAddress, len(fc.filerAddresses))
copy(filers, fc.filerAddresses)
@@ -223,7 +223,7 @@ func (fc *FilerClient) GetAllFilers() []pb.ServerAddress {
func (fc *FilerClient) SetCurrentFiler(addr pb.ServerAddress) {
fc.filerAddressesMu.RLock()
defer fc.filerAddressesMu.RUnlock()
-
+
// Find the index of the specified filer address
for i, filer := range fc.filerAddresses {
if filer == addr {
@@ -239,7 +239,7 @@ func (fc *FilerClient) SetCurrentFiler(addr pb.ServerAddress) {
func (fc *FilerClient) ShouldSkipUnhealthyFiler(addr pb.ServerAddress) bool {
fc.filerAddressesMu.RLock()
defer fc.filerAddressesMu.RUnlock()
-
+
// Find the health for this filer address
for i, filer := range fc.filerAddresses {
if filer == addr {
@@ -257,7 +257,7 @@ func (fc *FilerClient) ShouldSkipUnhealthyFiler(addr pb.ServerAddress) bool {
func (fc *FilerClient) RecordFilerSuccess(addr pb.ServerAddress) {
fc.filerAddressesMu.RLock()
defer fc.filerAddressesMu.RUnlock()
-
+
// Find the health for this filer address
for i, filer := range fc.filerAddresses {
if filer == addr {
@@ -273,7 +273,7 @@ func (fc *FilerClient) RecordFilerSuccess(addr pb.ServerAddress) {
func (fc *FilerClient) RecordFilerFailure(addr pb.ServerAddress) {
fc.filerAddressesMu.RLock()
defer fc.filerAddressesMu.RUnlock()
-
+
// Find the health for this filer address
for i, filer := range fc.filerAddresses {
if filer == addr {
@@ -303,13 +303,13 @@ func (fc *FilerClient) discoverFilers() {
glog.Errorf("FilerClient: panic in filer discovery goroutine for group '%s': %v", fc.filerGroup, r)
}
}()
-
+
// Do an initial discovery
fc.refreshFilerList()
-
+
ticker := time.NewTicker(fc.discoveryInterval)
defer ticker.Stop()
-
+
for {
select {
case <-ticker.C:
@@ -326,22 +326,22 @@ func (fc *FilerClient) refreshFilerList() {
if fc.masterClient == nil {
return
}
-
+
// Get current master address
currentMaster := fc.masterClient.GetMaster(context.Background())
if currentMaster == "" {
glog.V(1).Infof("FilerClient: no master available for filer discovery")
return
}
-
+
// Query master for filers in our group
updates := cluster.ListExistingPeerUpdates(currentMaster, fc.grpcDialOption, fc.filerGroup, cluster.FilerType)
-
+
if len(updates) == 0 {
glog.V(2).Infof("FilerClient: no filers found in group '%s'", fc.filerGroup)
return
}
-
+
// Build new filer address list
discoveredFilers := make(map[pb.ServerAddress]bool)
for _, update := range updates {
@@ -349,17 +349,17 @@ func (fc *FilerClient) refreshFilerList() {
discoveredFilers[pb.ServerAddress(update.Address)] = true
}
}
-
+
// Thread-safe update of filer list
fc.filerAddressesMu.Lock()
defer fc.filerAddressesMu.Unlock()
-
+
// Build a map of existing filers for efficient O(1) lookup
existingFilers := make(map[pb.ServerAddress]struct{}, len(fc.filerAddresses))
for _, f := range fc.filerAddresses {
existingFilers[f] = struct{}{}
}
-
+
// Find new filers - O(N+M) instead of O(N*M)
var newFilers []pb.ServerAddress
for addr := range discoveredFilers {
@@ -367,18 +367,18 @@ func (fc *FilerClient) refreshFilerList() {
newFilers = append(newFilers, addr)
}
}
-
+
// Add new filers
if len(newFilers) > 0 {
glog.V(0).Infof("FilerClient: discovered %d new filer(s) in group '%s': %v", len(newFilers), fc.filerGroup, newFilers)
fc.filerAddresses = append(fc.filerAddresses, newFilers...)
-
+
// Initialize health tracking for new filers
for range newFilers {
fc.filerHealth = append(fc.filerHealth, &filerHealth{})
}
}
-
+
// Optionally, remove filers that are no longer in the cluster
// For now, we keep all filers and rely on health checks to avoid dead ones
// This prevents removing filers that might be temporarily unavailable
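
Because stale entries are deliberately left in the list, the health bookkeeping is what actually routes requests away from dead filers. A standalone sketch of how a consecutive-failure breaker over the two atomic fields can work; the threshold and cool-down are invented values rather than the package's constants, and shouldSkip only approximates shouldSkipUnhealthyFilerWithHealth:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type filerHealth struct {
	lastFailureTimeNs int64 // atomic: last failure time in Unix nanoseconds
	failureCount      int32 // atomic: consecutive failures
}

// Illustrative values; the real thresholds live in the package constants.
const (
	failureThreshold = 3
	cooldown         = 30 * time.Second
)

// shouldSkip mirrors the idea behind the circuit breaker: only skip a
// filer that has failed several times in a row, and only for a limited
// cool-down window so it is retried eventually.
func shouldSkip(h *filerHealth) bool {
	if atomic.LoadInt32(&h.failureCount) < failureThreshold {
		return false
	}
	last := time.Unix(0, atomic.LoadInt64(&h.lastFailureTimeNs))
	return time.Since(last) < cooldown
}

func main() {
	h := &filerHealth{}
	atomic.StoreInt32(&h.failureCount, 5)
	atomic.StoreInt64(&h.lastFailureTimeNs, time.Now().UnixNano())
	fmt.Println(shouldSkip(h)) // true: recently failing, inside cool-down
}
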
@@ -571,7 +571,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
// Try all filer addresses with round-robin starting from current index
// Skip known-unhealthy filers (circuit breaker pattern)
i := atomic.LoadInt32(&fc.filerIndex)
-
+
// Get filer count with read lock
fc.filerAddressesMu.RLock()
n := int32(len(fc.filerAddresses))
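
The lookup path combines three things visible in this and the next hunk: a round-robin start index read atomically, a snapshot of the list length taken under the read lock, and the circuit-breaker skip. A compressed standalone sketch of that control flow, with invented names and without the locking detail:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// picker is an invented stand-in for the FilerClient fields used by the
// lookup loop: a remembered index, an address list, and a health check.
// The real code also holds filerAddressesMu while reading the slice,
// because discovery can append to it concurrently.
type picker struct {
	index     int32
	addresses []string
	healthy   func(addr string) bool
}

// try walks the list round-robin starting at the remembered index, skips
// addresses the health check rejects, and remembers the first address
// that succeeds so the next call starts there.
func (p *picker) try(attempt func(addr string) error) error {
	i := atomic.LoadInt32(&p.index)
	n := int32(len(p.addresses))
	for x := int32(0); x < n; x++ {
		if i >= n {
			i = 0
		}
		addr := p.addresses[i]
		if p.healthy(addr) {
			if err := attempt(addr); err == nil {
				atomic.StoreInt32(&p.index, i)
				return nil
			}
		}
		i++
	}
	return errors.New("all filers failed or were skipped")
}

func main() {
	p := &picker{
		addresses: []string{"filer-1:8888", "filer-2:8888"},
		healthy:   func(string) bool { return true },
	}
	err := p.try(func(addr string) error {
		fmt.Println("looking up volumes via", addr)
		return nil
	})
	fmt.Println("err:", err)
}
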
@@ -589,12 +589,12 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
// Filer list changed, reset index
i = 0
}
-
+
// Get health pointer while holding lock
health := fc.filerHealth[i]
filerAddress := fc.filerAddresses[i]
fc.filerAddressesMu.RUnlock()
-
+
// Circuit breaker: skip unhealthy filers (no lock needed - uses atomics)
if fc.shouldSkipUnhealthyFilerWithHealth(health) {
glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",