Diffstat (limited to 'weed/filer_client')
-rw-r--r--  weed/filer_client/filer_client_accessor.go  190
-rw-r--r--  weed/filer_client/filer_discovery.go         199
2 files changed, 387 insertions, 2 deletions
diff --git a/weed/filer_client/filer_client_accessor.go b/weed/filer_client/filer_client_accessor.go
index 9ec90195b..955a295cc 100644
--- a/weed/filer_client/filer_client_accessor.go
+++ b/weed/filer_client/filer_client_accessor.go
@@ -1,6 +1,12 @@
 package filer_client
 
 import (
+	"fmt"
+	"math/rand"
+	"sync"
+	"sync/atomic"
+	"time"
+
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/mq/topic"
 	"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -9,13 +15,155 @@ import (
 	"google.golang.org/grpc"
 )
 
+// filerHealth tracks the health status of a filer
+type filerHealth struct {
+	address      pb.ServerAddress
+	failureCount int32
+	lastFailure  time.Time
+	backoffUntil time.Time
+}
+
+// isHealthy returns true if the filer is not in backoff period
+func (fh *filerHealth) isHealthy() bool {
+	return time.Now().After(fh.backoffUntil)
+}
+
+// recordFailure updates failure count and sets backoff time using exponential backoff
+func (fh *filerHealth) recordFailure() {
+	count := atomic.AddInt32(&fh.failureCount, 1)
+	fh.lastFailure = time.Now()
+
+	// Exponential backoff: 1s, 2s, 4s, 8s, 16s, 32s, max 30s
+	// Calculate 2^(count-1) but cap the result at 30 seconds
+	backoffSeconds := 1 << (count - 1)
+	if backoffSeconds > 30 {
+		backoffSeconds = 30
+	}
+	fh.backoffUntil = time.Now().Add(time.Duration(backoffSeconds) * time.Second)
+
+	glog.V(1).Infof("Filer %v failed %d times, backing off for %ds", fh.address, count, backoffSeconds)
+}
+
+// recordSuccess resets failure count and clears backoff
+func (fh *filerHealth) recordSuccess() {
+	atomic.StoreInt32(&fh.failureCount, 0)
+	fh.backoffUntil = time.Time{}
+}
+
 type FilerClientAccessor struct {
-	GetFiler          func() pb.ServerAddress
 	GetGrpcDialOption func() grpc.DialOption
+	GetFilers         func() []pb.ServerAddress // Returns multiple filer addresses for failover
+
+	// Health tracking for smart failover
+	filerHealthMap sync.Map // map[pb.ServerAddress]*filerHealth
+}
+
+// getOrCreateFilerHealth returns the health tracker for a filer, creating one if needed
+func (fca *FilerClientAccessor) getOrCreateFilerHealth(address pb.ServerAddress) *filerHealth {
+	if health, ok := fca.filerHealthMap.Load(address); ok {
+		return health.(*filerHealth)
+	}
+
+	newHealth := &filerHealth{
+		address:      address,
+		failureCount: 0,
+		backoffUntil: time.Time{},
+	}
+
+	actual, _ := fca.filerHealthMap.LoadOrStore(address, newHealth)
+	return actual.(*filerHealth)
+}
+
+// partitionFilers separates filers into healthy and backoff groups
+func (fca *FilerClientAccessor) partitionFilers(filers []pb.ServerAddress) (healthy, backoff []pb.ServerAddress) {
+	for _, filer := range filers {
+		health := fca.getOrCreateFilerHealth(filer)
+		if health.isHealthy() {
+			healthy = append(healthy, filer)
+		} else {
+			backoff = append(backoff, filer)
+		}
+	}
+	return healthy, backoff
+}
+
+// shuffleFilers randomizes the order of filers to distribute load
+func (fca *FilerClientAccessor) shuffleFilers(filers []pb.ServerAddress) []pb.ServerAddress {
+	if len(filers) <= 1 {
+		return filers
+	}
+
+	shuffled := make([]pb.ServerAddress, len(filers))
+	copy(shuffled, filers)
+
+	// Fisher-Yates shuffle
+	for i := len(shuffled) - 1; i > 0; i-- {
+		j := rand.Intn(i + 1)
+		shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
+	}
+
+	return shuffled
 }
 
 func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
-	return pb.WithFilerClient(streamingMode, 0, fca.GetFiler(), fca.GetGrpcDialOption(), fn)
+	return fca.withMultipleFilers(streamingMode, fn)
+}
+
+// withMultipleFilers tries each filer with smart failover and backoff logic
+func (fca *FilerClientAccessor) withMultipleFilers(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+	filers := fca.GetFilers()
+	if len(filers) == 0 {
+		return fmt.Errorf("no filer addresses available")
+	}
+
+	// Partition filers into healthy and backoff groups
+	healthyFilers, backoffFilers := fca.partitionFilers(filers)
+
+	// Shuffle healthy filers to distribute load evenly
+	healthyFilers = fca.shuffleFilers(healthyFilers)
+
+	// Try healthy filers first
+	var lastErr error
+	for _, filerAddress := range healthyFilers {
+		health := fca.getOrCreateFilerHealth(filerAddress)
+
+		err := pb.WithFilerClient(streamingMode, 0, filerAddress, fca.GetGrpcDialOption(), fn)
+		if err == nil {
+			// Success - record it and return
+			health.recordSuccess()
+			glog.V(2).Infof("Filer %v succeeded", filerAddress)
+			return nil
+		}
+
+		// Record failure and continue to next filer
+		health.recordFailure()
+		lastErr = err
+		glog.V(1).Infof("Healthy filer %v failed: %v, trying next", filerAddress, err)
+	}
+
+	// If all healthy filers failed, try backoff filers as last resort
+	if len(backoffFilers) > 0 {
+		glog.V(1).Infof("All healthy filers failed, trying %d backoff filers", len(backoffFilers))
+
+		for _, filerAddress := range backoffFilers {
+			health := fca.getOrCreateFilerHealth(filerAddress)
+
+			err := pb.WithFilerClient(streamingMode, 0, filerAddress, fca.GetGrpcDialOption(), fn)
+			if err == nil {
+				// Success - record it and return
+				health.recordSuccess()
+				glog.V(1).Infof("Backoff filer %v recovered and succeeded", filerAddress)
+				return nil
+			}
+
+			// Update failure record
+			health.recordFailure()
+			lastErr = err
+			glog.V(1).Infof("Backoff filer %v still failing: %v", filerAddress, err)
		}
+	}
+
+	return fmt.Errorf("all filer connections failed, last error: %v", lastErr)
 }
 
 func (fca *FilerClientAccessor) SaveTopicConfToFiler(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) error {
@@ -56,3 +204,41 @@ func (fca *FilerClientAccessor) ReadTopicConfFromFilerWithMetadata(t topic.Topic
 
 	return conf, createdAtNs, modifiedAtNs, nil
 }
+
+// NewFilerClientAccessor creates a FilerClientAccessor with one or more filers
+func NewFilerClientAccessor(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialOption) *FilerClientAccessor {
+	if len(filerAddresses) == 0 {
+		panic("at least one filer address is required")
+	}
+
+	return &FilerClientAccessor{
+		GetGrpcDialOption: func() grpc.DialOption {
+			return grpcDialOption
+		},
+		GetFilers: func() []pb.ServerAddress {
+			return filerAddresses
+		},
+		filerHealthMap: sync.Map{},
+	}
+}
+
+// AddFilerAddresses adds more filer addresses to the existing list
+func (fca *FilerClientAccessor) AddFilerAddresses(additionalFilers []pb.ServerAddress) {
+	if len(additionalFilers) == 0 {
+		return
+	}
+
+	// Get the current filers if available
+	var allFilers []pb.ServerAddress
+	if fca.GetFilers != nil {
+		allFilers = append(allFilers, fca.GetFilers()...)
+	}
+
+	// Add the additional filers
+	allFilers = append(allFilers, additionalFilers...)
+
+	// Update the filers list
+	fca.GetFilers = func() []pb.ServerAddress {
+		return allFilers
+	}
+}
diff --git a/weed/filer_client/filer_discovery.go b/weed/filer_client/filer_discovery.go
new file mode 100644
index 000000000..0729bae98
--- /dev/null
+++ b/weed/filer_client/filer_discovery.go
@@ -0,0 +1,199 @@
+package filer_client
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/cluster"
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/pb"
+	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
+	"google.golang.org/grpc"
+)
+
+const (
+	// FilerDiscoveryInterval is the interval for refreshing filer list from masters
+	FilerDiscoveryInterval = 30 * time.Second
+	// InitialDiscoveryInterval is the faster interval for initial discovery
+	InitialDiscoveryInterval = 5 * time.Second
+	// InitialDiscoveryRetries is the number of fast retries during startup
+	InitialDiscoveryRetries = 6 // 6 retries * 5 seconds = 30 seconds total
+)
+
+// FilerDiscoveryService handles dynamic discovery and refresh of filers from masters
+type FilerDiscoveryService struct {
+	masters        []pb.ServerAddress
+	grpcDialOption grpc.DialOption
+	filers         []pb.ServerAddress
+	filersMutex    sync.RWMutex
+	refreshTicker  *time.Ticker
+	stopChan       chan struct{}
+	wg             sync.WaitGroup
+	initialRetries int
+}
+
+// NewFilerDiscoveryService creates a new filer discovery service
+func NewFilerDiscoveryService(masters []pb.ServerAddress, grpcDialOption grpc.DialOption) *FilerDiscoveryService {
+	return &FilerDiscoveryService{
+		masters:        masters,
+		grpcDialOption: grpcDialOption,
+		filers:         make([]pb.ServerAddress, 0),
+		stopChan:       make(chan struct{}),
+	}
+}
+
+// No need for convertHTTPToGRPC - pb.ServerAddress.ToGrpcAddress() already handles this
+
+// discoverFilersFromMaster discovers filers from a single master
+func (fds *FilerDiscoveryService) discoverFilersFromMaster(masterAddr pb.ServerAddress) ([]pb.ServerAddress, error) {
+	// Convert HTTP master address to gRPC address (HTTP port + 10000)
+	grpcAddr := masterAddr.ToGrpcAddress()
+	glog.Infof("FILER DISCOVERY: Connecting to master gRPC at %s (converted from HTTP %s)", grpcAddr, masterAddr)
+
+	conn, err := grpc.Dial(grpcAddr, fds.grpcDialOption)
+	if err != nil {
+		return nil, fmt.Errorf("failed to connect to master at %s: %v", grpcAddr, err)
+	}
+	defer conn.Close()
+
+	client := master_pb.NewSeaweedClient(conn)
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	resp, err := client.ListClusterNodes(ctx, &master_pb.ListClusterNodesRequest{
+		ClientType: cluster.FilerType,
+	})
+	if err != nil {
+		glog.Errorf("FILER DISCOVERY: ListClusterNodes failed for master %s: %v", masterAddr, err)
+		return nil, fmt.Errorf("failed to list filers from master %s: %v", masterAddr, err)
+	}
+
+	glog.Infof("FILER DISCOVERY: ListClusterNodes returned %d nodes from master %s", len(resp.ClusterNodes), masterAddr)
+
+	var filers []pb.ServerAddress
+	for _, node := range resp.ClusterNodes {
+		glog.Infof("FILER DISCOVERY: Found filer HTTP address %s", node.Address)
+		// Return HTTP address (lock client will convert to gRPC when needed)
+		filers = append(filers, pb.ServerAddress(node.Address))
+	}
+
+	glog.Infof("FILER DISCOVERY: Returning %d filers from master %s", len(filers), masterAddr)
+
+	return filers, nil
+}
+
+// refreshFilers discovers filers from all masters and updates the filer list
+func (fds *FilerDiscoveryService) refreshFilers() {
+	glog.V(2).Info("Refreshing filer list from masters")
+
+	var allFilers []pb.ServerAddress
+	var discoveryErrors []error
+
+	// Try each master to discover filers
+	for _, masterAddr := range fds.masters {
+		filers, err := fds.discoverFilersFromMaster(masterAddr)
+		if err != nil {
+			discoveryErrors = append(discoveryErrors, err)
+			glog.V(1).Infof("Failed to discover filers from master %s: %v", masterAddr, err)
+			continue
+		}
+
+		allFilers = append(allFilers, filers...)
+		glog.V(2).Infof("Discovered %d filers from master %s", len(filers), masterAddr)
+	}
+
+	// Deduplicate filers
+	filerSet := make(map[pb.ServerAddress]bool)
+	for _, filer := range allFilers {
+		filerSet[filer] = true
+	}
+
+	uniqueFilers := make([]pb.ServerAddress, 0, len(filerSet))
+	for filer := range filerSet {
+		uniqueFilers = append(uniqueFilers, filer)
+	}
+
+	// Update the filer list
+	fds.filersMutex.Lock()
+	oldCount := len(fds.filers)
+	fds.filers = uniqueFilers
+	newCount := len(fds.filers)
+	fds.filersMutex.Unlock()
+
+	if newCount > 0 {
+		glog.V(1).Infof("Filer discovery successful: updated from %d to %d filers", oldCount, newCount)
+	} else if len(discoveryErrors) > 0 {
+		glog.Warningf("Failed to discover any filers from %d masters, keeping existing %d filers", len(fds.masters), oldCount)
+	}
+}
+
+// GetFilers returns the current list of filers
+func (fds *FilerDiscoveryService) GetFilers() []pb.ServerAddress {
+	fds.filersMutex.RLock()
+	defer fds.filersMutex.RUnlock()
+
+	// Return a copy to avoid concurrent modification
+	filers := make([]pb.ServerAddress, len(fds.filers))
+	copy(filers, fds.filers)
+	return filers
+}
+
+// Start begins the filer discovery service
+func (fds *FilerDiscoveryService) Start() error {
+	glog.V(1).Info("Starting filer discovery service")
+
+	// Initial discovery
+	fds.refreshFilers()
+
+	// Start with faster discovery during startup
+	fds.initialRetries = InitialDiscoveryRetries
+	interval := InitialDiscoveryInterval
+	if len(fds.GetFilers()) > 0 {
+		// If we found filers immediately, use normal interval
+		interval = FilerDiscoveryInterval
+		fds.initialRetries = 0
+	}
+
+	// Start periodic refresh
+	fds.refreshTicker = time.NewTicker(interval)
+	fds.wg.Add(1)
+	go func() {
+		defer fds.wg.Done()
+		for {
+			select {
+			case <-fds.refreshTicker.C:
+				fds.refreshFilers()
+
+				// Switch to normal interval after initial retries
+				if fds.initialRetries > 0 {
+					fds.initialRetries--
+					if fds.initialRetries == 0 || len(fds.GetFilers()) > 0 {
+						glog.V(1).Info("Switching to normal filer discovery interval")
+						fds.refreshTicker.Stop()
+						fds.refreshTicker = time.NewTicker(FilerDiscoveryInterval)
+					}
+				}
+			case <-fds.stopChan:
+				glog.V(1).Info("Filer discovery service stopping")
+				return
+			}
+		}
+	}()
+
+	return nil
+}
+
+// Stop stops the filer discovery service
+func (fds *FilerDiscoveryService) Stop() error {
+	glog.V(1).Info("Stopping filer discovery service")
+
+	close(fds.stopChan)
+	if fds.refreshTicker != nil {
+		fds.refreshTicker.Stop()
+	}
+	fds.wg.Wait()
+
+	return nil
+}
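A minimal usage sketch (not part of the commit) of how the new FilerDiscoveryService and the multi-filer FilerClientAccessor could be wired together: discovery keeps the filer list fresh from the masters, and the accessor fails over across that list with per-filer backoff. The master addresses and the insecure gRPC credentials below are placeholder assumptions, not values from this diff.

package main

import (
	"log"

	"github.com/seaweedfs/seaweedfs/weed/filer_client"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Placeholder master addresses; a real deployment supplies its own list.
	masters := []pb.ServerAddress{"master1:9333", "master2:9333"}
	dialOpt := grpc.WithTransportCredentials(insecure.NewCredentials()) // assumption: plaintext gRPC

	// Discover filers from the masters and keep the list refreshed in the background.
	discovery := filer_client.NewFilerDiscoveryService(masters, dialOpt)
	if err := discovery.Start(); err != nil {
		log.Fatal(err)
	}
	defer discovery.Stop()

	// Feed the discovered filers into the accessor; each call then tries healthy
	// filers first and falls back to filers that are currently in backoff.
	fca := &filer_client.FilerClientAccessor{
		GetGrpcDialOption: func() grpc.DialOption { return dialOpt },
		GetFilers:         discovery.GetFilers, // always returns the latest discovered list
	}

	err := fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		// ... issue filer_pb requests here ...
		return nil
	})
	if err != nil {
		log.Printf("all filers failed: %v", err)
	}
}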
