diff options
Diffstat (limited to 'weed/s3api/s3api_server.go')
| -rw-r--r-- | weed/s3api/s3api_server.go | 78 |
1 files changed, 65 insertions, 13 deletions
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 992027fda..dcf3a55f2 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -11,29 +11,31 @@ import ( "strings" "time" + "github.com/gorilla/mux" + "google.golang.org/grpc" + + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/iam/integration" "github.com/seaweedfs/seaweedfs/weed/iam/policy" "github.com/seaweedfs/seaweedfs/weed/iam/sts" - "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" - "github.com/seaweedfs/seaweedfs/weed/util/grace" - "github.com/seaweedfs/seaweedfs/weed/wdclient" - - "github.com/gorilla/mux" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" . "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/grace" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client" - "google.golang.org/grpc" + "github.com/seaweedfs/seaweedfs/weed/wdclient" ) type S3ApiServerOption struct { - Filer pb.ServerAddress + Filers []pb.ServerAddress + Masters []pb.ServerAddress // For filer discovery Port int Config string DomainName string @@ -69,6 +71,10 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer } func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, explicitStore string) (s3ApiServer *S3ApiServer, err error) { + if len(option.Filers) == 0 { + return nil, fmt.Errorf("at least one filer address is required") + } + startTsNs := time.Now().UnixNano() v := util.GetViper() @@ -95,9 +101,38 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl // Initialize FilerClient for volume location caching // Uses the battle-tested vidMap with filer-based lookups - // S3 API typically connects to a single filer, but wrap in slice for consistency - filerClient := wdclient.NewFilerClient([]pb.ServerAddress{option.Filer}, option.GrpcDialOption, option.DataCenter) - glog.V(0).Infof("S3 API initialized FilerClient for volume location caching") + // Supports multiple filer addresses with automatic failover for high availability + var filerClient *wdclient.FilerClient + if len(option.Masters) > 0 && option.FilerGroup != "" { + // Enable filer discovery via master + masterMap := make(map[string]pb.ServerAddress) + for i, addr := range option.Masters { + masterMap[fmt.Sprintf("master%d", i)] = addr + } + masterClient := wdclient.NewMasterClient(option.GrpcDialOption, option.FilerGroup, cluster.S3Type, "", "", "", *pb.NewServiceDiscoveryFromMap(masterMap)) + + filerClient = wdclient.NewFilerClient(option.Filers, option.GrpcDialOption, option.DataCenter, &wdclient.FilerClientOption{ + MasterClient: masterClient, + FilerGroup: option.FilerGroup, + DiscoveryInterval: 5 * time.Minute, + }) + glog.V(0).Infof("S3 API initialized FilerClient with %d filer(s) and discovery enabled (group: %s, masters: %v)", + len(option.Filers), option.FilerGroup, option.Masters) + } else { + filerClient = wdclient.NewFilerClient(option.Filers, option.GrpcDialOption, option.DataCenter) + glog.V(0).Infof("S3 API initialized FilerClient with %d filer(s) (no discovery)", len(option.Filers)) + } + + // Update credential store to use FilerClient's current filer for HA + if store := iam.credentialManager.GetStore(); store != nil { + if filerFuncSetter, ok := store.(interface { + SetFilerAddressFunc(func() pb.ServerAddress, grpc.DialOption) + }); ok { + // Use FilerClient's GetCurrentFiler for true HA + filerFuncSetter.SetFilerAddressFunc(filerClient.GetCurrentFiler, option.GrpcDialOption) + glog.V(1).Infof("Updated credential store to use FilerClient's current active filer (HA-aware)") + } + } s3ApiServer = &S3ApiServer{ option: option, @@ -119,14 +154,16 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl if option.IamConfig != "" { glog.V(1).Infof("Loading advanced IAM configuration from: %s", option.IamConfig) + // Use FilerClient's GetCurrentFiler for HA-aware filer selection iamManager, err := loadIAMManagerFromConfig(option.IamConfig, func() string { - return string(option.Filer) + return string(filerClient.GetCurrentFiler()) }) if err != nil { glog.Errorf("Failed to load IAM configuration: %v", err) } else { // Create S3 IAM integration with the loaded IAM manager - s3iam := NewS3IAMIntegration(iamManager, string(option.Filer)) + // filerAddress not actually used, just for backward compatibility + s3iam := NewS3IAMIntegration(iamManager, "") // Set IAM integration in server s3ApiServer.iamIntegration = s3iam @@ -134,7 +171,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl // Set the integration in the traditional IAM for compatibility iam.SetIAMIntegration(s3iam) - glog.V(1).Infof("Advanced IAM system initialized successfully") + glog.V(1).Infof("Advanced IAM system initialized successfully with HA filer support") } } @@ -173,6 +210,21 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl return s3ApiServer, nil } +// getFilerAddress returns the current active filer address +// Uses FilerClient's tracked current filer which is updated on successful operations +// This provides better availability than always using the first filer +func (s3a *S3ApiServer) getFilerAddress() pb.ServerAddress { + if s3a.filerClient != nil { + return s3a.filerClient.GetCurrentFiler() + } + // Fallback to first filer if filerClient not initialized + if len(s3a.option.Filers) > 0 { + return s3a.option.Filers[0] + } + glog.Warningf("getFilerAddress: no filer addresses available") + return "" +} + // syncBucketPolicyToEngine syncs a bucket policy to the policy engine // This helper method centralizes the logic for loading bucket policies into the engine // to avoid duplication and ensure consistent error handling |
