diff options
Diffstat (limited to 'weed/s3api/s3api_handlers.go')
| -rw-r--r-- | weed/s3api/s3api_handlers.go | 68 |
1 files changed, 66 insertions, 2 deletions
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index c146a8b15..6c47e8256 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "google.golang.org/grpc" @@ -15,12 +16,75 @@ import ( var _ = filer_pb.FilerClient(&S3ApiServer{}) func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - + // Use filerClient for proper connection management and failover + if s3a.filerClient != nil { + return s3a.withFilerClientFailover(streamingMode, fn) + } + + // Fallback to direct connection if filerClient not initialized + // This should only happen during initialization or testing return pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, s3a.option.Filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption) + }, s3a.getFilerAddress().ToGrpcAddress(), false, s3a.option.GrpcDialOption) + +} +// withFilerClientFailover attempts to execute fn with automatic failover to other filers +func (s3a *S3ApiServer) withFilerClientFailover(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + // Get current filer as starting point + currentFiler := s3a.filerClient.GetCurrentFiler() + + // Try current filer first (fast path) + err := pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, currentFiler.ToGrpcAddress(), false, s3a.option.GrpcDialOption) + + if err == nil { + s3a.filerClient.RecordFilerSuccess(currentFiler) + return nil + } + + // Record failure for current filer + s3a.filerClient.RecordFilerFailure(currentFiler) + + // Current filer failed - try all other filers with health-aware selection + filers := s3a.filerClient.GetAllFilers() + var lastErr error = err + + for _, filer := range filers { + if filer == currentFiler { + continue // Already tried this one + } + + // Skip filers known to be unhealthy (circuit breaker pattern) + if s3a.filerClient.ShouldSkipUnhealthyFiler(filer) { + glog.V(2).Infof("WithFilerClient: skipping unhealthy filer %s", filer) + continue + } + + err = pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption) + + if err == nil { + // Success! Record success and update current filer for future requests + s3a.filerClient.RecordFilerSuccess(filer) + s3a.filerClient.SetCurrentFiler(filer) + glog.V(1).Infof("WithFilerClient: failover from %s to %s succeeded", currentFiler, filer) + return nil + } + + // Record failure for health tracking + s3a.filerClient.RecordFilerFailure(filer) + glog.V(2).Infof("WithFilerClient: failover to %s failed: %v", filer, err) + lastErr = err + } + + // All filers failed + return fmt.Errorf("all filers failed, last error: %w", lastErr) } func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string { |
