Diffstat (limited to 'weed/filer_client')
-rw-r--r--  weed/filer_client/filer_client_accessor.go  190
-rw-r--r--  weed/filer_client/filer_discovery.go        199
2 files changed, 387 insertions(+), 2 deletions(-)
diff --git a/weed/filer_client/filer_client_accessor.go b/weed/filer_client/filer_client_accessor.go
index 9ec90195b..955a295cc 100644
--- a/weed/filer_client/filer_client_accessor.go
+++ b/weed/filer_client/filer_client_accessor.go
@@ -1,6 +1,12 @@
package filer_client
import (
+ "fmt"
+ "math/rand"
+ "sync"
+ "sync/atomic"
+ "time"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -9,13 +15,155 @@ import (
"google.golang.org/grpc"
)
+// filerHealth tracks the health status of a filer
+type filerHealth struct {
+ address pb.ServerAddress
+ failureCount int32 // updated atomically
+ mu sync.Mutex // guards lastFailure and backoffUntil, which are read and written from multiple goroutines
+ lastFailure time.Time
+ backoffUntil time.Time
+}
+
+// isHealthy returns true if the filer is not in its backoff period
+func (fh *filerHealth) isHealthy() bool {
+ fh.mu.Lock()
+ defer fh.mu.Unlock()
+ return time.Now().After(fh.backoffUntil)
+}
+
+// recordFailure increments the failure count and extends the backoff window using exponential backoff
+func (fh *filerHealth) recordFailure() {
+ count := atomic.AddInt32(&fh.failureCount, 1)
+
+ // Exponential backoff: 1s, 2s, 4s, 8s, 16s, then capped at 30s.
+ // Cap the exponent first so the shift cannot overflow for very large failure counts.
+ backoffSeconds := 30
+ if count < 6 {
+ backoffSeconds = 1 << (count - 1)
+ }
+
+ fh.mu.Lock()
+ fh.lastFailure = time.Now()
+ fh.backoffUntil = time.Now().Add(time.Duration(backoffSeconds) * time.Second)
+ fh.mu.Unlock()
+
+ glog.V(1).Infof("Filer %v failed %d times, backing off for %ds", fh.address, count, backoffSeconds)
+}
+
+// recordSuccess resets the failure count and clears the backoff window
+func (fh *filerHealth) recordSuccess() {
+ atomic.StoreInt32(&fh.failureCount, 0)
+ fh.mu.Lock()
+ fh.backoffUntil = time.Time{}
+ fh.mu.Unlock()
+}
+
type FilerClientAccessor struct {
- GetFiler func() pb.ServerAddress
GetGrpcDialOption func() grpc.DialOption
+ GetFilers func() []pb.ServerAddress // Returns multiple filer addresses for failover
+
+ // Health tracking for smart failover
+ filerHealthMap sync.Map // map[pb.ServerAddress]*filerHealth
+}
+
+// getOrCreateFilerHealth returns the health tracker for a filer, creating one if needed
+func (fca *FilerClientAccessor) getOrCreateFilerHealth(address pb.ServerAddress) *filerHealth {
+ if health, ok := fca.filerHealthMap.Load(address); ok {
+ return health.(*filerHealth)
+ }
+
+ newHealth := &filerHealth{
+ address: address,
+ failureCount: 0,
+ backoffUntil: time.Time{},
+ }
+
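+ // LoadOrStore resolves the race where two goroutines create a tracker for the
+ // same filer concurrently: only one value is kept and both callers share it.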
+ actual, _ := fca.filerHealthMap.LoadOrStore(address, newHealth)
+ return actual.(*filerHealth)
+}
+
+// partitionFilers separates filers into healthy and backoff groups
+func (fca *FilerClientAccessor) partitionFilers(filers []pb.ServerAddress) (healthy, backoff []pb.ServerAddress) {
+ for _, filer := range filers {
+ health := fca.getOrCreateFilerHealth(filer)
+ if health.isHealthy() {
+ healthy = append(healthy, filer)
+ } else {
+ backoff = append(backoff, filer)
+ }
+ }
+ return healthy, backoff
+}
+
+// shuffleFilers randomizes the order of filers to distribute load
+func (fca *FilerClientAccessor) shuffleFilers(filers []pb.ServerAddress) []pb.ServerAddress {
+ if len(filers) <= 1 {
+ return filers
+ }
+
+ shuffled := make([]pb.ServerAddress, len(filers))
+ copy(shuffled, filers)
+
+ // Fisher-Yates shuffle
+ for i := len(shuffled) - 1; i > 0; i-- {
+ j := rand.Intn(i + 1)
+ shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
+ }
+
+ return shuffled
}
func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithFilerClient(streamingMode, 0, fca.GetFiler(), fca.GetGrpcDialOption(), fn)
+ return fca.withMultipleFilers(streamingMode, fn)
+}
+
+// withMultipleFilers tries each filer with smart failover and backoff logic
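+// For example, with filers A, B, and C where B recently failed and is still inside its
+// backoff window, a call tries A and C first (in random order) and only falls back to B
+// if both of them fail.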
+func (fca *FilerClientAccessor) withMultipleFilers(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+ filers := fca.GetFilers()
+ if len(filers) == 0 {
+ return fmt.Errorf("no filer addresses available")
+ }
+
+ // Partition filers into healthy and backoff groups
+ healthyFilers, backoffFilers := fca.partitionFilers(filers)
+
+ // Shuffle healthy filers to distribute load evenly
+ healthyFilers = fca.shuffleFilers(healthyFilers)
+
+ // Try healthy filers first
+ var lastErr error
+ for _, filerAddress := range healthyFilers {
+ health := fca.getOrCreateFilerHealth(filerAddress)
+
+ err := pb.WithFilerClient(streamingMode, 0, filerAddress, fca.GetGrpcDialOption(), fn)
+ if err == nil {
+ // Success - record it and return
+ health.recordSuccess()
+ glog.V(2).Infof("Filer %v succeeded", filerAddress)
+ return nil
+ }
+
+ // Record failure and continue to next filer
+ health.recordFailure()
+ lastErr = err
+ glog.V(1).Infof("Healthy filer %v failed: %v, trying next", filerAddress, err)
+ }
+
+ // If all healthy filers failed, try backoff filers as last resort
+ if len(backoffFilers) > 0 {
+ glog.V(1).Infof("All healthy filers failed, trying %d backoff filers", len(backoffFilers))
+
+ for _, filerAddress := range backoffFilers {
+ health := fca.getOrCreateFilerHealth(filerAddress)
+
+ err := pb.WithFilerClient(streamingMode, 0, filerAddress, fca.GetGrpcDialOption(), fn)
+ if err == nil {
+ // Success - record it and return
+ health.recordSuccess()
+ glog.V(1).Infof("Backoff filer %v recovered and succeeded", filerAddress)
+ return nil
+ }
+
+ // Update failure record
+ health.recordFailure()
+ lastErr = err
+ glog.V(1).Infof("Backoff filer %v still failing: %v", filerAddress, err)
+ }
+ }
+
+ return fmt.Errorf("all filer connections failed, last error: %v", lastErr)
}
func (fca *FilerClientAccessor) SaveTopicConfToFiler(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) error {
@@ -56,3 +204,41 @@ func (fca *FilerClientAccessor) ReadTopicConfFromFilerWithMetadata(t topic.Topic
return conf, createdAtNs, modifiedAtNs, nil
}
+
+// NewFilerClientAccessor creates a FilerClientAccessor with one or more filers; it panics if no filer address is given
+func NewFilerClientAccessor(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialOption) *FilerClientAccessor {
+ if len(filerAddresses) == 0 {
+ panic("at least one filer address is required")
+ }
+
+ return &FilerClientAccessor{
+ GetGrpcDialOption: func() grpc.DialOption {
+ return grpcDialOption
+ },
+ GetFilers: func() []pb.ServerAddress {
+ return filerAddresses
+ },
+ filerHealthMap: sync.Map{},
+ }
+}
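+
+// Example usage (an illustrative sketch; the addresses and the lookup request below are
+// placeholders, not part of this change):
+//
+//	fca := NewFilerClientAccessor(
+//		[]pb.ServerAddress{"filer1:8888", "filer2:8888"},
+//		grpcDialOption,
+//	)
+//	err := fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+//		_, lookupErr := client.LookupDirectoryEntry(context.Background(),
+//			&filer_pb.LookupDirectoryEntryRequest{Directory: "/topics", Name: "example"})
+//		return lookupErr
+//	})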
+
+// AddFilerAddresses adds more filer addresses to the existing list
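+// Note: the GetFilers closure is swapped without synchronization, so this is meant to be
+// called during setup, before the accessor is shared across goroutines.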
+func (fca *FilerClientAccessor) AddFilerAddresses(additionalFilers []pb.ServerAddress) {
+ if len(additionalFilers) == 0 {
+ return
+ }
+
+ // Get the current filers if available
+ var allFilers []pb.ServerAddress
+ if fca.GetFilers != nil {
+ allFilers = append(allFilers, fca.GetFilers()...)
+ }
+
+ // Add the additional filers
+ allFilers = append(allFilers, additionalFilers...)
+
+ // Update the filers list
+ fca.GetFilers = func() []pb.ServerAddress {
+ return allFilers
+ }
+}
diff --git a/weed/filer_client/filer_discovery.go b/weed/filer_client/filer_discovery.go
new file mode 100644
index 000000000..0729bae98
--- /dev/null
+++ b/weed/filer_client/filer_discovery.go
@@ -0,0 +1,199 @@
+package filer_client
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/cluster"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+ "google.golang.org/grpc"
+)
+
+const (
+ // FilerDiscoveryInterval is the interval for refreshing filer list from masters
+ FilerDiscoveryInterval = 30 * time.Second
+ // InitialDiscoveryInterval is the faster interval for initial discovery
+ InitialDiscoveryInterval = 5 * time.Second
+ // InitialDiscoveryRetries is the number of fast retries during startup
+ InitialDiscoveryRetries = 6 // 6 retries * 5 seconds = 30 seconds total
+)
+
+// FilerDiscoveryService handles dynamic discovery and refresh of filers from masters
+type FilerDiscoveryService struct {
+ masters []pb.ServerAddress
+ grpcDialOption grpc.DialOption
+ filers []pb.ServerAddress
+ filersMutex sync.RWMutex
+ refreshTicker *time.Ticker
+ stopChan chan struct{}
+ wg sync.WaitGroup
+ initialRetries int
+}
+
+// NewFilerDiscoveryService creates a new filer discovery service
+func NewFilerDiscoveryService(masters []pb.ServerAddress, grpcDialOption grpc.DialOption) *FilerDiscoveryService {
+ return &FilerDiscoveryService{
+ masters: masters,
+ grpcDialOption: grpcDialOption,
+ filers: make([]pb.ServerAddress, 0),
+ stopChan: make(chan struct{}),
+ }
+}
+
+// Note: no separate HTTP-to-gRPC conversion helper is needed; pb.ServerAddress.ToGrpcAddress() already handles it.
+
+// discoverFilersFromMaster discovers filers from a single master
+func (fds *FilerDiscoveryService) discoverFilersFromMaster(masterAddr pb.ServerAddress) ([]pb.ServerAddress, error) {
+ // Convert HTTP master address to gRPC address (HTTP port + 10000)
+ grpcAddr := masterAddr.ToGrpcAddress()
+ glog.Infof("FILER DISCOVERY: Connecting to master gRPC at %s (converted from HTTP %s)", grpcAddr, masterAddr)
+
+ conn, err := grpc.Dial(grpcAddr, fds.grpcDialOption)
+ if err != nil {
+ return nil, fmt.Errorf("failed to connect to master at %s: %v", grpcAddr, err)
+ }
+ defer conn.Close()
+
+ client := master_pb.NewSeaweedClient(conn)
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{
+ ClientType: cluster.FilerType,
+ })
+ if err != nil {
+ glog.Errorf("FILER DISCOVERY: ListClusterNodes failed for master %s: %v", masterAddr, err)
+ return nil, fmt.Errorf("failed to list filers from master %s: %v", masterAddr, err)
+ }
+
+ glog.Infof("FILER DISCOVERY: ListClusterNodes returned %d nodes from master %s", len(resp.ClusterNodes), masterAddr)
+
+ var filers []pb.ServerAddress
+ for _, node := range resp.ClusterNodes {
+ glog.Infof("FILER DISCOVERY: Found filer HTTP address %s", node.Address)
+ // Return HTTP address (lock client will convert to gRPC when needed)
+ filers = append(filers, pb.ServerAddress(node.Address))
+ }
+
+ glog.Infof("FILER DISCOVERY: Returning %d filers from master %s", len(filers), masterAddr)
+
+ return filers, nil
+}
+
+// refreshFilers discovers filers from all masters and updates the filer list
+func (fds *FilerDiscoveryService) refreshFilers() {
+ glog.V(2).Info("Refreshing filer list from masters")
+
+ var allFilers []pb.ServerAddress
+ var discoveryErrors []error
+
+ // Try each master to discover filers
+ for _, masterAddr := range fds.masters {
+ filers, err := fds.discoverFilersFromMaster(masterAddr)
+ if err != nil {
+ discoveryErrors = append(discoveryErrors, err)
+ glog.V(1).Infof("Failed to discover filers from master %s: %v", masterAddr, err)
+ continue
+ }
+
+ allFilers = append(allFilers, filers...)
+ glog.V(2).Infof("Discovered %d filers from master %s", len(filers), masterAddr)
+ }
+
+ // Deduplicate filers
+ filerSet := make(map[pb.ServerAddress]bool)
+ for _, filer := range allFilers {
+ filerSet[filer] = true
+ }
+
+ uniqueFilers := make([]pb.ServerAddress, 0, len(filerSet))
+ for filer := range filerSet {
+ uniqueFilers = append(uniqueFilers, filer)
+ }
+
+ // Update the filer list. If discovery found nothing and at least one master
+ // errored, keep the existing list so a transient master outage does not wipe
+ // out the known filers.
+ fds.filersMutex.Lock()
+ oldCount := len(fds.filers)
+ if len(uniqueFilers) > 0 || len(discoveryErrors) == 0 {
+ fds.filers = uniqueFilers
+ }
+ newCount := len(fds.filers)
+ fds.filersMutex.Unlock()
+
+ if len(uniqueFilers) > 0 {
+ glog.V(1).Infof("Filer discovery successful: updated from %d to %d filers", oldCount, newCount)
+ } else if len(discoveryErrors) > 0 {
+ glog.Warningf("Failed to discover any filers from %d masters, keeping existing %d filers", len(fds.masters), oldCount)
+ }
+}
+
+// GetFilers returns the current list of filers
+func (fds *FilerDiscoveryService) GetFilers() []pb.ServerAddress {
+ fds.filersMutex.RLock()
+ defer fds.filersMutex.RUnlock()
+
+ // Return a copy to avoid concurrent modification
+ filers := make([]pb.ServerAddress, len(fds.filers))
+ copy(filers, fds.filers)
+ return filers
+}
+
+// Start begins the filer discovery service
+func (fds *FilerDiscoveryService) Start() error {
+ glog.V(1).Info("Starting filer discovery service")
+
+ // Initial discovery
+ fds.refreshFilers()
+
+ // Start with faster discovery during startup
+ fds.initialRetries = InitialDiscoveryRetries
+ interval := InitialDiscoveryInterval
+ if len(fds.GetFilers()) > 0 {
+ // If we found filers immediately, use normal interval
+ interval = FilerDiscoveryInterval
+ fds.initialRetries = 0
+ }
+
+ // Start periodic refresh
+ fds.refreshTicker = time.NewTicker(interval)
+ fds.wg.Add(1)
+ go func() {
+ defer fds.wg.Done()
+ for {
+ select {
+ case <-fds.refreshTicker.C:
+ fds.refreshFilers()
+
+ // Switch to the normal interval once filers are found or the fast retries are exhausted
+ if fds.initialRetries > 0 {
+ fds.initialRetries--
+ if fds.initialRetries == 0 || len(fds.GetFilers()) > 0 {
+ glog.V(1).Info("Switching to normal filer discovery interval")
+ fds.initialRetries = 0
+ fds.refreshTicker.Stop()
+ fds.refreshTicker = time.NewTicker(FilerDiscoveryInterval)
+ }
+ }
+ case <-fds.stopChan:
+ glog.V(1).Info("Filer discovery service stopping")
+ return
+ }
+ }
+ }()
+
+ return nil
+}
+
+// Stop stops the filer discovery service
+func (fds *FilerDiscoveryService) Stop() error {
+ glog.V(1).Info("Stopping filer discovery service")
+
+ close(fds.stopChan)
+ if fds.refreshTicker != nil {
+ fds.refreshTicker.Stop()
+ }
+ fds.wg.Wait()
+
+ return nil
+}
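+
+// Example wiring (an illustrative sketch, not part of this change): the discovery
+// service's GetFilers method has the same signature as FilerClientAccessor.GetFilers,
+// so the failover list stays current as filers join or leave the cluster.
+//
+//	fds := NewFilerDiscoveryService(masters, grpcDialOption)
+//	_ = fds.Start()
+//	defer fds.Stop()
+//
+//	fca := &FilerClientAccessor{
+//		GetGrpcDialOption: func() grpc.DialOption { return grpcDialOption },
+//		GetFilers:         fds.GetFilers,
+//	}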