about summary refs log tree commit diff
path: root/weed/s3api
diff options
context:
space:
mode:
Diffstat (limited to 'weed/s3api')
-rw-r--r--  weed/s3api/auth_credentials.go               | 21
-rw-r--r--  weed/s3api/filer_multipart.go                | 25
-rw-r--r--  weed/s3api/s3api_bucket_handlers.go          |  7
-rw-r--r--  weed/s3api/s3api_circuit_breaker.go          |  4
-rw-r--r--  weed/s3api/s3api_handlers.go                 | 68
-rw-r--r--  weed/s3api/s3api_object_handlers.go          | 10
-rw-r--r--  weed/s3api/s3api_object_handlers_multipart.go | 12
-rw-r--r--  weed/s3api/s3api_object_handlers_postpolicy.go |  4
-rw-r--r--  weed/s3api/s3api_object_handlers_put.go      | 37
-rw-r--r--  weed/s3api/s3api_object_handlers_test.go     |  2
-rw-r--r--  weed/s3api/s3api_object_versioning.go        |  6
-rw-r--r--  weed/s3api/s3api_server.go                   | 78
12 files changed, 206 insertions, 68 deletions
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index cebcd17f5..148839d3e 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -14,6 +14,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/kms"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
@@ -136,12 +137,24 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto
glog.Fatalf("failed to initialize credential manager: %v", err)
}
- // For stores that need filer client details, set them
+ // For stores that need filer client details, set them temporarily
+ // This will be updated to use FilerClient's GetCurrentFiler after FilerClient is created
if store := credentialManager.GetStore(); store != nil {
- if filerClientSetter, ok := store.(interface {
- SetFilerClient(string, grpc.DialOption)
+ if filerFuncSetter, ok := store.(interface {
+ SetFilerAddressFunc(func() pb.ServerAddress, grpc.DialOption)
}); ok {
- filerClientSetter.SetFilerClient(string(option.Filer), option.GrpcDialOption)
+ // Temporary setup: use first filer until FilerClient is available
+ // See s3api_server.go where this is updated to FilerClient.GetCurrentFiler
+ if len(option.Filers) > 0 {
+ getFiler := func() pb.ServerAddress {
+ if len(option.Filers) > 0 {
+ return option.Filers[0]
+ }
+ return ""
+ }
+ filerFuncSetter.SetFilerAddressFunc(getFiler, option.GrpcDialOption)
+ glog.V(1).Infof("Credential store configured with temporary filer function (will be updated after FilerClient creation)")
+ }
}
}
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index 4b8fbaa62..1e4635ead 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -10,6 +10,7 @@ import (
"errors"
"fmt"
"math"
+ "net/url"
"path/filepath"
"slices"
"sort"
@@ -42,6 +43,20 @@ type InitiateMultipartUploadResult struct {
s3.CreateMultipartUploadOutput
}
+// getRequestScheme determines the URL scheme (http or https) from the request
+// Checks X-Forwarded-Proto header first (for proxies), then TLS state
+func getRequestScheme(r *http.Request) string {
+ // Check X-Forwarded-Proto header for proxied requests
+ if proto := r.Header.Get("X-Forwarded-Proto"); proto != "" {
+ return proto
+ }
+ // Check if connection is TLS
+ if r.TLS != nil {
+ return "https"
+ }
+ return "http"
+}
+
func (s3a *S3ApiServer) createMultipartUpload(r *http.Request, input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("createMultipartUpload input %v", input)
@@ -183,8 +198,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
entryName, dirName := s3a.getEntryNameAndDir(input)
if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil {
if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) {
+ // Location uses the S3 endpoint that the client connected to
+ // Format: scheme://s3-endpoint/bucket/object (following AWS S3 API)
return &CompleteMultipartUploadResult{
- Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
+ Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""),
Key: objectKey(input.Key),
@@ -401,7 +418,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
// The latest version information is tracked in the .versions directory metadata
output = &CompleteMultipartUploadResult{
- Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
+ Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key),
@@ -454,7 +471,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
// Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec
output = &CompleteMultipartUploadResult{
- Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
+ Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key),
@@ -511,7 +528,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
// For non-versioned buckets, return response without VersionId
output = &CompleteMultipartUploadResult{
- Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))),
+ Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
Bucket: input.Bucket,
ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""),
Key: objectKey(input.Key),
diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go
index d73fabd2f..f0704fe23 100644
--- a/weed/s3api/s3api_bucket_handlers.go
+++ b/weed/s3api/s3api_bucket_handlers.go
@@ -877,7 +877,8 @@ func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWr
s3err.WriteErrorResponse(w, r, err)
return
}
- fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil)
+ // ReadFilerConfFromFilers provides multi-filer failover
+ fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil)
if err != nil {
glog.Errorf("GetBucketLifecycleConfigurationHandler: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@@ -938,7 +939,7 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr
return
}
- fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil)
+ fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil)
if err != nil {
glog.Errorf("PutBucketLifecycleConfigurationHandler read filer config: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
@@ -1020,7 +1021,7 @@ func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *h
return
}
- fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil)
+ fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil)
if err != nil {
glog.Errorf("DeleteBucketLifecycleHandler read filer config: %s", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go
index 47efa728a..2f5e1f580 100644
--- a/weed/s3api/s3api_circuit_breaker.go
+++ b/weed/s3api/s3api_circuit_breaker.go
@@ -29,7 +29,8 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker {
limitations: make(map[string]int64),
}
- err := pb.WithFilerClient(false, 0, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ // Use WithOneOfGrpcFilerClients to support multiple filers with failover
+ err := pb.WithOneOfGrpcFilerClients(false, option.Filers, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
content, err := filer.ReadInsideFiler(client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile)
if errors.Is(err, filer_pb.ErrNotFound) {
return nil
@@ -41,6 +42,7 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker {
})
if err != nil {
+ glog.Warningf("S3 circuit breaker disabled; failed to load config from any filer: %v", err)
}
return cb
diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go
index c146a8b15..6c47e8256 100644
--- a/weed/s3api/s3api_handlers.go
+++ b/weed/s3api/s3api_handlers.go
@@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"google.golang.org/grpc"
@@ -15,12 +16,75 @@ import (
var _ = filer_pb.FilerClient(&S3ApiServer{})
func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
-
+ // Use filerClient for proper connection management and failover
+ if s3a.filerClient != nil {
+ return s3a.withFilerClientFailover(streamingMode, fn)
+ }
+
+ // Fallback to direct connection if filerClient not initialized
+ // This should only happen during initialization or testing
return pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, s3a.option.Filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption)
+ }, s3a.getFilerAddress().ToGrpcAddress(), false, s3a.option.GrpcDialOption)
+
+}
+// withFilerClientFailover attempts to execute fn with automatic failover to other filers
+func (s3a *S3ApiServer) withFilerClientFailover(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
+ // Get current filer as starting point
+ currentFiler := s3a.filerClient.GetCurrentFiler()
+
+ // Try current filer first (fast path)
+ err := pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, currentFiler.ToGrpcAddress(), false, s3a.option.GrpcDialOption)
+
+ if err == nil {
+ s3a.filerClient.RecordFilerSuccess(currentFiler)
+ return nil
+ }
+
+ // Record failure for current filer
+ s3a.filerClient.RecordFilerFailure(currentFiler)
+
+ // Current filer failed - try all other filers with health-aware selection
+ filers := s3a.filerClient.GetAllFilers()
+ var lastErr error = err
+
+ for _, filer := range filers {
+ if filer == currentFiler {
+ continue // Already tried this one
+ }
+
+ // Skip filers known to be unhealthy (circuit breaker pattern)
+ if s3a.filerClient.ShouldSkipUnhealthyFiler(filer) {
+ glog.V(2).Infof("WithFilerClient: skipping unhealthy filer %s", filer)
+ continue
+ }
+
+ err = pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error {
+ client := filer_pb.NewSeaweedFilerClient(grpcConnection)
+ return fn(client)
+ }, filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption)
+
+ if err == nil {
+ // Success! Record success and update current filer for future requests
+ s3a.filerClient.RecordFilerSuccess(filer)
+ s3a.filerClient.SetCurrentFiler(filer)
+ glog.V(1).Infof("WithFilerClient: failover from %s to %s succeeded", currentFiler, filer)
+ return nil
+ }
+
+ // Record failure for health tracking
+ s3a.filerClient.RecordFilerFailure(filer)
+ glog.V(2).Infof("WithFilerClient: failover to %s failed: %v", filer, err)
+ lastErr = err
+ }
+
+ // All filers failed
+ return fmt.Errorf("all filers failed, last error: %w", lastErr)
}
func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string {
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index cd0e82421..b1446c3e7 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -404,11 +404,11 @@ func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bu
return listEntry
}
-func (s3a *S3ApiServer) toFilerUrl(bucket, object string) string {
- object = urlPathEscape(removeDuplicateSlashes(object))
- destUrl := fmt.Sprintf("http://%s%s/%s%s",
- s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, object)
- return destUrl
+func (s3a *S3ApiServer) toFilerPath(bucket, object string) string {
+ // Returns the raw file path - no URL escaping needed
+ // The path is used directly, not embedded in a URL
+ object = removeDuplicateSlashes(object)
+ return fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
}
// hasConditionalHeaders checks if the request has any conditional headers
diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go
index 2d9f8e620..becbd9bf9 100644
--- a/weed/s3api/s3api_object_handlers_multipart.go
+++ b/weed/s3api/s3api_object_handlers_multipart.go
@@ -404,7 +404,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
}
}
- uploadUrl := s3a.genPartUploadUrl(bucket, uploadID, partID)
+ filePath := s3a.genPartUploadPath(bucket, uploadID, partID)
if partID == 1 && r.Header.Get("Content-Type") == "" {
dataReader = mimeDetect(r, dataReader)
@@ -413,7 +413,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ
glog.V(2).Infof("PutObjectPart: bucket=%s, object=%s, uploadId=%s, partNumber=%d, size=%d",
bucket, object, uploadID, partID, r.ContentLength)
- etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, partID)
+ etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, partID)
if errCode != s3err.ErrNone {
glog.Errorf("PutObjectPart: putToFiler failed with error code %v for bucket=%s, object=%s, partNumber=%d",
errCode, bucket, object, partID)
@@ -437,9 +437,11 @@ func (s3a *S3ApiServer) genUploadsFolder(bucket string) string {
return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder)
}
-func (s3a *S3ApiServer) genPartUploadUrl(bucket, uploadID string, partID int) string {
- return fmt.Sprintf("http://%s%s/%s/%04d_%s.part",
- s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString())
+func (s3a *S3ApiServer) genPartUploadPath(bucket, uploadID string, partID int) string {
+ // Returns just the file path - no filer address needed
+ // Upload traffic goes directly to volume servers, not through filer
+ return fmt.Sprintf("%s/%s/%04d_%s.part",
+ s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString())
}
// Generate uploadID hash string from object
diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go
index ecb2ac8d1..3ec6147f5 100644
--- a/weed/s3api/s3api_object_handlers_postpolicy.go
+++ b/weed/s3api/s3api_object_handlers_postpolicy.go
@@ -114,7 +114,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
}
}
- uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlEscapeObject(object))
+ filePath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
// Get ContentType from post formData
// Otherwise from formFile ContentType
@@ -136,7 +136,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R
}
}
- etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, fileBody, bucket, 1)
+ etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, fileBody, bucket, 1)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index 540a1e512..100796b2e 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -8,7 +8,6 @@ import (
"fmt"
"io"
"net/http"
- "net/url"
"path/filepath"
"strconv"
"strings"
@@ -223,12 +222,12 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
s3a.setSSEResponseHeaders(w, r, sseMetadata)
default:
// Handle regular PUT (never configured versioning)
- uploadUrl := s3a.toFilerUrl(bucket, object)
+ filePath := s3a.toFilerPath(bucket, object)
if objectContentType == "" {
dataReader = mimeDetect(r, dataReader)
}
- etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, 1)
+ etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, 1)
if errCode != s3err.ErrNone {
s3err.WriteErrorResponse(w, r, errCode)
@@ -248,9 +247,10 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
writeSuccessResponseEmpty(w, r)
}
-func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
+func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
// NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy
// This eliminates the filer proxy overhead for PUT operations
+ // Note: filePath is now passed directly instead of URL (no parsing needed)
// For SSE, encrypt with offset=0 for all parts
// Each part is encrypted independently, then decrypted using metadata during GET
@@ -311,20 +311,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption")
}
- // Parse the upload URL to extract the file path
- // uploadUrl format: http://filer:8888/path/to/bucket/object (or https://, IPv6, etc.)
- // Use proper URL parsing instead of string manipulation for robustness
- parsedUrl, parseErr := url.Parse(uploadUrl)
- if parseErr != nil {
- glog.Errorf("putToFiler: failed to parse uploadUrl %q: %v", uploadUrl, parseErr)
- return "", s3err.ErrInternalError, SSEResponseMetadata{}
- }
-
- // Use parsedUrl.Path directly - it's already decoded by url.Parse()
- // Per Go documentation: "Path is stored in decoded form: /%47%6f%2f becomes /Go/"
- // Calling PathUnescape again would double-decode and fail on keys like "b%ar"
- filePath := parsedUrl.Path
-
+ // filePath is already provided directly - no URL parsing needed
// Step 1 & 2: Use auto-chunking to handle large files without OOM
// This splits large uploads into 8MB chunks, preventing memory issues on both S3 API and volume servers
const chunkSize = 8 * 1024 * 1024 // 8MB chunks (S3 standard)
@@ -743,7 +730,7 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_
// For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory,
// while existing versions from when versioning was enabled remain preserved in the .versions subdirectory.
func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) {
- // Normalize object path to ensure consistency with toFilerUrl behavior
+ // Normalize object path to ensure consistency with toFilerPath behavior
normalizedObject := removeDuplicateSlashes(object)
glog.V(3).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s",
@@ -783,7 +770,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
glog.V(3).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object)
}
- uploadUrl := s3a.toFilerUrl(bucket, normalizedObject)
+ filePath := s3a.toFilerPath(bucket, normalizedObject)
body := dataReader
if objectContentType == "" {
@@ -846,7 +833,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
}
// Upload the file using putToFiler - this will create the file with version metadata
- etag, errCode, sseMetadata = s3a.putToFiler(r, uploadUrl, body, bucket, 1)
+ etag, errCode, sseMetadata = s3a.putToFiler(r, filePath, body, bucket, 1)
if errCode != s3err.ErrNone {
glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
return "", errCode, SSEResponseMetadata{}
@@ -937,7 +924,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
// Generate version ID
versionId = generateVersionId()
- // Normalize object path to ensure consistency with toFilerUrl behavior
+ // Normalize object path to ensure consistency with toFilerPath behavior
normalizedObject := removeDuplicateSlashes(object)
glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject)
@@ -948,7 +935,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
// Upload directly to the versions directory
// We need to construct the object path relative to the bucket
versionObjectPath := normalizedObject + s3_constants.VersionsFolder + "/" + versionFileName
- versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath)
+ versionFilePath := s3a.toFilerPath(bucket, versionObjectPath)
// Ensure the .versions directory exists before uploading
bucketDir := s3a.option.BucketsPath + "/" + bucket
@@ -966,9 +953,9 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
body = mimeDetect(r, body)
}
- glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl)
+ glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionFilePath)
- etag, errCode, sseMetadata = s3a.putToFiler(r, versionUploadUrl, body, bucket, 1)
+ etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, 1)
if errCode != s3err.ErrNone {
glog.Errorf("putVersionedObject: failed to upload version: %v", errCode)
return "", "", errCode, SSEResponseMetadata{}
diff --git a/weed/s3api/s3api_object_handlers_test.go b/weed/s3api/s3api_object_handlers_test.go
index cf650a36e..a6592ad4b 100644
--- a/weed/s3api/s3api_object_handlers_test.go
+++ b/weed/s3api/s3api_object_handlers_test.go
@@ -114,7 +114,7 @@ func TestRemoveDuplicateSlashes(t *testing.T) {
}
}
-func TestS3ApiServer_toFilerUrl(t *testing.T) {
+func TestS3ApiServer_toFilerPath(t *testing.T) {
tests := []struct {
name string
args string
diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go
index 1c1dbee03..bbc43f205 100644
--- a/weed/s3api/s3api_object_versioning.go
+++ b/weed/s3api/s3api_object_versioning.go
@@ -607,7 +607,7 @@ func (s3a *S3ApiServer) calculateETagFromChunks(chunks []*filer_pb.FileChunk) st
// getSpecificObjectVersion retrieves a specific version of an object
func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId string) (*filer_pb.Entry, error) {
- // Normalize object path to ensure consistency with toFilerUrl behavior
+ // Normalize object path to ensure consistency with toFilerPath behavior
normalizedObject := removeDuplicateSlashes(object)
if versionId == "" {
@@ -639,7 +639,7 @@ func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId strin
// deleteSpecificObjectVersion deletes a specific version of an object
func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId string) error {
- // Normalize object path to ensure consistency with toFilerUrl behavior
+ // Normalize object path to ensure consistency with toFilerPath behavior
normalizedObject := removeDuplicateSlashes(object)
if versionId == "" {
@@ -843,7 +843,7 @@ func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http
// getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata
func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) {
- // Normalize object path to ensure consistency with toFilerUrl behavior
+ // Normalize object path to ensure consistency with toFilerPath behavior
normalizedObject := removeDuplicateSlashes(object)
bucketDir := s3a.option.BucketsPath + "/" + bucket
diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go
index 992027fda..dcf3a55f2 100644
--- a/weed/s3api/s3api_server.go
+++ b/weed/s3api/s3api_server.go
@@ -11,29 +11,31 @@ import (
"strings"
"time"
+ "github.com/gorilla/mux"
+ "google.golang.org/grpc"
+
+ "github.com/seaweedfs/seaweedfs/weed/cluster"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/iam/integration"
"github.com/seaweedfs/seaweedfs/weed/iam/policy"
"github.com/seaweedfs/seaweedfs/weed/iam/sts"
- "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
- "github.com/seaweedfs/seaweedfs/weed/util/grace"
- "github.com/seaweedfs/seaweedfs/weed/wdclient"
-
- "github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/grace"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
- "google.golang.org/grpc"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
)
type S3ApiServerOption struct {
- Filer pb.ServerAddress
+ Filers []pb.ServerAddress
+ Masters []pb.ServerAddress // For filer discovery
Port int
Config string
DomainName string
@@ -69,6 +71,10 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer
}
func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, explicitStore string) (s3ApiServer *S3ApiServer, err error) {
+ if len(option.Filers) == 0 {
+ return nil, fmt.Errorf("at least one filer address is required")
+ }
+
startTsNs := time.Now().UnixNano()
v := util.GetViper()
@@ -95,9 +101,38 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
// Initialize FilerClient for volume location caching
// Uses the battle-tested vidMap with filer-based lookups
- // S3 API typically connects to a single filer, but wrap in slice for consistency
- filerClient := wdclient.NewFilerClient([]pb.ServerAddress{option.Filer}, option.GrpcDialOption, option.DataCenter)
- glog.V(0).Infof("S3 API initialized FilerClient for volume location caching")
+ // Supports multiple filer addresses with automatic failover for high availability
+ var filerClient *wdclient.FilerClient
+ if len(option.Masters) > 0 && option.FilerGroup != "" {
+ // Enable filer discovery via master
+ masterMap := make(map[string]pb.ServerAddress)
+ for i, addr := range option.Masters {
+ masterMap[fmt.Sprintf("master%d", i)] = addr
+ }
+ masterClient := wdclient.NewMasterClient(option.GrpcDialOption, option.FilerGroup, cluster.S3Type, "", "", "", *pb.NewServiceDiscoveryFromMap(masterMap))
+
+ filerClient = wdclient.NewFilerClient(option.Filers, option.GrpcDialOption, option.DataCenter, &wdclient.FilerClientOption{
+ MasterClient: masterClient,
+ FilerGroup: option.FilerGroup,
+ DiscoveryInterval: 5 * time.Minute,
+ })
+ glog.V(0).Infof("S3 API initialized FilerClient with %d filer(s) and discovery enabled (group: %s, masters: %v)",
+ len(option.Filers), option.FilerGroup, option.Masters)
+ } else {
+ filerClient = wdclient.NewFilerClient(option.Filers, option.GrpcDialOption, option.DataCenter)
+ glog.V(0).Infof("S3 API initialized FilerClient with %d filer(s) (no discovery)", len(option.Filers))
+ }
+
+ // Update credential store to use FilerClient's current filer for HA
+ if store := iam.credentialManager.GetStore(); store != nil {
+ if filerFuncSetter, ok := store.(interface {
+ SetFilerAddressFunc(func() pb.ServerAddress, grpc.DialOption)
+ }); ok {
+ // Use FilerClient's GetCurrentFiler for true HA
+ filerFuncSetter.SetFilerAddressFunc(filerClient.GetCurrentFiler, option.GrpcDialOption)
+ glog.V(1).Infof("Updated credential store to use FilerClient's current active filer (HA-aware)")
+ }
+ }
s3ApiServer = &S3ApiServer{
option: option,
@@ -119,14 +154,16 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
if option.IamConfig != "" {
glog.V(1).Infof("Loading advanced IAM configuration from: %s", option.IamConfig)
+ // Use FilerClient's GetCurrentFiler for HA-aware filer selection
iamManager, err := loadIAMManagerFromConfig(option.IamConfig, func() string {
- return string(option.Filer)
+ return string(filerClient.GetCurrentFiler())
})
if err != nil {
glog.Errorf("Failed to load IAM configuration: %v", err)
} else {
// Create S3 IAM integration with the loaded IAM manager
- s3iam := NewS3IAMIntegration(iamManager, string(option.Filer))
+ // filerAddress not actually used, just for backward compatibility
+ s3iam := NewS3IAMIntegration(iamManager, "")
// Set IAM integration in server
s3ApiServer.iamIntegration = s3iam
@@ -134,7 +171,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
// Set the integration in the traditional IAM for compatibility
iam.SetIAMIntegration(s3iam)
- glog.V(1).Infof("Advanced IAM system initialized successfully")
+ glog.V(1).Infof("Advanced IAM system initialized successfully with HA filer support")
}
}
@@ -173,6 +210,21 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
return s3ApiServer, nil
}
+// getFilerAddress returns the current active filer address
+// Uses FilerClient's tracked current filer which is updated on successful operations
+// This provides better availability than always using the first filer
+func (s3a *S3ApiServer) getFilerAddress() pb.ServerAddress {
+ if s3a.filerClient != nil {
+ return s3a.filerClient.GetCurrentFiler()
+ }
+ // Fallback to first filer if filerClient not initialized
+ if len(s3a.option.Filers) > 0 {
+ return s3a.option.Filers[0]
+ }
+ glog.Warningf("getFilerAddress: no filer addresses available")
+ return ""
+}
+
// syncBucketPolicyToEngine syncs a bucket policy to the policy engine
// This helper method centralizes the logic for loading bucket policies into the engine
// to avoid duplication and ensure consistent error handling