diff options
| -rw-r--r-- | weed/wdclient/filer_client.go | 54 |
1 files changed, 27 insertions, 27 deletions
diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go index 2222575d6..f3fd7e753 100644 --- a/weed/wdclient/filer_client.go +++ b/weed/wdclient/filer_client.go @@ -29,8 +29,8 @@ const ( // filerHealth tracks the health status of a filer type filerHealth struct { - failureCount int32 // atomic: consecutive failures lastFailureTimeNs int64 // atomic: last failure time in Unix nanoseconds + failureCount int32 // atomic: consecutive failures } // FilerClient provides volume location services by querying a filer @@ -54,7 +54,7 @@ type FilerClient struct { maxRetries int // Retry: maximum retry attempts for transient failures initialRetryWait time.Duration // Retry: initial wait time before first retry retryBackoffFactor float64 // Retry: backoff multiplier for wait time - + // Filer discovery fields masterClient *MasterClient // Optional: for discovering filers in the same group filerGroup string // Optional: filer group for discovery @@ -79,7 +79,7 @@ type FilerClientOption struct { MaxRetries int // Retry: maximum retry attempts for transient failures (0 = use default of 3) InitialRetryWait time.Duration // Retry: initial wait time before first retry (0 = use default of 1s) RetryBackoffFactor float64 // Retry: backoff multiplier for wait time (0 = use default of 1.5) - + // Filer discovery options MasterClient *MasterClient // Optional: enables filer discovery from master FilerGroup string // Optional: filer group name for discovery (required if MasterClient is set) @@ -192,17 +192,17 @@ func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialO func (fc *FilerClient) GetCurrentFiler() pb.ServerAddress { fc.filerAddressesMu.RLock() defer fc.filerAddressesMu.RUnlock() - + if len(fc.filerAddresses) == 0 { return "" } - + // Get current index (atomically updated on successful operations) index := atomic.LoadInt32(&fc.filerIndex) if index >= int32(len(fc.filerAddresses)) { index = 0 } - + return fc.filerAddresses[index] } @@ -211,7 +211,7 @@ func (fc *FilerClient) GetCurrentFiler() pb.ServerAddress { func (fc *FilerClient) GetAllFilers() []pb.ServerAddress { fc.filerAddressesMu.RLock() defer fc.filerAddressesMu.RUnlock() - + // Return a copy to avoid concurrent modification filers := make([]pb.ServerAddress, len(fc.filerAddresses)) copy(filers, fc.filerAddresses) @@ -223,7 +223,7 @@ func (fc *FilerClient) GetAllFilers() []pb.ServerAddress { func (fc *FilerClient) SetCurrentFiler(addr pb.ServerAddress) { fc.filerAddressesMu.RLock() defer fc.filerAddressesMu.RUnlock() - + // Find the index of the specified filer address for i, filer := range fc.filerAddresses { if filer == addr { @@ -239,7 +239,7 @@ func (fc *FilerClient) SetCurrentFiler(addr pb.ServerAddress) { func (fc *FilerClient) ShouldSkipUnhealthyFiler(addr pb.ServerAddress) bool { fc.filerAddressesMu.RLock() defer fc.filerAddressesMu.RUnlock() - + // Find the health for this filer address for i, filer := range fc.filerAddresses { if filer == addr { @@ -257,7 +257,7 @@ func (fc *FilerClient) ShouldSkipUnhealthyFiler(addr pb.ServerAddress) bool { func (fc *FilerClient) RecordFilerSuccess(addr pb.ServerAddress) { fc.filerAddressesMu.RLock() defer fc.filerAddressesMu.RUnlock() - + // Find the health for this filer address for i, filer := range fc.filerAddresses { if filer == addr { @@ -273,7 +273,7 @@ func (fc *FilerClient) RecordFilerSuccess(addr pb.ServerAddress) { func (fc *FilerClient) RecordFilerFailure(addr pb.ServerAddress) { fc.filerAddressesMu.RLock() defer fc.filerAddressesMu.RUnlock() - + // Find the health for this filer address for i, filer := range fc.filerAddresses { if filer == addr { @@ -303,13 +303,13 @@ func (fc *FilerClient) discoverFilers() { glog.Errorf("FilerClient: panic in filer discovery goroutine for group '%s': %v", fc.filerGroup, r) } }() - + // Do an initial discovery fc.refreshFilerList() - + ticker := time.NewTicker(fc.discoveryInterval) defer ticker.Stop() - + for { select { case <-ticker.C: @@ -326,22 +326,22 @@ func (fc *FilerClient) refreshFilerList() { if fc.masterClient == nil { return } - + // Get current master address currentMaster := fc.masterClient.GetMaster(context.Background()) if currentMaster == "" { glog.V(1).Infof("FilerClient: no master available for filer discovery") return } - + // Query master for filers in our group updates := cluster.ListExistingPeerUpdates(currentMaster, fc.grpcDialOption, fc.filerGroup, cluster.FilerType) - + if len(updates) == 0 { glog.V(2).Infof("FilerClient: no filers found in group '%s'", fc.filerGroup) return } - + // Build new filer address list discoveredFilers := make(map[pb.ServerAddress]bool) for _, update := range updates { @@ -349,17 +349,17 @@ func (fc *FilerClient) refreshFilerList() { discoveredFilers[pb.ServerAddress(update.Address)] = true } } - + // Thread-safe update of filer list fc.filerAddressesMu.Lock() defer fc.filerAddressesMu.Unlock() - + // Build a map of existing filers for efficient O(1) lookup existingFilers := make(map[pb.ServerAddress]struct{}, len(fc.filerAddresses)) for _, f := range fc.filerAddresses { existingFilers[f] = struct{}{} } - + // Find new filers - O(N+M) instead of O(N*M) var newFilers []pb.ServerAddress for addr := range discoveredFilers { @@ -367,18 +367,18 @@ func (fc *FilerClient) refreshFilerList() { newFilers = append(newFilers, addr) } } - + // Add new filers if len(newFilers) > 0 { glog.V(0).Infof("FilerClient: discovered %d new filer(s) in group '%s': %v", len(newFilers), fc.filerGroup, newFilers) fc.filerAddresses = append(fc.filerAddresses, newFilers...) - + // Initialize health tracking for new filers for range newFilers { fc.filerHealth = append(fc.filerHealth, &filerHealth{}) } } - + // Optionally, remove filers that are no longer in the cluster // For now, we keep all filers and rely on health checks to avoid dead ones // This prevents removing filers that might be temporarily unavailable @@ -571,7 +571,7 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s // Try all filer addresses with round-robin starting from current index // Skip known-unhealthy filers (circuit breaker pattern) i := atomic.LoadInt32(&fc.filerIndex) - + // Get filer count with read lock fc.filerAddressesMu.RLock() n := int32(len(fc.filerAddresses)) @@ -589,12 +589,12 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s // Filer list changed, reset index i = 0 } - + // Get health pointer while holding lock health := fc.filerHealth[i] filerAddress := fc.filerAddresses[i] fc.filerAddressesMu.RUnlock() - + // Circuit breaker: skip unhealthy filers (no lock needed - uses atomics) if fc.shouldSkipUnhealthyFilerWithHealth(health) { glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)", |
