aboutsummaryrefslogtreecommitdiff
path: root/weed/s3api/s3api_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api/s3api_server.go')
-rw-r--r--weed/s3api/s3api_server.go78
1 files changed, 65 insertions, 13 deletions
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index 992027fda..dcf3a55f2 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -11,29 +11,31 @@ import (
"strings"
"time"
+ "github.com/gorilla/mux"
+ "google.golang.org/grpc"
+
+ "github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/iam/integration"
"github.com/seaweedfs/seaweedfs/weed/iam/policy"
"github.com/seaweedfs/seaweedfs/weed/iam/sts"
- "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
- "github.com/seaweedfs/seaweedfs/weed/util/grace"
- "github.com/seaweedfs/seaweedfs/weed/wdclient"
-
- "github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/grace"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
- "google.golang.org/grpc"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
)
type S3ApiServerOption struct {
- Filer pb.ServerAddress
+ Filers []pb.ServerAddress
+ Masters []pb.ServerAddress // For filer discovery
Port int
Config string
DomainName string
@@ -69,6 +71,10 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
}
func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, explicitStore string) (s3ApiServer *S3ApiServer, err error) {
+ if len(option.Filers) == 0 {
+ return nil, fmt.Errorf("at least one filer address is required")
+ }
+
startTsNs := time.Now().UnixNano()
v := util.GetViper()
@@ -95,9 +101,38 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
// Initialize FilerClient for volume location caching
// Uses the battle-tested vidMap with filer-based lookups
- // S3 API typically connects to a single filer, but wrap in slice for consistency
- filerClient := wdclient.NewFilerClient([]pb.ServerAddress{option.Filer}, option.GrpcDialOption, option.DataCenter)
- glog.V(0).Infof("S3 API initialized FilerClient for volume location caching")
+ // Supports multiple filer addresses with automatic failover for high availability
+ var filerClient *wdclient.FilerClient
+ if len(option.Masters) > 0 && option.FilerGroup != "" {
+ // Enable filer discovery via master
+ masterMap := make(map[string]pb.ServerAddress)
+ for i, addr := range option.Masters {
+ masterMap[fmt.Sprintf("master%d", i)] = addr
+ }
+ masterClient := wdclient.NewMasterClient(option.GrpcDialOption, option.FilerGroup, cluster.S3Type, "", "", "", *pb.NewServiceDiscoveryFromMap(masterMap))
+
+ filerClient = wdclient.NewFilerClient(option.Filers, option.GrpcDialOption, option.DataCenter, &wdclient.FilerClientOption{
+ MasterClient: masterClient,
+ FilerGroup: option.FilerGroup,
+ DiscoveryInterval: 5 * time.Minute,
+ })
+ glog.V(0).Infof("S3 API initialized FilerClient with %d filer(s) and discovery enabled (group: %s, masters: %v)",
+ len(option.Filers), option.FilerGroup, option.Masters)
+ } else {
+ filerClient = wdclient.NewFilerClient(option.Filers, option.GrpcDialOption, option.DataCenter)
+ glog.V(0).Infof("S3 API initialized FilerClient with %d filer(s) (no discovery)", len(option.Filers))
+ }
+
+ // Update credential store to use FilerClient's current filer for HA
+ if store := iam.credentialManager.GetStore(); store != nil {
+ if filerFuncSetter, ok := store.(interface {
+ SetFilerAddressFunc(func() pb.ServerAddress, grpc.DialOption)
+ }); ok {
+ // Use FilerClient's GetCurrentFiler for true HA
+ filerFuncSetter.SetFilerAddressFunc(filerClient.GetCurrentFiler, option.GrpcDialOption)
+ glog.V(1).Infof("Updated credential store to use FilerClient's current active filer (HA-aware)")
+ }
+ }
s3ApiServer = &S3ApiServer{
option: option,
@@ -119,14 +154,16 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
if option.IamConfig != "" {
glog.V(1).Infof("Loading advanced IAM configuration from: %s", option.IamConfig)
+ // Use FilerClient's GetCurrentFiler for HA-aware filer selection
iamManager, err := loadIAMManagerFromConfig(option.IamConfig, func() string {
- return string(option.Filer)
+ return string(filerClient.GetCurrentFiler())
})
if err != nil {
glog.Errorf("Failed to load IAM configuration: %v", err)
} else {
// Create S3 IAM integration with the loaded IAM manager
- s3iam := NewS3IAMIntegration(iamManager, string(option.Filer))
+ // filerAddress not actually used, just for backward compatibility
+ s3iam := NewS3IAMIntegration(iamManager, "")
// Set IAM integration in server
s3ApiServer.iamIntegration = s3iam
@@ -134,7 +171,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
// Set the integration in the traditional IAM for compatibility
iam.SetIAMIntegration(s3iam)
- glog.V(1).Infof("Advanced IAM system initialized successfully")
+ glog.V(1).Infof("Advanced IAM system initialized successfully with HA filer support")
}
}
@@ -173,6 +210,21 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
return s3ApiServer, nil
}
+// getFilerAddress returns the current active filer address
+// Uses FilerClient's tracked current filer which is updated on successful operations
+// This provides better availability than always using the first filer
+func (s3a *S3ApiServer) getFilerAddress() pb.ServerAddress {
+ if s3a.filerClient != nil {
+ return s3a.filerClient.GetCurrentFiler()
+ }
+ // Fallback to first filer if filerClient not initialized
+ if len(s3a.option.Filers) > 0 {
+ return s3a.option.Filers[0]
+ }
+ glog.Warningf("getFilerAddress: no filer addresses available")
+ return ""
+}
+
// syncBucketPolicyToEngine syncs a bucket policy to the policy engine
// This helper method centralizes the logic for loading bucket policies into the engine
// to avoid duplication and ensure consistent error handling