Diffstat (limited to 'weed/filer_client/filer_client_accessor.go')
-rw-r--r--  weed/filer_client/filer_client_accessor.go  190
1 file changed, 188 insertions, 2 deletions
diff --git a/weed/filer_client/filer_client_accessor.go b/weed/filer_client/filer_client_accessor.go
index 9ec90195b..955a295cc 100644
--- a/weed/filer_client/filer_client_accessor.go
+++ b/weed/filer_client/filer_client_accessor.go
@@ -1,6 +1,12 @@
package filer_client
import (
+ "fmt"
+ "math/rand"
+ "sync"
+ "sync/atomic"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -9,13 +15,155 @@ import (
"google.golang.org/grpc"
)
+// filerHealth tracks the health status of a filer
+type filerHealth struct {
+ address pb.ServerAddress
+ failureCount int32
+ lastFailure time.Time
+ backoffUntil time.Time
+}
+
+// isHealthy returns true if the filer is not in backoff period
+func (fh *filerHealth) isHealthy() bool {
+ return time.Now().After(fh.backoffUntil)
+}
+
+// recordFailure updates failure count and sets backoff time using exponential backoff
+func (fh *filerHealth) recordFailure() {
+ count := atomic.AddInt32(&fh.failureCount, 1)
+ fh.lastFailure = time.Now()
+
+ // Exponential backoff: 1s, 2s, 4s, 8s, 16s, then capped at 30s
+ // Cap the shift amount so a large failure count cannot overflow the int
+ backoffSeconds := 30
+ if count < 6 {
+ backoffSeconds = 1 << (count - 1)
+ }
+ fh.backoffUntil = time.Now().Add(time.Duration(backoffSeconds) * time.Second)
+
+ glog.V(1).Infof("Filer %v failed %d times, backing off for %ds", fh.address, count, backoffSeconds)
+}
+
+// recordSuccess resets failure count and clears backoff
+func (fh *filerHealth) recordSuccess() {
+ atomic.StoreInt32(&fh.failureCount, 0)
+ fh.backoffUntil = time.Time{}
+}
+
type FilerClientAccessor struct {
- GetFiler func() pb.ServerAddress
GetGrpcDialOption func() grpc.DialOption
+ GetFilers func() []pb.ServerAddress // Returns multiple filer addresses for failover
+
+ // Health tracking for smart failover
+ filerHealthMap sync.Map // map[pb.ServerAddress]*filerHealth
+}
+
+// getOrCreateFilerHealth returns the health tracker for a filer, creating one if needed
+func (fca *FilerClientAccessor) getOrCreateFilerHealth(address pb.ServerAddress) *filerHealth {
+ if health, ok := fca.filerHealthMap.Load(address); ok {
+ return health.(*filerHealth)
+ }
+
+ newHealth := &filerHealth{
+ address: address,
+ failureCount: 0,
+ backoffUntil: time.Time{},
+ }
+
+ actual, _ := fca.filerHealthMap.LoadOrStore(address, newHealth)
+ return actual.(*filerHealth)
+}
+
+// partitionFilers separates filers into healthy and backoff groups
+func (fca *FilerClientAccessor) partitionFilers(filers []pb.ServerAddress) (healthy, backoff []pb.ServerAddress) {
+ for _, filer := range filers {
+ health := fca.getOrCreateFilerHealth(filer)
+ if health.isHealthy() {
+ healthy = append(healthy, filer)
+ } else {
+ backoff = append(backoff, filer)
+ }
+ }
+ return healthy, backoff
+}
+
+// shuffleFilers randomizes the order of filers to distribute load
+func (fca *FilerClientAccessor) shuffleFilers(filers []pb.ServerAddress) []pb.ServerAddress {
+ if len(filers) <= 1 {
+ return filers
+ }
+
+ shuffled := make([]pb.ServerAddress, len(filers))
+ copy(shuffled, filers)
+
+ // Fisher-Yates shuffle
+ for i := len(shuffled) - 1; i > 0; i-- {
+ j := rand.Intn(i + 1)
+ shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
+ }
+
+ return shuffled
}
func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(streamingMode, 0, fca.GetFiler(), fca.GetGrpcDialOption(), fn)
+ return fca.withMultipleFilers(streamingMode, fn)
+}
+
+// withMultipleFilers tries each filer with smart failover and backoff logic
+func (fca *FilerClientAccessor) withMultipleFilers(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+ filers := fca.GetFilers()
+ if len(filers) == 0 {
+ return fmt.Errorf("no filer addresses available")
+ }
+
+ // Partition filers into healthy and backoff groups
+ healthyFilers, backoffFilers := fca.partitionFilers(filers)
+
+ // Shuffle healthy filers to distribute load evenly
+ healthyFilers = fca.shuffleFilers(healthyFilers)
+
+ // Try healthy filers first
+ var lastErr error
+ for _, filerAddress := range healthyFilers {
+ health := fca.getOrCreateFilerHealth(filerAddress)
+
+ err := pb.WithFilerClient(streamingMode, 0, filerAddress, fca.GetGrpcDialOption(), fn)
+ if err == nil {
+ // Success - record it and return
+ health.recordSuccess()
+ glog.V(2).Infof("Filer %v succeeded", filerAddress)
+ return nil
+ }
+
+ // Record failure and continue to next filer
+ health.recordFailure()
+ lastErr = err
+ glog.V(1).Infof("Healthy filer %v failed: %v, trying next", filerAddress, err)
+ }
+
+ // If all healthy filers failed, try backoff filers as last resort
+ if len(backoffFilers) > 0 {
+ glog.V(1).Infof("All healthy filers failed, trying %d backoff filers", len(backoffFilers))
+
+ for _, filerAddress := range backoffFilers {
+ health := fca.getOrCreateFilerHealth(filerAddress)
+
+ err := pb.WithFilerClient(streamingMode, 0, filerAddress, fca.GetGrpcDialOption(), fn)
+ if err == nil {
+ // Success - record it and return
+ health.recordSuccess()
+ glog.V(1).Infof("Backoff filer %v recovered and succeeded", filerAddress)
+ return nil
+ }
+
+ // Update failure record
+ health.recordFailure()
+ lastErr = err
+ glog.V(1).Infof("Backoff filer %v still failing: %v", filerAddress, err)
+ }
+ }
+
+ return fmt.Errorf("all filer connections failed, last error: %v", lastErr)
}
func (fca *FilerClientAccessor) SaveTopicConfToFiler(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) error {
@@ -56,3 +204,41 @@ func (fca *FilerClientAccessor) ReadTopicConfFromFilerWithMetadata(t topic.Topic
return conf, createdAtNs, modifiedAtNs, nil
}
+
+// NewFilerClientAccessor creates a FilerClientAccessor with one or more filers
+func NewFilerClientAccessor(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialOption) *FilerClientAccessor {
+ if len(filerAddresses) == 0 {
+ panic("at least one filer address is required")
+ }
+
+ return &FilerClientAccessor{
+ GetGrpcDialOption: func() grpc.DialOption {
+ return grpcDialOption
+ },
+ GetFilers: func() []pb.ServerAddress {
+ return filerAddresses
+ },
+ filerHealthMap: sync.Map{},
+ }
+}
+
+// AddFilerAddresses adds more filer addresses to the existing list
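+// Note: this replaces the GetFilers closure, so it should not be called
+// concurrently with in-flight WithFilerClient calls.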
+func (fca *FilerClientAccessor) AddFilerAddresses(additionalFilers []pb.ServerAddress) {
+ if len(additionalFilers) == 0 {
+ return
+ }
+
+ // Get the current filers if available
+ var allFilers []pb.ServerAddress
+ if fca.GetFilers != nil {
+ allFilers = append(allFilers, fca.GetFilers()...)
+ }
+
+ // Add the additional filers
+ allFilers = append(allFilers, additionalFilers...)
+
+ // Update the filers list
+ fca.GetFilers = func() []pb.ServerAddress {
+ return allFilers
+ }
+}
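
For context, a minimal usage sketch of the multi-filer accessor introduced above (not part of the patch; the filer addresses and the insecure dial option are placeholder assumptions):

package main

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	"github.com/seaweedfs/seaweedfs/weed/filer_client"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

func main() {
	// Construct the accessor with several filers; order does not matter,
	// since healthy filers are shuffled before each attempt.
	accessor := filer_client.NewFilerClientAccessor(
		[]pb.ServerAddress{"filer-1:8888", "filer-2:8888"},
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)

	// Additional filers can be appended later, e.g. after discovery.
	accessor.AddFilerAddresses([]pb.ServerAddress{"filer-3:8888"})

	// Each call tries healthy filers first and falls back to filers that
	// are currently in backoff, returning the last error if all fail.
	_ = accessor.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		// issue filer RPCs with client here
		return nil
	})
}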