diff options
Diffstat (limited to 'weed/s3api')
| -rw-r--r-- | weed/s3api/auth_credentials.go | 21 | ||||
| -rw-r--r-- | weed/s3api/filer_multipart.go | 25 | ||||
| -rw-r--r-- | weed/s3api/s3api_bucket_handlers.go | 7 | ||||
| -rw-r--r-- | weed/s3api/s3api_circuit_breaker.go | 4 | ||||
| -rw-r--r-- | weed/s3api/s3api_handlers.go | 68 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_handlers.go | 10 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_handlers_multipart.go | 12 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_handlers_postpolicy.go | 4 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_handlers_put.go | 37 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_handlers_test.go | 2 | ||||
| -rw-r--r-- | weed/s3api/s3api_object_versioning.go | 6 | ||||
| -rw-r--r-- | weed/s3api/s3api_server.go | 78 |
12 files changed, 206 insertions, 68 deletions
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index cebcd17f5..148839d3e 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -14,6 +14,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/kms" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" @@ -136,12 +137,24 @@ func NewIdentityAccessManagementWithStore(option *S3ApiServerOption, explicitSto glog.Fatalf("failed to initialize credential manager: %v", err) } - // For stores that need filer client details, set them + // For stores that need filer client details, set them temporarily + // This will be updated to use FilerClient's GetCurrentFiler after FilerClient is created if store := credentialManager.GetStore(); store != nil { - if filerClientSetter, ok := store.(interface { - SetFilerClient(string, grpc.DialOption) + if filerFuncSetter, ok := store.(interface { + SetFilerAddressFunc(func() pb.ServerAddress, grpc.DialOption) }); ok { - filerClientSetter.SetFilerClient(string(option.Filer), option.GrpcDialOption) + // Temporary setup: use first filer until FilerClient is available + // See s3api_server.go where this is updated to FilerClient.GetCurrentFiler + if len(option.Filers) > 0 { + getFiler := func() pb.ServerAddress { + if len(option.Filers) > 0 { + return option.Filers[0] + } + return "" + } + filerFuncSetter.SetFilerAddressFunc(getFiler, option.GrpcDialOption) + glog.V(1).Infof("Credential store configured with temporary filer function (will be updated after FilerClient creation)") + } } } diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go index 4b8fbaa62..1e4635ead 100644 --- a/weed/s3api/filer_multipart.go +++ b/weed/s3api/filer_multipart.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "math" + "net/url" "path/filepath" "slices" "sort" @@ -42,6 +43,20 @@ type InitiateMultipartUploadResult struct { s3.CreateMultipartUploadOutput } +// getRequestScheme determines the URL scheme (http or https) from the request +// Checks X-Forwarded-Proto header first (for proxies), then TLS state +func getRequestScheme(r *http.Request) string { + // Check X-Forwarded-Proto header for proxied requests + if proto := r.Header.Get("X-Forwarded-Proto"); proto != "" { + return proto + } + // Check if connection is TLS + if r.TLS != nil { + return "https" + } + return "http" +} + func (s3a *S3ApiServer) createMultipartUpload(r *http.Request, input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) { glog.V(2).Infof("createMultipartUpload input %v", input) @@ -183,8 +198,10 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl entryName, dirName := s3a.getEntryNameAndDir(input) if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil { if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) { + // Location uses the S3 endpoint that the client connected to + // Format: scheme://s3-endpoint/bucket/object (following AWS S3 API) return &CompleteMultipartUploadResult{ - Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), Bucket: input.Bucket, ETag: aws.String("\"" + filer.ETagChunks(entry.GetChunks()) + "\""), Key: objectKey(input.Key), @@ -401,7 +418,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl // The latest version information is tracked in the .versions directory metadata output = &CompleteMultipartUploadResult{ - Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), Bucket: input.Bucket, ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), Key: objectKey(input.Key), @@ -454,7 +471,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl // Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec output = &CompleteMultipartUploadResult{ - Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), Bucket: input.Bucket, ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), Key: objectKey(input.Key), @@ -511,7 +528,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl // For non-versioned buckets, return response without VersionId output = &CompleteMultipartUploadResult{ - Location: aws.String(fmt.Sprintf("http://%s%s/%s", s3a.option.Filer.ToHttpAddress(), urlEscapeObject(dirName), urlPathEscape(entryName))), + Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))), Bucket: input.Bucket, ETag: aws.String("\"" + filer.ETagChunks(finalParts) + "\""), Key: objectKey(input.Key), diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index d73fabd2f..f0704fe23 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -877,7 +877,8 @@ func (s3a *S3ApiServer) GetBucketLifecycleConfigurationHandler(w http.ResponseWr s3err.WriteErrorResponse(w, r, err) return } - fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil) + // ReadFilerConfFromFilers provides multi-filer failover + fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil) if err != nil { glog.Errorf("GetBucketLifecycleConfigurationHandler: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) @@ -938,7 +939,7 @@ func (s3a *S3ApiServer) PutBucketLifecycleConfigurationHandler(w http.ResponseWr return } - fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil) + fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil) if err != nil { glog.Errorf("PutBucketLifecycleConfigurationHandler read filer config: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) @@ -1020,7 +1021,7 @@ func (s3a *S3ApiServer) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *h return } - fc, err := filer.ReadFilerConf(s3a.option.Filer, s3a.option.GrpcDialOption, nil) + fc, err := filer.ReadFilerConfFromFilers(s3a.option.Filers, s3a.option.GrpcDialOption, nil) if err != nil { glog.Errorf("DeleteBucketLifecycleHandler read filer config: %s", err) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) diff --git a/weed/s3api/s3api_circuit_breaker.go b/weed/s3api/s3api_circuit_breaker.go index 47efa728a..2f5e1f580 100644 --- a/weed/s3api/s3api_circuit_breaker.go +++ b/weed/s3api/s3api_circuit_breaker.go @@ -29,7 +29,8 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { limitations: make(map[string]int64), } - err := pb.WithFilerClient(false, 0, option.Filer, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + // Use WithOneOfGrpcFilerClients to support multiple filers with failover + err := pb.WithOneOfGrpcFilerClients(false, option.Filers, option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { content, err := filer.ReadInsideFiler(client, s3_constants.CircuitBreakerConfigDir, s3_constants.CircuitBreakerConfigFile) if errors.Is(err, filer_pb.ErrNotFound) { return nil @@ -41,6 +42,7 @@ func NewCircuitBreaker(option *S3ApiServerOption) *CircuitBreaker { }) if err != nil { + glog.Warningf("S3 circuit breaker disabled; failed to load config from any filer: %v", err) } return cb diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index c146a8b15..6c47e8256 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "google.golang.org/grpc" @@ -15,12 +16,75 @@ import ( var _ = filer_pb.FilerClient(&S3ApiServer{}) func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { - + // Use filerClient for proper connection management and failover + if s3a.filerClient != nil { + return s3a.withFilerClientFailover(streamingMode, fn) + } + + // Fallback to direct connection if filerClient not initialized + // This should only happen during initialization or testing return pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, s3a.option.Filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption) + }, s3a.getFilerAddress().ToGrpcAddress(), false, s3a.option.GrpcDialOption) + +} +// withFilerClientFailover attempts to execute fn with automatic failover to other filers +func (s3a *S3ApiServer) withFilerClientFailover(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + // Get current filer as starting point + currentFiler := s3a.filerClient.GetCurrentFiler() + + // Try current filer first (fast path) + err := pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, currentFiler.ToGrpcAddress(), false, s3a.option.GrpcDialOption) + + if err == nil { + s3a.filerClient.RecordFilerSuccess(currentFiler) + return nil + } + + // Record failure for current filer + s3a.filerClient.RecordFilerFailure(currentFiler) + + // Current filer failed - try all other filers with health-aware selection + filers := s3a.filerClient.GetAllFilers() + var lastErr error = err + + for _, filer := range filers { + if filer == currentFiler { + continue // Already tried this one + } + + // Skip filers known to be unhealthy (circuit breaker pattern) + if s3a.filerClient.ShouldSkipUnhealthyFiler(filer) { + glog.V(2).Infof("WithFilerClient: skipping unhealthy filer %s", filer) + continue + } + + err = pb.WithGrpcClient(streamingMode, s3a.randomClientId, func(grpcConnection *grpc.ClientConn) error { + client := filer_pb.NewSeaweedFilerClient(grpcConnection) + return fn(client) + }, filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption) + + if err == nil { + // Success! Record success and update current filer for future requests + s3a.filerClient.RecordFilerSuccess(filer) + s3a.filerClient.SetCurrentFiler(filer) + glog.V(1).Infof("WithFilerClient: failover from %s to %s succeeded", currentFiler, filer) + return nil + } + + // Record failure for health tracking + s3a.filerClient.RecordFilerFailure(filer) + glog.V(2).Infof("WithFilerClient: failover to %s failed: %v", filer, err) + lastErr = err + } + + // All filers failed + return fmt.Errorf("all filers failed, last error: %w", lastErr) } func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string { diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index cd0e82421..b1446c3e7 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -404,11 +404,11 @@ func newListEntry(entry *filer_pb.Entry, key string, dir string, name string, bu return listEntry } -func (s3a *S3ApiServer) toFilerUrl(bucket, object string) string { - object = urlPathEscape(removeDuplicateSlashes(object)) - destUrl := fmt.Sprintf("http://%s%s/%s%s", - s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, object) - return destUrl +func (s3a *S3ApiServer) toFilerPath(bucket, object string) string { + // Returns the raw file path - no URL escaping needed + // The path is used directly, not embedded in a URL + object = removeDuplicateSlashes(object) + return fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) } // hasConditionalHeaders checks if the request has any conditional headers diff --git a/weed/s3api/s3api_object_handlers_multipart.go b/weed/s3api/s3api_object_handlers_multipart.go index 2d9f8e620..becbd9bf9 100644 --- a/weed/s3api/s3api_object_handlers_multipart.go +++ b/weed/s3api/s3api_object_handlers_multipart.go @@ -404,7 +404,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ } } - uploadUrl := s3a.genPartUploadUrl(bucket, uploadID, partID) + filePath := s3a.genPartUploadPath(bucket, uploadID, partID) if partID == 1 && r.Header.Get("Content-Type") == "" { dataReader = mimeDetect(r, dataReader) @@ -413,7 +413,7 @@ func (s3a *S3ApiServer) PutObjectPartHandler(w http.ResponseWriter, r *http.Requ glog.V(2).Infof("PutObjectPart: bucket=%s, object=%s, uploadId=%s, partNumber=%d, size=%d", bucket, object, uploadID, partID, r.ContentLength) - etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, partID) + etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, partID) if errCode != s3err.ErrNone { glog.Errorf("PutObjectPart: putToFiler failed with error code %v for bucket=%s, object=%s, partNumber=%d", errCode, bucket, object, partID) @@ -437,9 +437,11 @@ func (s3a *S3ApiServer) genUploadsFolder(bucket string) string { return fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, bucket, s3_constants.MultipartUploadsFolder) } -func (s3a *S3ApiServer) genPartUploadUrl(bucket, uploadID string, partID int) string { - return fmt.Sprintf("http://%s%s/%s/%04d_%s.part", - s3a.option.Filer.ToHttpAddress(), s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString()) +func (s3a *S3ApiServer) genPartUploadPath(bucket, uploadID string, partID int) string { + // Returns just the file path - no filer address needed + // Upload traffic goes directly to volume servers, not through filer + return fmt.Sprintf("%s/%s/%04d_%s.part", + s3a.genUploadsFolder(bucket), uploadID, partID, uuid.NewString()) } // Generate uploadID hash string from object diff --git a/weed/s3api/s3api_object_handlers_postpolicy.go b/weed/s3api/s3api_object_handlers_postpolicy.go index ecb2ac8d1..3ec6147f5 100644 --- a/weed/s3api/s3api_object_handlers_postpolicy.go +++ b/weed/s3api/s3api_object_handlers_postpolicy.go @@ -114,7 +114,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R } } - uploadUrl := fmt.Sprintf("http://%s%s/%s%s", s3a.option.Filer.ToHttpAddress(), s3a.option.BucketsPath, bucket, urlEscapeObject(object)) + filePath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) // Get ContentType from post formData // Otherwise from formFile ContentType @@ -136,7 +136,7 @@ func (s3a *S3ApiServer) PostPolicyBucketHandler(w http.ResponseWriter, r *http.R } } - etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, fileBody, bucket, 1) + etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, fileBody, bucket, 1) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 540a1e512..100796b2e 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "net/http" - "net/url" "path/filepath" "strconv" "strings" @@ -223,12 +222,12 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) s3a.setSSEResponseHeaders(w, r, sseMetadata) default: // Handle regular PUT (never configured versioning) - uploadUrl := s3a.toFilerUrl(bucket, object) + filePath := s3a.toFilerPath(bucket, object) if objectContentType == "" { dataReader = mimeDetect(r, dataReader) } - etag, errCode, sseMetadata := s3a.putToFiler(r, uploadUrl, dataReader, bucket, 1) + etag, errCode, sseMetadata := s3a.putToFiler(r, filePath, dataReader, bucket, 1) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) @@ -248,9 +247,10 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) writeSuccessResponseEmpty(w, r) } -func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) { +func (s3a *S3ApiServer) putToFiler(r *http.Request, filePath string, dataReader io.Reader, bucket string, partNumber int) (etag string, code s3err.ErrorCode, sseMetadata SSEResponseMetadata) { // NEW OPTIMIZATION: Write directly to volume servers, bypassing filer proxy // This eliminates the filer proxy overhead for PUT operations + // Note: filePath is now passed directly instead of URL (no parsing needed) // For SSE, encrypt with offset=0 for all parts // Each part is encrypted independently, then decrypted using metadata during GET @@ -311,20 +311,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader glog.V(4).Infof("putToFiler: explicit encryption already applied, skipping bucket default encryption") } - // Parse the upload URL to extract the file path - // uploadUrl format: http://filer:8888/path/to/bucket/object (or https://, IPv6, etc.) - // Use proper URL parsing instead of string manipulation for robustness - parsedUrl, parseErr := url.Parse(uploadUrl) - if parseErr != nil { - glog.Errorf("putToFiler: failed to parse uploadUrl %q: %v", uploadUrl, parseErr) - return "", s3err.ErrInternalError, SSEResponseMetadata{} - } - - // Use parsedUrl.Path directly - it's already decoded by url.Parse() - // Per Go documentation: "Path is stored in decoded form: /%47%6f%2f becomes /Go/" - // Calling PathUnescape again would double-decode and fail on keys like "b%ar" - filePath := parsedUrl.Path - + // filePath is already provided directly - no URL parsing needed // Step 1 & 2: Use auto-chunking to handle large files without OOM // This splits large uploads into 8MB chunks, preventing memory issues on both S3 API and volume servers const chunkSize = 8 * 1024 * 1024 // 8MB chunks (S3 standard) @@ -743,7 +730,7 @@ func (s3a *S3ApiServer) setObjectOwnerFromRequest(r *http.Request, entry *filer_ // For suspended versioning, objects are stored as regular files (version ID "null") in the bucket directory, // while existing versions from when versioning was enabled remain preserved in the .versions subdirectory. func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, object string, dataReader io.Reader, objectContentType string) (etag string, errCode s3err.ErrorCode, sseMetadata SSEResponseMetadata) { - // Normalize object path to ensure consistency with toFilerUrl behavior + // Normalize object path to ensure consistency with toFilerPath behavior normalizedObject := removeDuplicateSlashes(object) glog.V(3).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s", @@ -783,7 +770,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob glog.V(3).Infof("putSuspendedVersioningObject: no .versions directory for %s/%s", bucket, object) } - uploadUrl := s3a.toFilerUrl(bucket, normalizedObject) + filePath := s3a.toFilerPath(bucket, normalizedObject) body := dataReader if objectContentType == "" { @@ -846,7 +833,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob } // Upload the file using putToFiler - this will create the file with version metadata - etag, errCode, sseMetadata = s3a.putToFiler(r, uploadUrl, body, bucket, 1) + etag, errCode, sseMetadata = s3a.putToFiler(r, filePath, body, bucket, 1) if errCode != s3err.ErrNone { glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode) return "", errCode, SSEResponseMetadata{} @@ -937,7 +924,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin // Generate version ID versionId = generateVersionId() - // Normalize object path to ensure consistency with toFilerUrl behavior + // Normalize object path to ensure consistency with toFilerPath behavior normalizedObject := removeDuplicateSlashes(object) glog.V(2).Infof("putVersionedObject: creating version %s for %s/%s (normalized: %s)", versionId, bucket, object, normalizedObject) @@ -948,7 +935,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin // Upload directly to the versions directory // We need to construct the object path relative to the bucket versionObjectPath := normalizedObject + s3_constants.VersionsFolder + "/" + versionFileName - versionUploadUrl := s3a.toFilerUrl(bucket, versionObjectPath) + versionFilePath := s3a.toFilerPath(bucket, versionObjectPath) // Ensure the .versions directory exists before uploading bucketDir := s3a.option.BucketsPath + "/" + bucket @@ -966,9 +953,9 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin body = mimeDetect(r, body) } - glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionUploadUrl) + glog.V(2).Infof("putVersionedObject: uploading %s/%s version %s to %s", bucket, object, versionId, versionFilePath) - etag, errCode, sseMetadata = s3a.putToFiler(r, versionUploadUrl, body, bucket, 1) + etag, errCode, sseMetadata = s3a.putToFiler(r, versionFilePath, body, bucket, 1) if errCode != s3err.ErrNone { glog.Errorf("putVersionedObject: failed to upload version: %v", errCode) return "", "", errCode, SSEResponseMetadata{} diff --git a/weed/s3api/s3api_object_handlers_test.go b/weed/s3api/s3api_object_handlers_test.go index cf650a36e..a6592ad4b 100644 --- a/weed/s3api/s3api_object_handlers_test.go +++ b/weed/s3api/s3api_object_handlers_test.go @@ -114,7 +114,7 @@ func TestRemoveDuplicateSlashes(t *testing.T) { } } -func TestS3ApiServer_toFilerUrl(t *testing.T) { +func TestS3ApiServer_toFilerPath(t *testing.T) { tests := []struct { name string args string diff --git a/weed/s3api/s3api_object_versioning.go b/weed/s3api/s3api_object_versioning.go index 1c1dbee03..bbc43f205 100644 --- a/weed/s3api/s3api_object_versioning.go +++ b/weed/s3api/s3api_object_versioning.go @@ -607,7 +607,7 @@ func (s3a *S3ApiServer) calculateETagFromChunks(chunks []*filer_pb.FileChunk) st // getSpecificObjectVersion retrieves a specific version of an object func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId string) (*filer_pb.Entry, error) { - // Normalize object path to ensure consistency with toFilerUrl behavior + // Normalize object path to ensure consistency with toFilerPath behavior normalizedObject := removeDuplicateSlashes(object) if versionId == "" { @@ -639,7 +639,7 @@ func (s3a *S3ApiServer) getSpecificObjectVersion(bucket, object, versionId strin // deleteSpecificObjectVersion deletes a specific version of an object func (s3a *S3ApiServer) deleteSpecificObjectVersion(bucket, object, versionId string) error { - // Normalize object path to ensure consistency with toFilerUrl behavior + // Normalize object path to ensure consistency with toFilerPath behavior normalizedObject := removeDuplicateSlashes(object) if versionId == "" { @@ -843,7 +843,7 @@ func (s3a *S3ApiServer) ListObjectVersionsHandler(w http.ResponseWriter, r *http // getLatestObjectVersion finds the latest version of an object by reading .versions directory metadata func (s3a *S3ApiServer) getLatestObjectVersion(bucket, object string) (*filer_pb.Entry, error) { - // Normalize object path to ensure consistency with toFilerUrl behavior + // Normalize object path to ensure consistency with toFilerPath behavior normalizedObject := removeDuplicateSlashes(object) bucketDir := s3a.option.BucketsPath + "/" + bucket diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index 992027fda..dcf3a55f2 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -11,29 +11,31 @@ import ( "strings" "time" + "github.com/gorilla/mux" + "google.golang.org/grpc" + + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/iam/integration" "github.com/seaweedfs/seaweedfs/weed/iam/policy" "github.com/seaweedfs/seaweedfs/weed/iam/sts" - "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" - "github.com/seaweedfs/seaweedfs/weed/util/grace" - "github.com/seaweedfs/seaweedfs/weed/wdclient" - - "github.com/gorilla/mux" "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" . "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/grace" util_http "github.com/seaweedfs/seaweedfs/weed/util/http" util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client" - "google.golang.org/grpc" + "github.com/seaweedfs/seaweedfs/weed/wdclient" ) type S3ApiServerOption struct { - Filer pb.ServerAddress + Filers []pb.ServerAddress + Masters []pb.ServerAddress // For filer discovery Port int Config string DomainName string @@ -69,6 +71,10 @@ func NewS3ApiServer(router *mux.Router, option *S3ApiServerOption) (s3ApiServer } func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, explicitStore string) (s3ApiServer *S3ApiServer, err error) { + if len(option.Filers) == 0 { + return nil, fmt.Errorf("at least one filer address is required") + } + startTsNs := time.Now().UnixNano() v := util.GetViper() @@ -95,9 +101,38 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl // Initialize FilerClient for volume location caching // Uses the battle-tested vidMap with filer-based lookups - // S3 API typically connects to a single filer, but wrap in slice for consistency - filerClient := wdclient.NewFilerClient([]pb.ServerAddress{option.Filer}, option.GrpcDialOption, option.DataCenter) - glog.V(0).Infof("S3 API initialized FilerClient for volume location caching") + // Supports multiple filer addresses with automatic failover for high availability + var filerClient *wdclient.FilerClient + if len(option.Masters) > 0 && option.FilerGroup != "" { + // Enable filer discovery via master + masterMap := make(map[string]pb.ServerAddress) + for i, addr := range option.Masters { + masterMap[fmt.Sprintf("master%d", i)] = addr + } + masterClient := wdclient.NewMasterClient(option.GrpcDialOption, option.FilerGroup, cluster.S3Type, "", "", "", *pb.NewServiceDiscoveryFromMap(masterMap)) + + filerClient = wdclient.NewFilerClient(option.Filers, option.GrpcDialOption, option.DataCenter, &wdclient.FilerClientOption{ + MasterClient: masterClient, + FilerGroup: option.FilerGroup, + DiscoveryInterval: 5 * time.Minute, + }) + glog.V(0).Infof("S3 API initialized FilerClient with %d filer(s) and discovery enabled (group: %s, masters: %v)", + len(option.Filers), option.FilerGroup, option.Masters) + } else { + filerClient = wdclient.NewFilerClient(option.Filers, option.GrpcDialOption, option.DataCenter) + glog.V(0).Infof("S3 API initialized FilerClient with %d filer(s) (no discovery)", len(option.Filers)) + } + + // Update credential store to use FilerClient's current filer for HA + if store := iam.credentialManager.GetStore(); store != nil { + if filerFuncSetter, ok := store.(interface { + SetFilerAddressFunc(func() pb.ServerAddress, grpc.DialOption) + }); ok { + // Use FilerClient's GetCurrentFiler for true HA + filerFuncSetter.SetFilerAddressFunc(filerClient.GetCurrentFiler, option.GrpcDialOption) + glog.V(1).Infof("Updated credential store to use FilerClient's current active filer (HA-aware)") + } + } s3ApiServer = &S3ApiServer{ option: option, @@ -119,14 +154,16 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl if option.IamConfig != "" { glog.V(1).Infof("Loading advanced IAM configuration from: %s", option.IamConfig) + // Use FilerClient's GetCurrentFiler for HA-aware filer selection iamManager, err := loadIAMManagerFromConfig(option.IamConfig, func() string { - return string(option.Filer) + return string(filerClient.GetCurrentFiler()) }) if err != nil { glog.Errorf("Failed to load IAM configuration: %v", err) } else { // Create S3 IAM integration with the loaded IAM manager - s3iam := NewS3IAMIntegration(iamManager, string(option.Filer)) + // filerAddress not actually used, just for backward compatibility + s3iam := NewS3IAMIntegration(iamManager, "") // Set IAM integration in server s3ApiServer.iamIntegration = s3iam @@ -134,7 +171,7 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl // Set the integration in the traditional IAM for compatibility iam.SetIAMIntegration(s3iam) - glog.V(1).Infof("Advanced IAM system initialized successfully") + glog.V(1).Infof("Advanced IAM system initialized successfully with HA filer support") } } @@ -173,6 +210,21 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl return s3ApiServer, nil } +// getFilerAddress returns the current active filer address +// Uses FilerClient's tracked current filer which is updated on successful operations +// This provides better availability than always using the first filer +func (s3a *S3ApiServer) getFilerAddress() pb.ServerAddress { + if s3a.filerClient != nil { + return s3a.filerClient.GetCurrentFiler() + } + // Fallback to first filer if filerClient not initialized + if len(s3a.option.Filers) > 0 { + return s3a.option.Filers[0] + } + glog.Warningf("getFilerAddress: no filer addresses available") + return "" +} + // syncBucketPolicyToEngine syncs a bucket policy to the policy engine // This helper method centralizes the logic for loading bucket policies into the engine // to avoid duplication and ensure consistent error handling |
