aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_handlers.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_handlers.go')
-rw-r--r--weed/s3api/s3api_handlers.go68
1 files changed, 66 insertions, 2 deletions
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go
index c146a8b15..6c47e8256 100644
--- a/weed/s3api/s3api_handlers.go
+++ b/weed/s3api/s3api_handlers.go
@@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"google.golang.org/grpc"
@@ -15,12 +16,75 @@ import (
var _ = filer_pb.FilerClient(&S3ApiServer{})
func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
-
+ // Use filerClient for proper connection management and failover
+ if s3a.filerClient != nil {
+ return s3a.withFilerClientFailover(streamingMode, fn)
+ }
+
+ // Fallback to direct connection if filerClient not initialized
+ // This should only happen during initialization or testing
return pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, s3a.option.Filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption)
+ }, s3a.getFilerAddress().ToGrpcAddress(), false, s3a.option.GrpcDialOption)
+
+}
+// withFilerClientFailover attempts to execute fn with automatic failover to other filers
+func (s3a *S3ApiServer) withFilerClientFailover(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+ // Get current filer as starting point
+ currentFiler := s3a.filerClient.GetCurrentFiler()
+
+ // Try current filer first (fast path)
+ err := pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, currentFiler.ToGrpcAddress(), false, s3a.option.GrpcDialOption)
+
+ if err == nil {
+ s3a.filerClient.RecordFilerSuccess(currentFiler)
+ return nil
+ }
+
+ // Record failure for current filer
+ s3a.filerClient.RecordFilerFailure(currentFiler)
+
+ // Current filer failed - try all other filers with health-aware selection
+ filers := s3a.filerClient.GetAllFilers()
+ var lastErr error = err
+
+ for _, filer := range filers {
+ if filer == currentFiler {
+ continue // Already tried this one
+ }
+
+ // Skip filers known to be unhealthy (circuit breaker pattern)
+ if s3a.filerClient.ShouldSkipUnhealthyFiler(filer) {
+ glog.V(2).Infof("WithFilerClient: skipping unhealthy filer %s", filer)
+ continue
+ }
+
+ err = pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption)
+
+ if err == nil {
+ // Success! Record success and update current filer for future requests
+ s3a.filerClient.RecordFilerSuccess(filer)
+ s3a.filerClient.SetCurrentFiler(filer)
+ glog.V(1).Infof("WithFilerClient: failover from %s to %s succeeded", currentFiler, filer)
+ return nil
+ }
+
+ // Record failure for health tracking
+ s3a.filerClient.RecordFilerFailure(filer)
+ glog.V(2).Infof("WithFilerClient: failover to %s failed: %v", filer, err)
+ lastErr = err
+ }
+
+ // All filers failed
+ return fmt.Errorf("all filers failed, last error: %w", lastErr)
}
func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string {