aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/tus-tests.yml114
-rw-r--r--.gitignore1
-rw-r--r--test/s3/sse/github_7562_copy_test.go505
-rw-r--r--test/tus/Makefile226
-rw-r--r--test/tus/README.md241
-rw-r--r--test/tus/tus_integration_test.go772
-rw-r--r--weed/command/filer.go3
-rw-r--r--weed/command/server.go1
-rw-r--r--weed/s3api/s3api_object_handlers_copy.go277
-rw-r--r--weed/server/filer_server.go12
-rw-r--r--weed/server/filer_server_tus_handlers.go415
-rw-r--r--weed/server/filer_server_tus_session.go341
12 files changed, 2895 insertions, 13 deletions
diff --git a/.github/workflows/tus-tests.yml b/.github/workflows/tus-tests.yml
new file mode 100644
index 000000000..7fa8818ce
--- /dev/null
+++ b/.github/workflows/tus-tests.yml
@@ -0,0 +1,114 @@
+name: "TUS Protocol Tests"
+
+on:
+ pull_request:
+ paths:
+ - 'weed/server/filer_server_tus*.go'
+ - 'weed/server/filer_server.go'
+ - 'test/tus/**'
+ - '.github/workflows/tus-tests.yml'
+ push:
+ branches: [ master, main ]
+ paths:
+ - 'weed/server/filer_server_tus*.go'
+ - 'weed/server/filer_server.go'
+ - 'test/tus/**'
+
+concurrency:
+ group: ${{ github.head_ref || github.ref }}/tus-tests
+ cancel-in-progress: true
+
+permissions:
+ contents: read
+
+defaults:
+ run:
+ working-directory: weed
+
+jobs:
+ tus-integration-tests:
+ name: TUS Protocol Integration Tests
+ runs-on: ubuntu-22.04
+ timeout-minutes: 20
+
+ steps:
+ - name: Check out code
+ uses: actions/checkout@v6
+
+ - name: Set up Go
+ uses: actions/setup-go@v6
+ with:
+ go-version-file: 'go.mod'
+ id: go
+
+ - name: Install SeaweedFS
+ run: |
+ go install -buildvcs=false
+
+ - name: Run TUS Integration Tests
+ timeout-minutes: 15
+ working-directory: test/tus
+ run: |
+ set -x
+ echo "=== System Information ==="
+ uname -a
+ free -h
+ df -h
+ echo "=== Starting TUS Tests ==="
+
+ # Run tests with automatic server management
+ make test-with-server || {
+ echo "TUS integration tests failed, checking logs..."
+ if [ -f /tmp/seaweedfs-tus-filer.log ]; then
+ echo "=== Filer logs ==="
+ tail -100 /tmp/seaweedfs-tus-filer.log
+ fi
+ if [ -f /tmp/seaweedfs-tus-master.log ]; then
+ echo "=== Master logs ==="
+ tail -50 /tmp/seaweedfs-tus-master.log
+ fi
+ if [ -f /tmp/seaweedfs-tus-volume.log ]; then
+ echo "=== Volume logs ==="
+ tail -50 /tmp/seaweedfs-tus-volume.log
+ fi
+ exit 1
+ }
+
+ - name: Show server logs on failure
+ if: failure()
+ working-directory: test/tus
+ run: |
+ echo "=== Filer Server Logs ==="
+ if [ -f /tmp/seaweedfs-tus-filer.log ]; then
+ echo "Last 100 lines of filer logs:"
+ tail -100 /tmp/seaweedfs-tus-filer.log
+ else
+ echo "No filer log file found"
+ fi
+
+ echo "=== Master Server Logs ==="
+ if [ -f /tmp/seaweedfs-tus-master.log ]; then
+ tail -50 /tmp/seaweedfs-tus-master.log
+ else
+ echo "No master log file found"
+ fi
+
+ echo "=== Volume Server Logs ==="
+ if [ -f /tmp/seaweedfs-tus-volume.log ]; then
+ tail -50 /tmp/seaweedfs-tus-volume.log
+ else
+ echo "No volume log file found"
+ fi
+
+ echo "=== Test Environment ==="
+ ps aux | grep -E "(weed|test)" || true
+ netstat -tlnp 2>/dev/null | grep -E "(18888|19333|18080)" || true
+
+ - name: Upload test logs on failure
+ if: failure()
+ uses: actions/upload-artifact@v5
+ with:
+ name: tus-test-logs
+ path: |
+ /tmp/seaweedfs-tus-*.log
+ retention-days: 3
diff --git a/.gitignore b/.gitignore
index cd240ab6d..81b4b107d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -124,3 +124,4 @@ ADVANCED_IAM_DEVELOPMENT_PLAN.md
*.log
weed-iam
test/kafka/kafka-client-loadtest/weed-linux-arm64
+/test/tus/filerldb2
diff --git a/test/s3/sse/github_7562_copy_test.go b/test/s3/sse/github_7562_copy_test.go
new file mode 100644
index 000000000..5831c0b80
--- /dev/null
+++ b/test/s3/sse/github_7562_copy_test.go
@@ -0,0 +1,505 @@
+package sse_test
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "testing"
+
+ "github.com/aws/aws-sdk-go-v2/aws"
+ "github.com/aws/aws-sdk-go-v2/service/s3"
+ "github.com/aws/aws-sdk-go-v2/service/s3/types"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// TestGitHub7562CopyFromEncryptedToTempToEncrypted reproduces the exact scenario from
+// GitHub issue #7562: copying from an encrypted bucket to a temp bucket, then to another
+// encrypted bucket fails with InternalError.
+//
+// Reproduction steps:
+// 1. Create source bucket with SSE-S3 encryption enabled
+// 2. Upload object (automatically encrypted)
+// 3. Create temp bucket (no encryption)
+// 4. Copy object from source to temp (decrypts)
+// 5. Delete source bucket
+// 6. Create destination bucket with SSE-S3 encryption
+// 7. Copy object from temp to dest (should re-encrypt) - THIS FAILS
+func TestGitHub7562CopyFromEncryptedToTempToEncrypted(t *testing.T) {
+ ctx := context.Background()
+ client, err := createS3Client(ctx, defaultConfig)
+ require.NoError(t, err, "Failed to create S3 client")
+
+ // Create three buckets
+ srcBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-src-")
+ require.NoError(t, err, "Failed to create source bucket")
+
+ tempBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-temp-")
+ require.NoError(t, err, "Failed to create temp bucket")
+
+ destBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-dest-")
+ require.NoError(t, err, "Failed to create destination bucket")
+
+ // Cleanup at the end
+ defer func() {
+ // Clean up in reverse order of creation
+ cleanupTestBucket(ctx, client, destBucket)
+ cleanupTestBucket(ctx, client, tempBucket)
+ // Note: srcBucket is deleted during the test
+ }()
+
+ testData := []byte("Test data for GitHub issue #7562 - copy from encrypted to temp to encrypted bucket")
+ objectKey := "demo-file.txt"
+
+ t.Logf("[1] Creating source bucket with SSE-S3 default encryption: %s", srcBucket)
+
+ // Step 1: Enable SSE-S3 default encryption on source bucket
+ _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{
+ Bucket: aws.String(srcBucket),
+ ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{
+ Rules: []types.ServerSideEncryptionRule{
+ {
+ ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{
+ SSEAlgorithm: types.ServerSideEncryptionAes256,
+ },
+ },
+ },
+ },
+ })
+ require.NoError(t, err, "Failed to set source bucket default encryption")
+
+ t.Log("[2] Uploading demo object to source bucket")
+
+ // Step 2: Upload object to source bucket (will be automatically encrypted)
+ _, err = client.PutObject(ctx, &s3.PutObjectInput{
+ Bucket: aws.String(srcBucket),
+ Key: aws.String(objectKey),
+ Body: bytes.NewReader(testData),
+ // No encryption header - bucket default applies
+ })
+ require.NoError(t, err, "Failed to upload to source bucket")
+
+ // Verify source object is encrypted
+ srcHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
+ Bucket: aws.String(srcBucket),
+ Key: aws.String(objectKey),
+ })
+ require.NoError(t, err, "Failed to HEAD source object")
+ assert.Equal(t, types.ServerSideEncryptionAes256, srcHead.ServerSideEncryption,
+ "Source object should be SSE-S3 encrypted")
+ t.Logf("Source object encryption: %v", srcHead.ServerSideEncryption)
+
+ t.Logf("[3] Creating temp bucket (no encryption): %s", tempBucket)
+ // Temp bucket already created without encryption
+
+ t.Log("[4] Copying object from source to temp (should decrypt)")
+
+ // Step 4: Copy to temp bucket (no encryption = decrypts)
+ _, err = client.CopyObject(ctx, &s3.CopyObjectInput{
+ Bucket: aws.String(tempBucket),
+ Key: aws.String(objectKey),
+ CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)),
+ // No encryption header - data stored unencrypted
+ })
+ require.NoError(t, err, "Failed to copy to temp bucket")
+
+ // Verify temp object is NOT encrypted
+ tempHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
+ Bucket: aws.String(tempBucket),
+ Key: aws.String(objectKey),
+ })
+ require.NoError(t, err, "Failed to HEAD temp object")
+ assert.Empty(t, tempHead.ServerSideEncryption, "Temp object should NOT be encrypted")
+ t.Logf("Temp object encryption: %v (should be empty)", tempHead.ServerSideEncryption)
+
+ // Verify temp object content
+ tempGet, err := client.GetObject(ctx, &s3.GetObjectInput{
+ Bucket: aws.String(tempBucket),
+ Key: aws.String(objectKey),
+ })
+ require.NoError(t, err, "Failed to GET temp object")
+ tempData, err := io.ReadAll(tempGet.Body)
+ tempGet.Body.Close()
+ require.NoError(t, err, "Failed to read temp object")
+ assertDataEqual(t, testData, tempData, "Temp object data should match original")
+
+ t.Log("[5] Deleting original source bucket")
+
+ // Step 5: Delete source bucket
+ err = cleanupTestBucket(ctx, client, srcBucket)
+ require.NoError(t, err, "Failed to delete source bucket")
+
+ t.Logf("[6] Creating destination bucket with SSE-S3 encryption: %s", destBucket)
+
+ // Step 6: Enable SSE-S3 default encryption on destination bucket
+ _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{
+ Bucket: aws.String(destBucket),
+ ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{
+ Rules: []types.ServerSideEncryptionRule{
+ {
+ ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{
+ SSEAlgorithm: types.ServerSideEncryptionAes256,
+ },
+ },
+ },
+ },
+ })
+ require.NoError(t, err, "Failed to set destination bucket default encryption")
+
+ t.Log("[7] Copying object from temp to dest (should re-encrypt) - THIS IS WHERE #7562 FAILS")
+
+ // Step 7: Copy from temp to dest bucket (should re-encrypt with SSE-S3)
+ // THIS IS THE STEP THAT FAILS IN GITHUB ISSUE #7562
+ _, err = client.CopyObject(ctx, &s3.CopyObjectInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(objectKey),
+ CopySource: aws.String(fmt.Sprintf("%s/%s", tempBucket, objectKey)),
+ // No encryption header - bucket default should apply
+ })
+ require.NoError(t, err, "GitHub #7562: Failed to copy from temp to encrypted dest bucket")
+
+ // Verify destination object is encrypted
+ destHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(objectKey),
+ })
+ require.NoError(t, err, "Failed to HEAD destination object")
+ assert.Equal(t, types.ServerSideEncryptionAes256, destHead.ServerSideEncryption,
+ "Destination object should be SSE-S3 encrypted via bucket default")
+ t.Logf("Destination object encryption: %v", destHead.ServerSideEncryption)
+
+ // Verify destination object content is correct
+ destGet, err := client.GetObject(ctx, &s3.GetObjectInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(objectKey),
+ })
+ require.NoError(t, err, "Failed to GET destination object")
+ destData, err := io.ReadAll(destGet.Body)
+ destGet.Body.Close()
+ require.NoError(t, err, "Failed to read destination object")
+ assertDataEqual(t, testData, destData, "GitHub #7562: Destination object data mismatch after re-encryption")
+
+ t.Log("[done] GitHub #7562 reproduction test completed successfully!")
+}
+
+// TestGitHub7562SimpleScenario tests the simpler variant: just copy unencrypted to encrypted bucket
+func TestGitHub7562SimpleScenario(t *testing.T) {
+ ctx := context.Background()
+ client, err := createS3Client(ctx, defaultConfig)
+ require.NoError(t, err, "Failed to create S3 client")
+
+ // Create two buckets
+ srcBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-simple-src-")
+ require.NoError(t, err, "Failed to create source bucket")
+ defer cleanupTestBucket(ctx, client, srcBucket)
+
+ destBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-simple-dest-")
+ require.NoError(t, err, "Failed to create destination bucket")
+ defer cleanupTestBucket(ctx, client, destBucket)
+
+ testData := []byte("Simple test for unencrypted to encrypted copy")
+ objectKey := "test-object.txt"
+
+ t.Logf("Source bucket (no encryption): %s", srcBucket)
+ t.Logf("Dest bucket (SSE-S3 default): %s", destBucket)
+
+ // Upload to unencrypted source bucket
+ _, err = client.PutObject(ctx, &s3.PutObjectInput{
+ Bucket: aws.String(srcBucket),
+ Key: aws.String(objectKey),
+ Body: bytes.NewReader(testData),
+ })
+ require.NoError(t, err, "Failed to upload to source bucket")
+
+ // Enable SSE-S3 on destination bucket
+ _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{
+ Bucket: aws.String(destBucket),
+ ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{
+ Rules: []types.ServerSideEncryptionRule{
+ {
+ ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{
+ SSEAlgorithm: types.ServerSideEncryptionAes256,
+ },
+ },
+ },
+ },
+ })
+ require.NoError(t, err, "Failed to set dest bucket encryption")
+
+ // Copy to encrypted bucket (should use bucket default encryption)
+ _, err = client.CopyObject(ctx, &s3.CopyObjectInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(objectKey),
+ CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)),
+ })
+ require.NoError(t, err, "Failed to copy to encrypted bucket")
+
+ // Verify destination is encrypted
+ destHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(objectKey),
+ })
+ require.NoError(t, err, "Failed to HEAD dest object")
+ assert.Equal(t, types.ServerSideEncryptionAes256, destHead.ServerSideEncryption,
+ "Object should be encrypted via bucket default")
+
+ // Verify content
+ destGet, err := client.GetObject(ctx, &s3.GetObjectInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(objectKey),
+ })
+ require.NoError(t, err, "Failed to GET dest object")
+ destData, err := io.ReadAll(destGet.Body)
+ destGet.Body.Close()
+ require.NoError(t, err, "Failed to read dest object")
+ assertDataEqual(t, testData, destData, "Data mismatch")
+}
+
+// TestGitHub7562DebugMetadata helps debug what metadata is present on objects at each step
+func TestGitHub7562DebugMetadata(t *testing.T) {
+ ctx := context.Background()
+ client, err := createS3Client(ctx, defaultConfig)
+ require.NoError(t, err, "Failed to create S3 client")
+
+ // Create three buckets
+ srcBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-debug-src-")
+ require.NoError(t, err, "Failed to create source bucket")
+
+ tempBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-debug-temp-")
+ require.NoError(t, err, "Failed to create temp bucket")
+
+ destBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-debug-dest-")
+ require.NoError(t, err, "Failed to create destination bucket")
+
+ defer func() {
+ cleanupTestBucket(ctx, client, destBucket)
+ cleanupTestBucket(ctx, client, tempBucket)
+ }()
+
+ testData := []byte("Debug metadata test for GitHub #7562")
+ objectKey := "debug-file.txt"
+
+ // Enable SSE-S3 on source
+ _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{
+ Bucket: aws.String(srcBucket),
+ ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{
+ Rules: []types.ServerSideEncryptionRule{
+ {
+ ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{
+ SSEAlgorithm: types.ServerSideEncryptionAes256,
+ },
+ },
+ },
+ },
+ })
+ require.NoError(t, err, "Failed to set source bucket encryption")
+
+ // Upload
+ _, err = client.PutObject(ctx, &s3.PutObjectInput{
+ Bucket: aws.String(srcBucket),
+ Key: aws.String(objectKey),
+ Body: bytes.NewReader(testData),
+ })
+ require.NoError(t, err, "Failed to upload")
+
+ // Log source object headers
+ srcHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
+ Bucket: aws.String(srcBucket),
+ Key: aws.String(objectKey),
+ })
+ require.NoError(t, err, "Failed to HEAD source")
+ t.Logf("=== SOURCE OBJECT (encrypted) ===")
+ t.Logf("ServerSideEncryption: %v", srcHead.ServerSideEncryption)
+ t.Logf("Metadata: %v", srcHead.Metadata)
+ t.Logf("ContentLength: %d", aws.ToInt64(srcHead.ContentLength))
+
+ // Copy to temp
+ _, err = client.CopyObject(ctx, &s3.CopyObjectInput{
+ Bucket: aws.String(tempBucket),
+ Key: aws.String(objectKey),
+ CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)),
+ })
+ require.NoError(t, err, "Failed to copy to temp")
+
+ // Log temp object headers
+ tempHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
+ Bucket: aws.String(tempBucket),
+ Key: aws.String(objectKey),
+ })
+ require.NoError(t, err, "Failed to HEAD temp")
+ t.Logf("=== TEMP OBJECT (should be unencrypted) ===")
+ t.Logf("ServerSideEncryption: %v (should be empty)", tempHead.ServerSideEncryption)
+ t.Logf("Metadata: %v", tempHead.Metadata)
+ t.Logf("ContentLength: %d", aws.ToInt64(tempHead.ContentLength))
+
+ // Verify temp is NOT encrypted
+ if tempHead.ServerSideEncryption != "" {
+ t.Logf("WARNING: Temp object unexpectedly has encryption: %v", tempHead.ServerSideEncryption)
+ }
+
+ // Delete source bucket
+ cleanupTestBucket(ctx, client, srcBucket)
+
+ // Enable SSE-S3 on dest
+ _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{
+ Bucket: aws.String(destBucket),
+ ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{
+ Rules: []types.ServerSideEncryptionRule{
+ {
+ ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{
+ SSEAlgorithm: types.ServerSideEncryptionAes256,
+ },
+ },
+ },
+ },
+ })
+ require.NoError(t, err, "Failed to set dest bucket encryption")
+
+ // Copy to dest - THIS IS WHERE #7562 FAILS
+ t.Log("=== COPYING TO ENCRYPTED DEST ===")
+ _, err = client.CopyObject(ctx, &s3.CopyObjectInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(objectKey),
+ CopySource: aws.String(fmt.Sprintf("%s/%s", tempBucket, objectKey)),
+ })
+ if err != nil {
+ t.Logf("!!! COPY FAILED (GitHub #7562): %v", err)
+ t.FailNow()
+ }
+
+ // Log dest object headers
+ destHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(objectKey),
+ })
+ require.NoError(t, err, "Failed to HEAD dest")
+ t.Logf("=== DEST OBJECT (should be encrypted) ===")
+ t.Logf("ServerSideEncryption: %v", destHead.ServerSideEncryption)
+ t.Logf("Metadata: %v", destHead.Metadata)
+ t.Logf("ContentLength: %d", aws.ToInt64(destHead.ContentLength))
+
+ // Verify dest IS encrypted
+ assert.Equal(t, types.ServerSideEncryptionAes256, destHead.ServerSideEncryption,
+ "Dest object should be encrypted")
+
+ // Verify content is readable
+ destGet, err := client.GetObject(ctx, &s3.GetObjectInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(objectKey),
+ })
+ require.NoError(t, err, "Failed to GET dest")
+ destData, err := io.ReadAll(destGet.Body)
+ destGet.Body.Close()
+ require.NoError(t, err, "Failed to read dest")
+ assertDataEqual(t, testData, destData, "Data mismatch")
+
+ t.Log("=== DEBUG TEST PASSED ===")
+}
+
+// TestGitHub7562LargeFile tests the issue with larger files that might trigger multipart handling
+func TestGitHub7562LargeFile(t *testing.T) {
+ ctx := context.Background()
+ client, err := createS3Client(ctx, defaultConfig)
+ require.NoError(t, err, "Failed to create S3 client")
+
+ srcBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-large-src-")
+ require.NoError(t, err, "Failed to create source bucket")
+
+ tempBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-large-temp-")
+ require.NoError(t, err, "Failed to create temp bucket")
+
+ destBucket, err := createTestBucket(ctx, client, defaultConfig.BucketPrefix+"7562-large-dest-")
+ require.NoError(t, err, "Failed to create destination bucket")
+
+ defer func() {
+ cleanupTestBucket(ctx, client, destBucket)
+ cleanupTestBucket(ctx, client, tempBucket)
+ }()
+
+ // Use larger file to potentially trigger different code paths
+ testData := generateTestData(5 * 1024 * 1024) // 5MB
+ objectKey := "large-file.bin"
+
+ t.Logf("Testing with %d byte file", len(testData))
+
+ // Enable SSE-S3 on source
+ _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{
+ Bucket: aws.String(srcBucket),
+ ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{
+ Rules: []types.ServerSideEncryptionRule{
+ {
+ ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{
+ SSEAlgorithm: types.ServerSideEncryptionAes256,
+ },
+ },
+ },
+ },
+ })
+ require.NoError(t, err, "Failed to set source bucket encryption")
+
+ // Upload
+ _, err = client.PutObject(ctx, &s3.PutObjectInput{
+ Bucket: aws.String(srcBucket),
+ Key: aws.String(objectKey),
+ Body: bytes.NewReader(testData),
+ })
+ require.NoError(t, err, "Failed to upload")
+
+ // Copy to temp (decrypt)
+ _, err = client.CopyObject(ctx, &s3.CopyObjectInput{
+ Bucket: aws.String(tempBucket),
+ Key: aws.String(objectKey),
+ CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)),
+ })
+ require.NoError(t, err, "Failed to copy to temp")
+
+ // Delete source
+ cleanupTestBucket(ctx, client, srcBucket)
+
+ // Enable SSE-S3 on dest
+ _, err = client.PutBucketEncryption(ctx, &s3.PutBucketEncryptionInput{
+ Bucket: aws.String(destBucket),
+ ServerSideEncryptionConfiguration: &types.ServerSideEncryptionConfiguration{
+ Rules: []types.ServerSideEncryptionRule{
+ {
+ ApplyServerSideEncryptionByDefault: &types.ServerSideEncryptionByDefault{
+ SSEAlgorithm: types.ServerSideEncryptionAes256,
+ },
+ },
+ },
+ },
+ })
+ require.NoError(t, err, "Failed to set dest bucket encryption")
+
+ // Copy to dest (re-encrypt) - GitHub #7562
+ _, err = client.CopyObject(ctx, &s3.CopyObjectInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(objectKey),
+ CopySource: aws.String(fmt.Sprintf("%s/%s", tempBucket, objectKey)),
+ })
+ require.NoError(t, err, "GitHub #7562: Large file copy to encrypted bucket failed")
+
+ // Verify
+ destHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(objectKey),
+ })
+ require.NoError(t, err, "Failed to HEAD dest")
+ assert.Equal(t, types.ServerSideEncryptionAes256, destHead.ServerSideEncryption)
+ assert.Equal(t, int64(len(testData)), aws.ToInt64(destHead.ContentLength))
+
+ // Verify content
+ destGet, err := client.GetObject(ctx, &s3.GetObjectInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(objectKey),
+ })
+ require.NoError(t, err, "Failed to GET dest")
+ destData, err := io.ReadAll(destGet.Body)
+ destGet.Body.Close()
+ require.NoError(t, err, "Failed to read dest")
+ assertDataEqual(t, testData, destData, "Large file data mismatch")
+
+ t.Log("Large file test passed!")
+}
+
diff --git a/test/tus/Makefile b/test/tus/Makefile
new file mode 100644
index 000000000..71b05e8ab
--- /dev/null
+++ b/test/tus/Makefile
@@ -0,0 +1,226 @@
+# Makefile for TUS Protocol Integration Tests
+# This Makefile provides targets for running TUS (resumable upload) integration tests
+
+# Default values
+SEAWEEDFS_BINARY ?= weed
+FILER_PORT ?= 18888
+VOLUME_PORT ?= 18080
+MASTER_PORT ?= 19333
+TEST_TIMEOUT ?= 10m
+VOLUME_MAX_SIZE_MB ?= 50
+VOLUME_MAX_COUNT ?= 100
+
+# Test directory
+TEST_DIR := $(shell pwd)
+SEAWEEDFS_ROOT := $(shell cd ../.. && pwd)
+
+# Colors for output
+RED := \033[0;31m
+GREEN := \033[0;32m
+YELLOW := \033[1;33m
+NC := \033[0m # No Color
+
+.PHONY: all test clean start-seaweedfs stop-seaweedfs check-binary build-weed help test-basic test-chunked test-resume test-errors test-with-server
+
+all: test
+
+# Build SeaweedFS binary
+build-weed:
+ @echo "Building SeaweedFS binary..."
+ @cd $(SEAWEEDFS_ROOT)/weed && go build -o weed
+ @echo "$(GREEN)SeaweedFS binary built successfully$(NC)"
+
+help:
+ @echo "SeaweedFS TUS Protocol Integration Tests"
+ @echo ""
+ @echo "Available targets:"
+ @echo " test - Run all TUS integration tests"
+ @echo " test-basic - Run basic TUS upload tests"
+ @echo " test-chunked - Run chunked upload tests"
+ @echo " test-resume - Run upload resume tests"
+ @echo " test-errors - Run error handling tests"
+ @echo " test-with-server - Run tests with automatic server management"
+ @echo " start-seaweedfs - Start SeaweedFS server for testing"
+ @echo " stop-seaweedfs - Stop SeaweedFS server"
+ @echo " clean - Clean up test artifacts"
+ @echo " check-binary - Check if SeaweedFS binary exists"
+ @echo " build-weed - Build SeaweedFS binary"
+ @echo ""
+ @echo "Configuration:"
+ @echo " SEAWEEDFS_BINARY=$(SEAWEEDFS_BINARY)"
+ @echo " FILER_PORT=$(FILER_PORT)"
+ @echo " VOLUME_PORT=$(VOLUME_PORT)"
+ @echo " MASTER_PORT=$(MASTER_PORT)"
+ @echo " TEST_TIMEOUT=$(TEST_TIMEOUT)"
+
+check-binary:
+ @if ! command -v $(SEAWEEDFS_BINARY) > /dev/null 2>&1 && [ ! -f "$(SEAWEEDFS_ROOT)/weed/weed" ]; then \
+ echo "$(RED)Error: SeaweedFS binary not found$(NC)"; \
+ echo "Please build SeaweedFS first: make build-weed"; \
+ exit 1; \
+ fi
+ @echo "$(GREEN)SeaweedFS binary found$(NC)"
+
+start-seaweedfs: check-binary
+ @echo "$(YELLOW)Starting SeaweedFS server for TUS testing...$(NC)"
+ @# Clean up any existing processes on our test ports
+ @lsof -ti :$(MASTER_PORT) | xargs kill -TERM 2>/dev/null || true
+ @lsof -ti :$(VOLUME_PORT) | xargs kill -TERM 2>/dev/null || true
+ @lsof -ti :$(FILER_PORT) | xargs kill -TERM 2>/dev/null || true
+ @sleep 2
+
+ # Create necessary directories
+ @mkdir -p /tmp/seaweedfs-test-tus-master
+ @mkdir -p /tmp/seaweedfs-test-tus-volume
+ @mkdir -p /tmp/seaweedfs-test-tus-filer
+
+ # Start master server (use freshly built binary)
+ @echo "Starting master server..."
+ @nohup $(SEAWEEDFS_ROOT)/weed/weed master \
+ -port=$(MASTER_PORT) \
+ -mdir=/tmp/seaweedfs-test-tus-master \
+ -volumeSizeLimitMB=$(VOLUME_MAX_SIZE_MB) \
+ -ip=127.0.0.1 \
+ > /tmp/seaweedfs-tus-master.log 2>&1 &
+ @sleep 3
+
+ # Start volume server
+ @echo "Starting volume server..."
+ @nohup $(SEAWEEDFS_ROOT)/weed/weed volume \
+ -port=$(VOLUME_PORT) \
+ -mserver=127.0.0.1:$(MASTER_PORT) \
+ -dir=/tmp/seaweedfs-test-tus-volume \
+ -max=$(VOLUME_MAX_COUNT) \
+ -ip=127.0.0.1 \
+ > /tmp/seaweedfs-tus-volume.log 2>&1 &
+ @sleep 3
+
+ # Start filer server with TUS enabled (default tusBasePath is .tus)
+ @echo "Starting filer server..."
+ @nohup $(SEAWEEDFS_ROOT)/weed/weed filer \
+ -port=$(FILER_PORT) \
+ -master=127.0.0.1:$(MASTER_PORT) \
+ -ip=127.0.0.1 \
+ > /tmp/seaweedfs-tus-filer.log 2>&1 &
+ @sleep 5
+
+ # Wait for filer to be ready
+ @echo "$(YELLOW)Waiting for filer to be ready...$(NC)"
+ @for i in $$(seq 1 30); do \
+ if curl -s -f http://127.0.0.1:$(FILER_PORT)/ > /dev/null 2>&1; then \
+ echo "$(GREEN)Filer is ready$(NC)"; \
+ break; \
+ fi; \
+ if [ $$i -eq 30 ]; then \
+ echo "$(RED)Filer failed to start within 30 seconds$(NC)"; \
+ $(MAKE) debug-logs; \
+ exit 1; \
+ fi; \
+ echo "Waiting for filer... ($$i/30)"; \
+ sleep 1; \
+ done
+
+ @echo "$(GREEN)SeaweedFS server started successfully for TUS testing$(NC)"
+ @echo "Master: http://localhost:$(MASTER_PORT)"
+ @echo "Volume: http://localhost:$(VOLUME_PORT)"
+ @echo "Filer: http://localhost:$(FILER_PORT)"
+ @echo "TUS Endpoint: http://localhost:$(FILER_PORT)/.tus/"
+
+stop-seaweedfs:
+ @echo "$(YELLOW)Stopping SeaweedFS server...$(NC)"
+ @lsof -ti :$(MASTER_PORT) | xargs -r kill -TERM 2>/dev/null || true
+ @lsof -ti :$(VOLUME_PORT) | xargs -r kill -TERM 2>/dev/null || true
+ @lsof -ti :$(FILER_PORT) | xargs -r kill -TERM 2>/dev/null || true
+ @sleep 2
+ @echo "$(GREEN)SeaweedFS server stopped$(NC)"
+
+clean:
+ @echo "$(YELLOW)Cleaning up TUS test artifacts...$(NC)"
+ @rm -rf /tmp/seaweedfs-test-tus-*
+ @rm -f /tmp/seaweedfs-tus-*.log
+ @echo "$(GREEN)TUS test cleanup completed$(NC)"
+
+# Run all tests
+test: check-binary
+ @echo "$(YELLOW)Running all TUS integration tests...$(NC)"
+ @cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) ./test/tus/...
+ @echo "$(GREEN)All TUS tests completed$(NC)"
+
+# Run basic upload tests
+test-basic: check-binary
+ @echo "$(YELLOW)Running basic TUS upload tests...$(NC)"
+ @cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) -run "TestTusBasicUpload|TestTusOptionsHandler" ./test/tus/...
+ @echo "$(GREEN)Basic TUS tests completed$(NC)"
+
+# Run chunked upload tests
+test-chunked: check-binary
+ @echo "$(YELLOW)Running chunked TUS upload tests...$(NC)"
+ @cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) -run "TestTusChunkedUpload" ./test/tus/...
+ @echo "$(GREEN)Chunked TUS tests completed$(NC)"
+
+# Run resume tests
+test-resume: check-binary
+ @echo "$(YELLOW)Running TUS upload resume tests...$(NC)"
+ @cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) -run "TestTusResumeAfterInterruption|TestTusHeadRequest" ./test/tus/...
+ @echo "$(GREEN)TUS resume tests completed$(NC)"
+
+# Run error handling tests
+test-errors: check-binary
+ @echo "$(YELLOW)Running TUS error handling tests...$(NC)"
+ @cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) -run "TestTusInvalidOffset|TestTusUploadNotFound|TestTusDeleteUpload" ./test/tus/...
+ @echo "$(GREEN)TUS error tests completed$(NC)"
+
+# Run tests with automatic server management
+test-with-server: build-weed
+ @echo "$(YELLOW)Running TUS tests with automatic server management...$(NC)"
+ @$(MAKE) -C $(TEST_DIR) start-seaweedfs && \
+ sleep 3 && \
+ cd $(SEAWEEDFS_ROOT) && go test -v -timeout=$(TEST_TIMEOUT) ./test/tus/...; \
+ TEST_RESULT=$$?; \
+ $(MAKE) -C $(TEST_DIR) stop-seaweedfs; \
+ $(MAKE) -C $(TEST_DIR) clean; \
+ if [ $$TEST_RESULT -eq 0 ]; then echo "$(GREEN)All TUS tests passed!$(NC)"; fi; \
+ exit $$TEST_RESULT
+
+# Debug targets
+debug-logs:
+ @echo "$(YELLOW)=== Master Log ===$(NC)"
+ @tail -n 50 /tmp/seaweedfs-tus-master.log 2>/dev/null || echo "No master log found"
+ @echo "$(YELLOW)=== Volume Log ===$(NC)"
+ @tail -n 50 /tmp/seaweedfs-tus-volume.log 2>/dev/null || echo "No volume log found"
+ @echo "$(YELLOW)=== Filer Log ===$(NC)"
+ @tail -n 50 /tmp/seaweedfs-tus-filer.log 2>/dev/null || echo "No filer log found"
+
+debug-status:
+ @echo "$(YELLOW)=== Process Status ===$(NC)"
+ @ps aux | grep -E "(weed|seaweedfs)" | grep -v grep || echo "No SeaweedFS processes found"
+ @echo "$(YELLOW)=== Port Status ===$(NC)"
+ @lsof -i :$(MASTER_PORT) -i :$(VOLUME_PORT) -i :$(FILER_PORT) 2>/dev/null || echo "No ports in use"
+
+# Manual testing targets
+manual-start: start-seaweedfs
+ @echo "$(GREEN)SeaweedFS is now running for manual TUS testing$(NC)"
+ @echo ""
+ @echo "TUS Endpoints:"
+ @echo " OPTIONS /.tus/ - Capability discovery"
+ @echo " POST /.tus/{path} - Create upload"
+ @echo " HEAD /.tus/.uploads/{id} - Get offset"
+ @echo " PATCH /.tus/.uploads/{id} - Upload data"
+ @echo " DELETE /.tus/.uploads/{id} - Cancel upload"
+ @echo ""
+ @echo "Example curl commands:"
+ @echo " curl -X OPTIONS http://localhost:$(FILER_PORT)/.tus/ -H 'Tus-Resumable: 1.0.0'"
+ @echo ""
+ @echo "Run 'make manual-stop' when finished"
+
+manual-stop: stop-seaweedfs clean
+
+# CI targets
+ci-test: test-with-server
+
+# Skip integration tests (short mode)
+test-short:
+ @echo "$(YELLOW)Running TUS tests in short mode (skipping integration tests)...$(NC)"
+ @cd $(SEAWEEDFS_ROOT) && go test -v -short ./test/tus/...
+ @echo "$(GREEN)Short tests completed$(NC)"
+
diff --git a/test/tus/README.md b/test/tus/README.md
new file mode 100644
index 000000000..03c980a3d
--- /dev/null
+++ b/test/tus/README.md
@@ -0,0 +1,241 @@
+# TUS Protocol Integration Tests
+
+This directory contains integration tests for the TUS (resumable upload) protocol support in SeaweedFS Filer.
+
+## Overview
+
+TUS is an open protocol for resumable file uploads over HTTP. It allows clients to upload files in chunks and resume uploads after network failures or interruptions.
+
+### Why TUS?
+
+- **Resumable uploads**: Resume interrupted uploads without re-sending data
+- **Chunked uploads**: Upload large files in smaller pieces
+- **Simple protocol**: Standard HTTP methods with custom headers
+- **Wide client support**: Libraries available for JavaScript, Python, Go, and more
+
+## TUS Protocol Endpoints
+
+| Method | Path | Description |
+|--------|------|-------------|
+| `OPTIONS` | `/.tus/` | Server capability discovery |
+| `POST` | `/.tus/{path}` | Create new upload session |
+| `HEAD` | `/.tus/.uploads/{id}` | Get current upload offset |
+| `PATCH` | `/.tus/.uploads/{id}` | Upload data at offset |
+| `DELETE` | `/.tus/.uploads/{id}` | Cancel upload |
+
+### TUS Headers
+
+**Request Headers:**
+- `Tus-Resumable: 1.0.0` - Protocol version (required)
+- `Upload-Length` - Total file size in bytes (required on POST)
+- `Upload-Offset` - Current byte offset (required on PATCH)
+- `Upload-Metadata` - Base64-encoded key-value pairs (optional)
+- `Content-Type: application/offset+octet-stream` (required on PATCH)
+
+**Response Headers:**
+- `Tus-Resumable` - Protocol version
+- `Tus-Version` - Supported versions
+- `Tus-Extension` - Supported extensions
+- `Tus-Max-Size` - Maximum upload size
+- `Upload-Offset` - Current byte offset
+- `Location` - Upload URL (on POST)
+
+## Enabling TUS
+
+TUS protocol support is enabled by default at `/.tus` path. You can customize the path using the `-tusBasePath` flag:
+
+```bash
+# Start filer with default TUS path (/.tus)
+weed filer -master=localhost:9333
+
+# Use a custom path
+weed filer -master=localhost:9333 -tusBasePath=uploads/tus
+
+# Disable TUS by setting empty path
+weed filer -master=localhost:9333 -tusBasePath=
+```
+
+## Test Structure
+
+### Integration Tests
+
+The tests cover:
+
+1. **Basic Functionality**
+ - `TestTusOptionsHandler` - Capability discovery
+ - `TestTusBasicUpload` - Simple complete upload
+ - `TestTusCreationWithUpload` - Creation-with-upload extension
+
+2. **Chunked Uploads**
+ - `TestTusChunkedUpload` - Upload in multiple chunks
+
+3. **Resumable Uploads**
+ - `TestTusHeadRequest` - Offset tracking
+ - `TestTusResumeAfterInterruption` - Resume after failure
+
+4. **Error Handling**
+ - `TestTusInvalidOffset` - Offset mismatch (409 Conflict)
+ - `TestTusUploadNotFound` - Missing upload (404 Not Found)
+ - `TestTusDeleteUpload` - Upload cancellation
+
+## Running Tests
+
+### Prerequisites
+
+1. **Build SeaweedFS**:
+```bash
+make build-weed
+# or
+cd ../../weed && go build -o weed
+```
+
+### Using Makefile
+
+```bash
+# Show available targets
+make help
+
+# Run all tests with automatic server management
+make test-with-server
+
+# Run all tests (requires running server)
+make test
+
+# Run specific test categories
+make test-basic # Basic upload tests
+make test-chunked # Chunked upload tests
+make test-resume # Resume/HEAD tests
+make test-errors # Error handling tests
+
+# Manual testing
+make manual-start # Start SeaweedFS for manual testing
+make manual-stop # Stop and cleanup
+```
+
+### Using Go Test Directly
+
+```bash
+# Run all TUS tests
+go test -v ./test/tus/...
+
+# Run specific test
+go test -v ./test/tus -run TestTusBasicUpload
+
+# Skip integration tests (short mode)
+go test -v -short ./test/tus/...
+```
+
+### Debug
+
+```bash
+# View server logs
+make debug-logs
+
+# Check process and port status
+make debug-status
+```
+
+## Test Environment
+
+Each test run:
+1. Starts a SeaweedFS cluster (master, volume, filer)
+2. Creates uploads using TUS protocol
+3. Verifies files are stored correctly
+4. Cleans up test data
+
+### Default Ports
+
+| Service | Port |
+|---------|------|
+| Master | 19333 |
+| Volume | 18080 |
+| Filer | 18888 |
+
+### Configuration
+
+Override defaults via environment or Makefile variables:
+```bash
+FILER_PORT=8889 MASTER_PORT=9334 make test
+```
+
+## Example Usage
+
+### Create Upload
+
+```bash
+curl -X POST http://localhost:18888/.tus/mydir/file.txt \
+ -H "Tus-Resumable: 1.0.0" \
+ -H "Upload-Length: 1000" \
+ -H "Upload-Metadata: filename dGVzdC50eHQ="
+```
+
+### Upload Data
+
+```bash
+curl -X PATCH http://localhost:18888/.tus/.uploads/{upload-id} \
+ -H "Tus-Resumable: 1.0.0" \
+ -H "Upload-Offset: 0" \
+ -H "Content-Type: application/offset+octet-stream" \
+ --data-binary @file.txt
+```
+
+### Check Offset
+
+```bash
+curl -I http://localhost:18888/.tus/.uploads/{upload-id} \
+ -H "Tus-Resumable: 1.0.0"
+```
+
+### Cancel Upload
+
+```bash
+curl -X DELETE http://localhost:18888/.tus/.uploads/{upload-id} \
+ -H "Tus-Resumable: 1.0.0"
+```
+
+## TUS Extensions Supported
+
+- **creation**: Create new uploads with POST
+- **creation-with-upload**: Send data in creation request
+- **termination**: Cancel uploads with DELETE
+
+## Architecture
+
+```text
+Client Filer Volume Servers
+ | | |
+ |-- POST /.tus/path/file.mp4 ->| |
+ | |-- Create session dir ------->|
+ |<-- 201 Location: /.../{id} --| |
+ | | |
+ |-- PATCH /.tus/.uploads/{id} >| |
+ | Upload-Offset: 0 |-- Assign volume ------------>|
+ | [chunk data] |-- Upload chunk ------------->|
+ |<-- 204 Upload-Offset: N -----| |
+ | | |
+ | (network failure) | |
+ | | |
+ |-- HEAD /.tus/.uploads/{id} ->| |
+ |<-- Upload-Offset: N ---------| |
+ | | |
+ |-- PATCH (resume) ----------->|-- Upload remaining -------->|
+ |<-- 204 (complete) -----------|-- Assemble final file ----->|
+```
+
+## Comparison with S3 Multipart
+
+| Feature | TUS | S3 Multipart |
+|---------|-----|--------------|
+| Protocol | Custom HTTP headers | S3 API |
+| Session Init | POST with Upload-Length | CreateMultipartUpload |
+| Upload Data | PATCH with offset | UploadPart with partNumber |
+| Resume | HEAD to get offset | ListParts |
+| Complete | Automatic at final offset | CompleteMultipartUpload |
+| Ordering | Sequential (offset-based) | Parallel (part numbers) |
+
+## Related Resources
+
+- [TUS Protocol Specification](https://tus.io/protocols/resumable-upload)
+- [tus-js-client](https://github.com/tus/tus-js-client) - JavaScript client
+- [go-tus](https://github.com/eventials/go-tus) - Go client
+- [SeaweedFS S3 API](../../weed/s3api) - Alternative multipart upload
diff --git a/test/tus/tus_integration_test.go b/test/tus/tus_integration_test.go
new file mode 100644
index 000000000..a03c21dab
--- /dev/null
+++ b/test/tus/tus_integration_test.go
@@ -0,0 +1,772 @@
+package tus
+
+import (
+ "bytes"
+ "context"
+ "encoding/base64"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strconv"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+const (
+ TusVersion = "1.0.0"
+ testFilerPort = "18888"
+ testMasterPort = "19333"
+ testVolumePort = "18080"
+)
+
+// TestCluster represents a running SeaweedFS cluster for testing
+type TestCluster struct {
+ masterCmd *exec.Cmd
+ volumeCmd *exec.Cmd
+ filerCmd *exec.Cmd
+ dataDir string
+}
+
+func (c *TestCluster) Stop() {
+ if c.filerCmd != nil && c.filerCmd.Process != nil {
+ c.filerCmd.Process.Signal(os.Interrupt)
+ c.filerCmd.Wait()
+ }
+ if c.volumeCmd != nil && c.volumeCmd.Process != nil {
+ c.volumeCmd.Process.Signal(os.Interrupt)
+ c.volumeCmd.Wait()
+ }
+ if c.masterCmd != nil && c.masterCmd.Process != nil {
+ c.masterCmd.Process.Signal(os.Interrupt)
+ c.masterCmd.Wait()
+ }
+}
+
+func (c *TestCluster) FilerURL() string {
+ return fmt.Sprintf("http://127.0.0.1:%s", testFilerPort)
+}
+
+func (c *TestCluster) TusURL() string {
+ return fmt.Sprintf("%s/.tus", c.FilerURL())
+}
+
+// FullURL converts a relative path to a full URL
+func (c *TestCluster) FullURL(path string) string {
+ if strings.HasPrefix(path, "http://") || strings.HasPrefix(path, "https://") {
+ return path
+ }
+ return fmt.Sprintf("http://127.0.0.1:%s%s", testFilerPort, path)
+}
+
+// startTestCluster starts a SeaweedFS cluster for testing
+func startTestCluster(t *testing.T, ctx context.Context) (*TestCluster, error) {
+ weedBinary := findWeedBinary()
+ if weedBinary == "" {
+ return nil, fmt.Errorf("weed binary not found - please build it first: cd weed && go build")
+ }
+
+ dataDir, err := os.MkdirTemp("", "seaweedfs_tus_test_")
+ if err != nil {
+ return nil, err
+ }
+
+ cluster := &TestCluster{dataDir: dataDir}
+
+ // Create subdirectories
+ masterDir := filepath.Join(dataDir, "master")
+ volumeDir := filepath.Join(dataDir, "volume")
+ filerDir := filepath.Join(dataDir, "filer")
+ os.MkdirAll(masterDir, 0755)
+ os.MkdirAll(volumeDir, 0755)
+ os.MkdirAll(filerDir, 0755)
+
+ // Start master
+ masterCmd := exec.CommandContext(ctx, weedBinary, "master",
+ "-port", testMasterPort,
+ "-mdir", masterDir,
+ "-ip", "127.0.0.1",
+ )
+ masterLogFile, err := os.Create(filepath.Join(masterDir, "master.log"))
+ if err != nil {
+ os.RemoveAll(dataDir)
+ return nil, fmt.Errorf("failed to create master log: %v", err)
+ }
+ masterCmd.Stdout = masterLogFile
+ masterCmd.Stderr = masterLogFile
+ if err := masterCmd.Start(); err != nil {
+ os.RemoveAll(dataDir)
+ return nil, fmt.Errorf("failed to start master: %v", err)
+ }
+ cluster.masterCmd = masterCmd
+
+ // Wait for master to be ready
+ if err := waitForHTTPServer("http://127.0.0.1:"+testMasterPort+"/dir/status", 30*time.Second); err != nil {
+ cluster.Stop()
+ os.RemoveAll(dataDir)
+ return nil, fmt.Errorf("master not ready: %v", err)
+ }
+
+ // Start volume server
+ volumeCmd := exec.CommandContext(ctx, weedBinary, "volume",
+ "-port", testVolumePort,
+ "-dir", volumeDir,
+ "-mserver", "127.0.0.1:"+testMasterPort,
+ "-ip", "127.0.0.1",
+ )
+ volumeLogFile, err := os.Create(filepath.Join(volumeDir, "volume.log"))
+ if err != nil {
+ cluster.Stop()
+ os.RemoveAll(dataDir)
+ return nil, fmt.Errorf("failed to create volume log: %v", err)
+ }
+ volumeCmd.Stdout = volumeLogFile
+ volumeCmd.Stderr = volumeLogFile
+ if err := volumeCmd.Start(); err != nil {
+ cluster.Stop()
+ os.RemoveAll(dataDir)
+ return nil, fmt.Errorf("failed to start volume server: %v", err)
+ }
+ cluster.volumeCmd = volumeCmd
+
+ // Wait for volume server to register with master
+ if err := waitForHTTPServer("http://127.0.0.1:"+testVolumePort+"/status", 30*time.Second); err != nil {
+ cluster.Stop()
+ os.RemoveAll(dataDir)
+ return nil, fmt.Errorf("volume server not ready: %v", err)
+ }
+
+ // Start filer with TUS enabled
+ filerCmd := exec.CommandContext(ctx, weedBinary, "filer",
+ "-port", testFilerPort,
+ "-master", "127.0.0.1:"+testMasterPort,
+ "-ip", "127.0.0.1",
+ "-defaultStoreDir", filerDir,
+ )
+ filerLogFile, err := os.Create(filepath.Join(filerDir, "filer.log"))
+ if err != nil {
+ cluster.Stop()
+ os.RemoveAll(dataDir)
+ return nil, fmt.Errorf("failed to create filer log: %v", err)
+ }
+ filerCmd.Stdout = filerLogFile
+ filerCmd.Stderr = filerLogFile
+ if err := filerCmd.Start(); err != nil {
+ cluster.Stop()
+ os.RemoveAll(dataDir)
+ return nil, fmt.Errorf("failed to start filer: %v", err)
+ }
+ cluster.filerCmd = filerCmd
+
+ // Wait for filer
+ if err := waitForHTTPServer("http://127.0.0.1:"+testFilerPort+"/", 30*time.Second); err != nil {
+ cluster.Stop()
+ os.RemoveAll(dataDir)
+ return nil, fmt.Errorf("filer not ready: %v", err)
+ }
+
+ // Wait a bit more for the cluster to fully stabilize
+ // Volumes are created lazily, and we need to ensure the master topology is ready
+ time.Sleep(5 * time.Second)
+
+ return cluster, nil
+}
+
+func findWeedBinary() string {
+ candidates := []string{
+ "../../weed/weed",
+ "../weed/weed",
+ "./weed/weed",
+ "weed",
+ }
+ for _, candidate := range candidates {
+ if _, err := os.Stat(candidate); err == nil {
+ return candidate
+ }
+ }
+ if path, err := exec.LookPath("weed"); err == nil {
+ return path
+ }
+ return ""
+}
+
+func waitForHTTPServer(url string, timeout time.Duration) error {
+ start := time.Now()
+ client := &http.Client{Timeout: 1 * time.Second}
+ for time.Since(start) < timeout {
+ resp, err := client.Get(url)
+ if err == nil {
+ resp.Body.Close()
+ return nil
+ }
+ time.Sleep(500 * time.Millisecond)
+ }
+ return fmt.Errorf("timeout waiting for %s", url)
+}
+
+// encodeTusMetadata encodes key-value pairs for Upload-Metadata header
+func encodeTusMetadata(metadata map[string]string) string {
+ var parts []string
+ for k, v := range metadata {
+ encoded := base64.StdEncoding.EncodeToString([]byte(v))
+ parts = append(parts, fmt.Sprintf("%s %s", k, encoded))
+ }
+ return strings.Join(parts, ",")
+}
+
+// TestTusOptionsHandler tests the OPTIONS endpoint for capability discovery
+func TestTusOptionsHandler(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+
+ cluster, err := startTestCluster(t, ctx)
+ require.NoError(t, err)
+ defer func() {
+ cluster.Stop()
+ os.RemoveAll(cluster.dataDir)
+ }()
+
+ // Test OPTIONS request
+ req, err := http.NewRequest(http.MethodOptions, cluster.TusURL()+"/", nil)
+ require.NoError(t, err)
+ req.Header.Set("Tus-Resumable", TusVersion)
+
+ client := &http.Client{}
+ resp, err := client.Do(req)
+ require.NoError(t, err)
+ defer resp.Body.Close()
+
+ // Verify TUS headers
+ assert.Equal(t, http.StatusOK, resp.StatusCode, "OPTIONS should return 200 OK")
+ assert.Equal(t, TusVersion, resp.Header.Get("Tus-Resumable"), "Should return Tus-Resumable header")
+ assert.NotEmpty(t, resp.Header.Get("Tus-Version"), "Should return Tus-Version header")
+ assert.NotEmpty(t, resp.Header.Get("Tus-Extension"), "Should return Tus-Extension header")
+ assert.NotEmpty(t, resp.Header.Get("Tus-Max-Size"), "Should return Tus-Max-Size header")
+}
+
+// TestTusBasicUpload tests a simple complete upload
+func TestTusBasicUpload(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+
+ cluster, err := startTestCluster(t, ctx)
+ require.NoError(t, err)
+ defer func() {
+ cluster.Stop()
+ os.RemoveAll(cluster.dataDir)
+ }()
+
+ testData := []byte("Hello, TUS Protocol! This is a test file.")
+ targetPath := "/testdir/testfile.txt"
+
+ // Step 1: Create upload (POST)
+ createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil)
+ require.NoError(t, err)
+ createReq.Header.Set("Tus-Resumable", TusVersion)
+ createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData)))
+ createReq.Header.Set("Upload-Metadata", encodeTusMetadata(map[string]string{
+ "filename": "testfile.txt",
+ "content-type": "text/plain",
+ }))
+
+ client := &http.Client{}
+ createResp, err := client.Do(createReq)
+ require.NoError(t, err)
+ defer createResp.Body.Close()
+
+ assert.Equal(t, http.StatusCreated, createResp.StatusCode, "POST should return 201 Created")
+ uploadLocation := createResp.Header.Get("Location")
+ assert.NotEmpty(t, uploadLocation, "Should return Location header with upload URL")
+ t.Logf("Upload location: %s", uploadLocation)
+
+ // Step 2: Upload data (PATCH)
+ patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData))
+ require.NoError(t, err)
+ patchReq.Header.Set("Tus-Resumable", TusVersion)
+ patchReq.Header.Set("Upload-Offset", "0")
+ patchReq.Header.Set("Content-Type", "application/offset+octet-stream")
+ patchReq.Header.Set("Content-Length", strconv.Itoa(len(testData)))
+
+ patchResp, err := client.Do(patchReq)
+ require.NoError(t, err)
+ defer patchResp.Body.Close()
+
+ assert.Equal(t, http.StatusNoContent, patchResp.StatusCode, "PATCH should return 204 No Content")
+ newOffset := patchResp.Header.Get("Upload-Offset")
+ assert.Equal(t, strconv.Itoa(len(testData)), newOffset, "Upload-Offset should equal total file size")
+
+ // Step 3: Verify the file was created
+ getResp, err := client.Get(cluster.FilerURL() + targetPath)
+ require.NoError(t, err)
+ defer getResp.Body.Close()
+
+ assert.Equal(t, http.StatusOK, getResp.StatusCode, "GET should return 200 OK")
+ body, err := io.ReadAll(getResp.Body)
+ require.NoError(t, err)
+ assert.Equal(t, testData, body, "File content should match uploaded data")
+}
+
+// TestTusChunkedUpload tests uploading a file in multiple chunks
+func TestTusChunkedUpload(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+
+ cluster, err := startTestCluster(t, ctx)
+ require.NoError(t, err)
+ defer func() {
+ cluster.Stop()
+ os.RemoveAll(cluster.dataDir)
+ }()
+
+ // Create test data (100KB)
+ testData := make([]byte, 100*1024)
+ for i := range testData {
+ testData[i] = byte(i % 256)
+ }
+ chunkSize := 32 * 1024 // 32KB chunks
+ targetPath := "/chunked/largefile.bin"
+
+ client := &http.Client{}
+
+ // Step 1: Create upload
+ createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil)
+ require.NoError(t, err)
+ createReq.Header.Set("Tus-Resumable", TusVersion)
+ createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData)))
+
+ createResp, err := client.Do(createReq)
+ require.NoError(t, err)
+ defer createResp.Body.Close()
+
+ require.Equal(t, http.StatusCreated, createResp.StatusCode)
+ uploadLocation := createResp.Header.Get("Location")
+ require.NotEmpty(t, uploadLocation)
+ t.Logf("Upload location: %s", uploadLocation)
+
+ // Step 2: Upload in chunks
+ offset := 0
+ for offset < len(testData) {
+ end := offset + chunkSize
+ if end > len(testData) {
+ end = len(testData)
+ }
+ chunk := testData[offset:end]
+
+ patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(chunk))
+ require.NoError(t, err)
+ patchReq.Header.Set("Tus-Resumable", TusVersion)
+ patchReq.Header.Set("Upload-Offset", strconv.Itoa(offset))
+ patchReq.Header.Set("Content-Type", "application/offset+octet-stream")
+ patchReq.Header.Set("Content-Length", strconv.Itoa(len(chunk)))
+
+ patchResp, err := client.Do(patchReq)
+ require.NoError(t, err)
+ patchResp.Body.Close()
+
+ require.Equal(t, http.StatusNoContent, patchResp.StatusCode,
+ "PATCH chunk at offset %d should return 204", offset)
+ newOffset, _ := strconv.Atoi(patchResp.Header.Get("Upload-Offset"))
+ require.Equal(t, end, newOffset, "New offset should be %d", end)
+
+ t.Logf("Uploaded chunk: offset=%d, size=%d, newOffset=%d", offset, len(chunk), newOffset)
+ offset = end
+ }
+
+ // Step 3: Verify the complete file
+ getResp, err := client.Get(cluster.FilerURL() + targetPath)
+ require.NoError(t, err)
+ defer getResp.Body.Close()
+
+ assert.Equal(t, http.StatusOK, getResp.StatusCode)
+ body, err := io.ReadAll(getResp.Body)
+ require.NoError(t, err)
+ assert.Equal(t, testData, body, "File content should match uploaded data")
+}
+
+// TestTusHeadRequest tests the HEAD endpoint to get upload offset
+func TestTusHeadRequest(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+
+ cluster, err := startTestCluster(t, ctx)
+ require.NoError(t, err)
+ defer func() {
+ cluster.Stop()
+ os.RemoveAll(cluster.dataDir)
+ }()
+
+ testData := []byte("Test data for HEAD request verification")
+ targetPath := "/headtest/file.txt"
+ client := &http.Client{}
+
+ // Create upload
+ createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil)
+ require.NoError(t, err)
+ createReq.Header.Set("Tus-Resumable", TusVersion)
+ createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData)))
+
+ createResp, err := client.Do(createReq)
+ require.NoError(t, err)
+ defer createResp.Body.Close()
+ require.Equal(t, http.StatusCreated, createResp.StatusCode)
+ uploadLocation := createResp.Header.Get("Location")
+
+ // HEAD before any data uploaded - offset should be 0
+ headReq1, err := http.NewRequest(http.MethodHead, cluster.FullURL(uploadLocation), nil)
+ require.NoError(t, err)
+ headReq1.Header.Set("Tus-Resumable", TusVersion)
+
+ headResp1, err := client.Do(headReq1)
+ require.NoError(t, err)
+ defer headResp1.Body.Close()
+
+ assert.Equal(t, http.StatusOK, headResp1.StatusCode)
+ assert.Equal(t, "0", headResp1.Header.Get("Upload-Offset"), "Initial offset should be 0")
+ assert.Equal(t, strconv.Itoa(len(testData)), headResp1.Header.Get("Upload-Length"))
+
+ // Upload half the data
+ halfLen := len(testData) / 2
+ patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData[:halfLen]))
+ require.NoError(t, err)
+ patchReq.Header.Set("Tus-Resumable", TusVersion)
+ patchReq.Header.Set("Upload-Offset", "0")
+ patchReq.Header.Set("Content-Type", "application/offset+octet-stream")
+
+ patchResp, err := client.Do(patchReq)
+ require.NoError(t, err)
+ patchResp.Body.Close()
+ require.Equal(t, http.StatusNoContent, patchResp.StatusCode)
+
+ // HEAD after partial upload - offset should be halfLen
+ headReq2, err := http.NewRequest(http.MethodHead, cluster.FullURL(uploadLocation), nil)
+ require.NoError(t, err)
+ headReq2.Header.Set("Tus-Resumable", TusVersion)
+
+ headResp2, err := client.Do(headReq2)
+ require.NoError(t, err)
+ defer headResp2.Body.Close()
+
+ assert.Equal(t, http.StatusOK, headResp2.StatusCode)
+ assert.Equal(t, strconv.Itoa(halfLen), headResp2.Header.Get("Upload-Offset"),
+ "Offset should be %d after partial upload", halfLen)
+}
+
+// TestTusDeleteUpload tests canceling an in-progress upload
+func TestTusDeleteUpload(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+
+ cluster, err := startTestCluster(t, ctx)
+ require.NoError(t, err)
+ defer func() {
+ cluster.Stop()
+ os.RemoveAll(cluster.dataDir)
+ }()
+
+ testData := []byte("Data to be deleted")
+ targetPath := "/deletetest/file.txt"
+ client := &http.Client{}
+
+ // Create upload
+ createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil)
+ require.NoError(t, err)
+ createReq.Header.Set("Tus-Resumable", TusVersion)
+ createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData)))
+
+ createResp, err := client.Do(createReq)
+ require.NoError(t, err)
+ defer createResp.Body.Close()
+ require.Equal(t, http.StatusCreated, createResp.StatusCode)
+ uploadLocation := createResp.Header.Get("Location")
+
+ // Upload some data
+ patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData[:10]))
+ require.NoError(t, err)
+ patchReq.Header.Set("Tus-Resumable", TusVersion)
+ patchReq.Header.Set("Upload-Offset", "0")
+ patchReq.Header.Set("Content-Type", "application/offset+octet-stream")
+
+ patchResp, err := client.Do(patchReq)
+ require.NoError(t, err)
+ patchResp.Body.Close()
+
+ // Delete the upload
+ deleteReq, err := http.NewRequest(http.MethodDelete, cluster.FullURL(uploadLocation), nil)
+ require.NoError(t, err)
+ deleteReq.Header.Set("Tus-Resumable", TusVersion)
+
+ deleteResp, err := client.Do(deleteReq)
+ require.NoError(t, err)
+ defer deleteResp.Body.Close()
+
+ assert.Equal(t, http.StatusNoContent, deleteResp.StatusCode, "DELETE should return 204")
+
+ // Verify upload is gone - HEAD should return 404
+ headReq, err := http.NewRequest(http.MethodHead, cluster.FullURL(uploadLocation), nil)
+ require.NoError(t, err)
+ headReq.Header.Set("Tus-Resumable", TusVersion)
+
+ headResp, err := client.Do(headReq)
+ require.NoError(t, err)
+ defer headResp.Body.Close()
+
+ assert.Equal(t, http.StatusNotFound, headResp.StatusCode, "HEAD after DELETE should return 404")
+}
+
+// TestTusInvalidOffset tests error handling for mismatched offsets
+func TestTusInvalidOffset(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+
+ cluster, err := startTestCluster(t, ctx)
+ require.NoError(t, err)
+ defer func() {
+ cluster.Stop()
+ os.RemoveAll(cluster.dataDir)
+ }()
+
+ testData := []byte("Test data for offset validation")
+ targetPath := "/offsettest/file.txt"
+ client := &http.Client{}
+
+ // Create upload
+ createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil)
+ require.NoError(t, err)
+ createReq.Header.Set("Tus-Resumable", TusVersion)
+ createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData)))
+
+ createResp, err := client.Do(createReq)
+ require.NoError(t, err)
+ defer createResp.Body.Close()
+ require.Equal(t, http.StatusCreated, createResp.StatusCode)
+ uploadLocation := createResp.Header.Get("Location")
+
+ // Try to upload with wrong offset (should be 0, but we send 100)
+ patchReq, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData))
+ require.NoError(t, err)
+ patchReq.Header.Set("Tus-Resumable", TusVersion)
+ patchReq.Header.Set("Upload-Offset", "100") // Wrong offset!
+ patchReq.Header.Set("Content-Type", "application/offset+octet-stream")
+
+ patchResp, err := client.Do(patchReq)
+ require.NoError(t, err)
+ defer patchResp.Body.Close()
+
+ assert.Equal(t, http.StatusConflict, patchResp.StatusCode,
+ "PATCH with wrong offset should return 409 Conflict")
+}
+
+// TestTusUploadNotFound tests accessing a non-existent upload
+func TestTusUploadNotFound(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+
+ cluster, err := startTestCluster(t, ctx)
+ require.NoError(t, err)
+ defer func() {
+ cluster.Stop()
+ os.RemoveAll(cluster.dataDir)
+ }()
+
+ client := &http.Client{}
+ fakeUploadURL := cluster.TusURL() + "/.uploads/nonexistent-upload-id"
+
+ // HEAD on non-existent upload
+ headReq, err := http.NewRequest(http.MethodHead, fakeUploadURL, nil)
+ require.NoError(t, err)
+ headReq.Header.Set("Tus-Resumable", TusVersion)
+
+ headResp, err := client.Do(headReq)
+ require.NoError(t, err)
+ defer headResp.Body.Close()
+
+ assert.Equal(t, http.StatusNotFound, headResp.StatusCode,
+ "HEAD on non-existent upload should return 404")
+
+ // PATCH on non-existent upload
+ patchReq, err := http.NewRequest(http.MethodPatch, fakeUploadURL, bytes.NewReader([]byte("data")))
+ require.NoError(t, err)
+ patchReq.Header.Set("Tus-Resumable", TusVersion)
+ patchReq.Header.Set("Upload-Offset", "0")
+ patchReq.Header.Set("Content-Type", "application/offset+octet-stream")
+
+ patchResp, err := client.Do(patchReq)
+ require.NoError(t, err)
+ defer patchResp.Body.Close()
+
+ assert.Equal(t, http.StatusNotFound, patchResp.StatusCode,
+ "PATCH on non-existent upload should return 404")
+}
+
+// TestTusCreationWithUpload tests the creation-with-upload extension
+func TestTusCreationWithUpload(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+
+ cluster, err := startTestCluster(t, ctx)
+ require.NoError(t, err)
+ defer func() {
+ cluster.Stop()
+ os.RemoveAll(cluster.dataDir)
+ }()
+
+ testData := []byte("Small file uploaded in creation request")
+ targetPath := "/creationwithupload/smallfile.txt"
+ client := &http.Client{}
+
+ // Create upload with data in the same request
+ createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, bytes.NewReader(testData))
+ require.NoError(t, err)
+ createReq.Header.Set("Tus-Resumable", TusVersion)
+ createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData)))
+ createReq.Header.Set("Content-Type", "application/offset+octet-stream")
+
+ createResp, err := client.Do(createReq)
+ require.NoError(t, err)
+ defer createResp.Body.Close()
+
+ assert.Equal(t, http.StatusCreated, createResp.StatusCode)
+ uploadLocation := createResp.Header.Get("Location")
+ assert.NotEmpty(t, uploadLocation)
+
+ // Check Upload-Offset header - should indicate all data was received
+ uploadOffset := createResp.Header.Get("Upload-Offset")
+ assert.Equal(t, strconv.Itoa(len(testData)), uploadOffset,
+ "Upload-Offset should equal file size for complete upload")
+
+ // Verify the file
+ getResp, err := client.Get(cluster.FilerURL() + targetPath)
+ require.NoError(t, err)
+ defer getResp.Body.Close()
+
+ assert.Equal(t, http.StatusOK, getResp.StatusCode)
+ body, err := io.ReadAll(getResp.Body)
+ require.NoError(t, err)
+ assert.Equal(t, testData, body)
+}
+
+// TestTusResumeAfterInterruption simulates resuming an upload after failure
+func TestTusResumeAfterInterruption(t *testing.T) {
+ if testing.Short() {
+ t.Skip("Skipping integration test in short mode")
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
+ defer cancel()
+
+ cluster, err := startTestCluster(t, ctx)
+ require.NoError(t, err)
+ defer func() {
+ cluster.Stop()
+ os.RemoveAll(cluster.dataDir)
+ }()
+
+ // 50KB test data
+ testData := make([]byte, 50*1024)
+ for i := range testData {
+ testData[i] = byte(i % 256)
+ }
+ targetPath := "/resume/interrupted.bin"
+ client := &http.Client{}
+
+ // Create upload
+ createReq, err := http.NewRequest(http.MethodPost, cluster.TusURL()+targetPath, nil)
+ require.NoError(t, err)
+ createReq.Header.Set("Tus-Resumable", TusVersion)
+ createReq.Header.Set("Upload-Length", strconv.Itoa(len(testData)))
+
+ createResp, err := client.Do(createReq)
+ require.NoError(t, err)
+ defer createResp.Body.Close()
+ require.Equal(t, http.StatusCreated, createResp.StatusCode)
+ uploadLocation := createResp.Header.Get("Location")
+
+ // Upload first 20KB
+ firstChunkSize := 20 * 1024
+ patchReq1, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData[:firstChunkSize]))
+ require.NoError(t, err)
+ patchReq1.Header.Set("Tus-Resumable", TusVersion)
+ patchReq1.Header.Set("Upload-Offset", "0")
+ patchReq1.Header.Set("Content-Type", "application/offset+octet-stream")
+
+ patchResp1, err := client.Do(patchReq1)
+ require.NoError(t, err)
+ patchResp1.Body.Close()
+ require.Equal(t, http.StatusNoContent, patchResp1.StatusCode)
+
+ t.Log("Simulating network interruption...")
+
+ // Simulate resumption: Query current offset with HEAD
+ headReq, err := http.NewRequest(http.MethodHead, cluster.FullURL(uploadLocation), nil)
+ require.NoError(t, err)
+ headReq.Header.Set("Tus-Resumable", TusVersion)
+
+ headResp, err := client.Do(headReq)
+ require.NoError(t, err)
+ defer headResp.Body.Close()
+
+ require.Equal(t, http.StatusOK, headResp.StatusCode)
+ currentOffset, _ := strconv.Atoi(headResp.Header.Get("Upload-Offset"))
+ t.Logf("Resumed upload at offset: %d", currentOffset)
+ require.Equal(t, firstChunkSize, currentOffset)
+
+ // Resume upload from current offset
+ patchReq2, err := http.NewRequest(http.MethodPatch, cluster.FullURL(uploadLocation), bytes.NewReader(testData[currentOffset:]))
+ require.NoError(t, err)
+ patchReq2.Header.Set("Tus-Resumable", TusVersion)
+ patchReq2.Header.Set("Upload-Offset", strconv.Itoa(currentOffset))
+ patchReq2.Header.Set("Content-Type", "application/offset+octet-stream")
+
+ patchResp2, err := client.Do(patchReq2)
+ require.NoError(t, err)
+ patchResp2.Body.Close()
+ require.Equal(t, http.StatusNoContent, patchResp2.StatusCode)
+
+ // Verify complete file
+ getResp, err := client.Get(cluster.FilerURL() + targetPath)
+ require.NoError(t, err)
+ defer getResp.Body.Close()
+
+ assert.Equal(t, http.StatusOK, getResp.StatusCode)
+ body, err := io.ReadAll(getResp.Body)
+ require.NoError(t, err)
+ assert.Equal(t, testData, body, "Resumed upload should produce complete file")
+}
diff --git a/weed/command/filer.go b/weed/command/filer.go
index 86991a181..314bb83b6 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -74,6 +74,7 @@ type FilerOptions struct {
diskType *string
allowedOrigins *string
exposeDirectoryData *bool
+ tusPath *string
certProvider certprovider.Provider
}
@@ -109,6 +110,7 @@ func init() {
f.diskType = cmdFiler.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
f.allowedOrigins = cmdFiler.Flag.String("allowedOrigins", "*", "comma separated list of allowed origins")
f.exposeDirectoryData = cmdFiler.Flag.Bool("exposeDirectoryData", true, "whether to return directory metadata and content in Filer UI")
+ f.tusPath = cmdFiler.Flag.String("tusBasePath", ".tus", "TUS resumable upload endpoint base path")
// start s3 on filer
filerStartS3 = cmdFiler.Flag.Bool("s3", false, "whether to start S3 gateway")
@@ -334,6 +336,7 @@ func (fo *FilerOptions) startFiler() {
DownloadMaxBytesPs: int64(*fo.downloadMaxMBps) * 1024 * 1024,
DiskType: *fo.diskType,
AllowedOrigins: strings.Split(*fo.allowedOrigins, ","),
+ TusPath: *fo.tusPath,
})
if nfs_err != nil {
glog.Fatalf("Filer startup error: %v", nfs_err)
diff --git a/weed/command/server.go b/weed/command/server.go
index d729502f0..ebd9f359a 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -129,6 +129,7 @@ func init() {
filerOptions.downloadMaxMBps = cmdServer.Flag.Int("filer.downloadMaxMBps", 0, "download max speed for each download request, in MB per second")
filerOptions.diskType = cmdServer.Flag.String("filer.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
filerOptions.exposeDirectoryData = cmdServer.Flag.Bool("filer.exposeDirectoryData", true, "expose directory data via filer. If false, filer UI will be innaccessible.")
+ filerOptions.tusPath = cmdServer.Flag.String("filer.tusBasePath", ".tus", "TUS resumable upload endpoint base path")
serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
serverOptions.v.portGrpc = cmdServer.Flag.Int("volume.port.grpc", 0, "volume server grpc listen port")
diff --git a/weed/s3api/s3api_object_handlers_copy.go b/weed/s3api/s3api_object_handlers_copy.go
index 09d009372..66d4ded80 100644
--- a/weed/s3api/s3api_object_handlers_copy.go
+++ b/weed/s3api/s3api_object_handlers_copy.go
@@ -167,6 +167,14 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
}
// Copy extended attributes from source, filtering out conflicting encryption metadata
+ // Pre-compute encryption state once for efficiency
+ srcHasSSEC := IsSSECEncrypted(entry.Extended)
+ srcHasSSEKMS := IsSSEKMSEncrypted(entry.Extended)
+ srcHasSSES3 := IsSSES3EncryptedInternal(entry.Extended)
+ dstWantsSSEC := IsSSECRequest(r)
+ dstWantsSSEKMS := IsSSEKMSRequest(r)
+ dstWantsSSES3 := IsSSES3RequestInternal(r)
+
for k, v := range entry.Extended {
// Skip encryption-specific headers that might conflict with destination encryption type
skipHeader := false
@@ -177,17 +185,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
skipHeader = true
}
- // If we're doing cross-encryption, skip conflicting headers
- if !skipHeader && len(entry.GetChunks()) > 0 {
- // Detect source and destination encryption types
- srcHasSSEC := IsSSECEncrypted(entry.Extended)
- srcHasSSEKMS := IsSSEKMSEncrypted(entry.Extended)
- srcHasSSES3 := IsSSES3EncryptedInternal(entry.Extended)
- dstWantsSSEC := IsSSECRequest(r)
- dstWantsSSEKMS := IsSSEKMSRequest(r)
- dstWantsSSES3 := IsSSES3RequestInternal(r)
-
- // Use helper function to determine if header should be skipped
+ // Filter conflicting headers for cross-encryption or encrypted→unencrypted copies
+ // This applies to both inline files (no chunks) and chunked files - fixes GitHub #7562
+ if !skipHeader {
skipHeader = shouldSkipEncryptionHeader(k,
srcHasSSEC, srcHasSSEKMS, srcHasSSES3,
dstWantsSSEC, dstWantsSSEKMS, dstWantsSSES3)
@@ -212,10 +212,31 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
dstEntry.Extended[k] = v
}
- // For zero-size files or files without chunks, use the original approach
+ // For zero-size files or files without chunks, handle inline content
+ // This includes encrypted inline files that need decryption/re-encryption
if entry.Attributes.FileSize == 0 || len(entry.GetChunks()) == 0 {
- // Just copy the entry structure without chunks for zero-size files
dstEntry.Chunks = nil
+
+ // Handle inline encrypted content - fixes GitHub #7562
+ if len(entry.Content) > 0 {
+ inlineContent, inlineMetadata, inlineErr := s3a.processInlineContentForCopy(
+ entry, r, dstBucket, dstObject,
+ srcHasSSEC, srcHasSSEKMS, srcHasSSES3,
+ dstWantsSSEC, dstWantsSSEKMS, dstWantsSSES3)
+ if inlineErr != nil {
+ glog.Errorf("CopyObjectHandler inline content error: %v", inlineErr)
+ s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
+ return
+ }
+ dstEntry.Content = inlineContent
+
+ // Apply inline destination metadata
+ if inlineMetadata != nil {
+ for k, v := range inlineMetadata {
+ dstEntry.Extended[k] = v
+ }
+ }
+ }
} else {
// Use unified copy strategy approach
dstChunks, dstMetadata, copyErr := s3a.executeUnifiedCopyStrategy(entry, r, dstBucket, srcObject, dstObject)
@@ -2508,3 +2529,233 @@ func shouldSkipEncryptionHeader(headerKey string,
// Default: don't skip the header
return false
}
+
+// processInlineContentForCopy handles encryption/decryption for inline content during copy
+// This fixes GitHub #7562 where small files stored inline weren't properly decrypted/re-encrypted
+func (s3a *S3ApiServer) processInlineContentForCopy(
+ entry *filer_pb.Entry, r *http.Request, dstBucket, dstObject string,
+ srcSSEC, srcSSEKMS, srcSSES3 bool,
+ dstSSEC, dstSSEKMS, dstSSES3 bool) ([]byte, map[string][]byte, error) {
+
+ content := entry.Content
+ var dstMetadata map[string][]byte
+
+ // Check if source is encrypted and needs decryption
+ srcEncrypted := srcSSEC || srcSSEKMS || srcSSES3
+
+ // Check if destination needs encryption (explicit request or bucket default)
+ dstNeedsEncryption := dstSSEC || dstSSEKMS || dstSSES3
+ if !dstNeedsEncryption {
+ // Check bucket default encryption
+ bucketMetadata, err := s3a.getBucketMetadata(dstBucket)
+ if err == nil && bucketMetadata != nil && bucketMetadata.Encryption != nil {
+ switch bucketMetadata.Encryption.SseAlgorithm {
+ case "aws:kms":
+ dstSSEKMS = true
+ dstNeedsEncryption = true
+ case "AES256":
+ dstSSES3 = true
+ dstNeedsEncryption = true
+ }
+ }
+ }
+
+ // Decrypt source content if encrypted
+ if srcEncrypted {
+ decryptedContent, decErr := s3a.decryptInlineContent(entry, srcSSEC, srcSSEKMS, srcSSES3, r)
+ if decErr != nil {
+ return nil, nil, fmt.Errorf("failed to decrypt inline content: %w", decErr)
+ }
+ content = decryptedContent
+ glog.V(3).Infof("Decrypted inline content: %d bytes", len(content))
+ }
+
+ // Re-encrypt if destination needs encryption
+ if dstNeedsEncryption {
+ encryptedContent, encMetadata, encErr := s3a.encryptInlineContent(content, dstBucket, dstObject, dstSSEC, dstSSEKMS, dstSSES3, r)
+ if encErr != nil {
+ return nil, nil, fmt.Errorf("failed to encrypt inline content: %w", encErr)
+ }
+ content = encryptedContent
+ dstMetadata = encMetadata
+ glog.V(3).Infof("Encrypted inline content: %d bytes", len(content))
+ }
+
+ return content, dstMetadata, nil
+}
+
+// decryptInlineContent decrypts inline content from an encrypted source
+func (s3a *S3ApiServer) decryptInlineContent(entry *filer_pb.Entry, srcSSEC, srcSSEKMS, srcSSES3 bool, r *http.Request) ([]byte, error) {
+ content := entry.Content
+
+ if srcSSES3 {
+ // Get SSE-S3 key from metadata
+ keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]
+ if !exists {
+ return nil, fmt.Errorf("SSE-S3 key not found in metadata")
+ }
+
+ keyManager := GetSSES3KeyManager()
+ sseKey, err := DeserializeSSES3Metadata(keyData, keyManager)
+ if err != nil {
+ return nil, fmt.Errorf("failed to deserialize SSE-S3 key: %w", err)
+ }
+
+ // Get IV
+ iv := sseKey.IV
+ if len(iv) == 0 {
+ return nil, fmt.Errorf("SSE-S3 IV not found")
+ }
+
+ // Decrypt content
+ decryptedReader, err := CreateSSES3DecryptedReader(bytes.NewReader(content), sseKey, iv)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create SSE-S3 decrypted reader: %w", err)
+ }
+ return io.ReadAll(decryptedReader)
+
+ } else if srcSSEKMS {
+ // Get SSE-KMS key from metadata
+ keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]
+ if !exists {
+ return nil, fmt.Errorf("SSE-KMS key not found in metadata")
+ }
+
+ sseKey, err := DeserializeSSEKMSMetadata(keyData)
+ if err != nil {
+ return nil, fmt.Errorf("failed to deserialize SSE-KMS key: %w", err)
+ }
+
+ // Decrypt content
+ decryptedReader, err := CreateSSEKMSDecryptedReader(bytes.NewReader(content), sseKey)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create SSE-KMS decrypted reader: %w", err)
+ }
+ return io.ReadAll(decryptedReader)
+
+ } else if srcSSEC {
+ // Get SSE-C key from request headers
+ sourceKey, err := ParseSSECCopySourceHeaders(r)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse SSE-C copy source headers: %w", err)
+ }
+
+ // Get IV from metadata
+ iv, err := GetSSECIVFromMetadata(entry.Extended)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get SSE-C IV: %w", err)
+ }
+
+ // Decrypt content
+ decryptedReader, err := CreateSSECDecryptedReader(bytes.NewReader(content), sourceKey, iv)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create SSE-C decrypted reader: %w", err)
+ }
+ return io.ReadAll(decryptedReader)
+ }
+
+ // Source not encrypted, return as-is
+ return content, nil
+}
+
+// encryptInlineContent encrypts inline content for the destination
+func (s3a *S3ApiServer) encryptInlineContent(content []byte, dstBucket, dstObject string,
+ dstSSEC, dstSSEKMS, dstSSES3 bool, r *http.Request) ([]byte, map[string][]byte, error) {
+
+ dstMetadata := make(map[string][]byte)
+
+ if dstSSES3 {
+ // Generate SSE-S3 key
+ keyManager := GetSSES3KeyManager()
+ key, err := keyManager.GetOrCreateKey("")
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to generate SSE-S3 key: %w", err)
+ }
+
+ // Encrypt content
+ encryptedReader, iv, err := CreateSSES3EncryptedReader(bytes.NewReader(content), key)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create SSE-S3 encrypted reader: %w", err)
+ }
+
+ encryptedContent, err := io.ReadAll(encryptedReader)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to read encrypted content: %w", err)
+ }
+
+ // Store IV on key and serialize metadata
+ key.IV = iv
+ keyData, err := SerializeSSES3Metadata(key)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to serialize SSE-S3 metadata: %w", err)
+ }
+
+ dstMetadata[s3_constants.SeaweedFSSSES3Key] = keyData
+ dstMetadata[s3_constants.AmzServerSideEncryption] = []byte("AES256")
+
+ return encryptedContent, dstMetadata, nil
+
+ } else if dstSSEKMS {
+ // Parse SSE-KMS headers
+ keyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to parse SSE-KMS headers: %w", err)
+ }
+
+ // Build encryption context if needed
+ if encryptionContext == nil {
+ encryptionContext = BuildEncryptionContext(dstBucket, dstObject, bucketKeyEnabled)
+ }
+
+ // Encrypt content
+ encryptedReader, sseKey, err := CreateSSEKMSEncryptedReaderWithBucketKey(
+ bytes.NewReader(content), keyID, encryptionContext, bucketKeyEnabled)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create SSE-KMS encrypted reader: %w", err)
+ }
+
+ encryptedContent, err := io.ReadAll(encryptedReader)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to read encrypted content: %w", err)
+ }
+
+ // Serialize metadata
+ keyData, err := SerializeSSEKMSMetadata(sseKey)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to serialize SSE-KMS metadata: %w", err)
+ }
+
+ dstMetadata[s3_constants.SeaweedFSSSEKMSKey] = keyData
+ dstMetadata[s3_constants.AmzServerSideEncryption] = []byte("aws:kms")
+
+ return encryptedContent, dstMetadata, nil
+
+ } else if dstSSEC {
+ // Parse SSE-C headers
+ destKey, err := ParseSSECHeaders(r)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to parse SSE-C headers: %w", err)
+ }
+
+ // Encrypt content
+ encryptedReader, iv, err := CreateSSECEncryptedReader(bytes.NewReader(content), destKey)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to create SSE-C encrypted reader: %w", err)
+ }
+
+ encryptedContent, err := io.ReadAll(encryptedReader)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to read encrypted content: %w", err)
+ }
+
+ // Store IV in metadata
+ StoreSSECIVInMetadata(dstMetadata, iv)
+ dstMetadata[s3_constants.AmzServerSideEncryptionCustomerAlgorithm] = []byte("AES256")
+ dstMetadata[s3_constants.AmzServerSideEncryptionCustomerKeyMD5] = []byte(destKey.KeyMD5)
+
+ return encryptedContent, dstMetadata, nil
+ }
+
+ // No encryption needed
+ return content, nil, nil
+}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 95d344af4..3d2db00ad 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -79,6 +79,7 @@ type FilerOption struct {
DiskType string
AllowedOrigins []string
ExposeDirectoryData bool
+ TusPath string
}
type FilerServer struct {
@@ -195,6 +196,17 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
handleStaticResources(defaultMux)
if !option.DisableHttp {
defaultMux.HandleFunc("/healthz", requestIDMiddleware(fs.filerHealthzHandler))
+ // TUS resumable upload protocol handler
+ if option.TusPath != "" {
+ tusPath := option.TusPath
+ if !strings.HasPrefix(tusPath, "/") {
+ tusPath = "/" + tusPath
+ }
+ if !strings.HasSuffix(tusPath, "/") {
+ tusPath += "/"
+ }
+ defaultMux.HandleFunc(tusPath, fs.filerGuard.WhiteList(requestIDMiddleware(fs.tusHandler)))
+ }
defaultMux.HandleFunc("/", fs.filerGuard.WhiteList(requestIDMiddleware(fs.filerHandler)))
}
if defaultMux != readonlyMux {
diff --git a/weed/server/filer_server_tus_handlers.go b/weed/server/filer_server_tus_handlers.go
new file mode 100644
index 000000000..abb6ecca4
--- /dev/null
+++ b/weed/server/filer_server_tus_handlers.go
@@ -0,0 +1,415 @@
+package weed_server
+
+import (
+ "bytes"
+ "context"
+ "encoding/base64"
+ "fmt"
+ "io"
+ "net/http"
+ "path"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// tusHandler is the main entry point for TUS protocol requests
+func (fs *FilerServer) tusHandler(w http.ResponseWriter, r *http.Request) {
+ // Set common TUS response headers
+ w.Header().Set("Tus-Resumable", TusVersion)
+
+ // Check Tus-Resumable header for non-OPTIONS requests
+ if r.Method != http.MethodOptions {
+ tusVersion := r.Header.Get("Tus-Resumable")
+ if tusVersion != TusVersion {
+ http.Error(w, "Unsupported TUS version", http.StatusPreconditionFailed)
+ return
+ }
+ }
+
+ // Route based on method and path
+ reqPath := r.URL.Path
+ tusPrefix := fs.option.TusPath
+ if tusPrefix == "" {
+ tusPrefix = ".tus"
+ }
+ if !strings.HasPrefix(tusPrefix, "/") {
+ tusPrefix = "/" + tusPrefix
+ }
+
+ // Check if this is an upload location (contains upload ID after {tusPrefix}/.uploads/)
+ uploadsPrefix := tusPrefix + "/.uploads/"
+ if strings.HasPrefix(reqPath, uploadsPrefix) {
+ uploadID := strings.TrimPrefix(reqPath, uploadsPrefix)
+ uploadID = strings.Split(uploadID, "/")[0] // Get just the ID, not any trailing path
+
+ switch r.Method {
+ case http.MethodHead:
+ fs.tusHeadHandler(w, r, uploadID)
+ case http.MethodPatch:
+ fs.tusPatchHandler(w, r, uploadID)
+ case http.MethodDelete:
+ fs.tusDeleteHandler(w, r, uploadID)
+ default:
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ }
+ return
+ }
+
+ // Handle creation endpoints (POST to /.tus/{path})
+ switch r.Method {
+ case http.MethodOptions:
+ fs.tusOptionsHandler(w, r)
+ case http.MethodPost:
+ fs.tusCreateHandler(w, r)
+ default:
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ }
+}
+
+// tusOptionsHandler handles OPTIONS requests for capability discovery
+func (fs *FilerServer) tusOptionsHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Tus-Version", TusVersion)
+ w.Header().Set("Tus-Extension", TusExtensions)
+ w.Header().Set("Tus-Max-Size", strconv.FormatInt(TusMaxSize, 10))
+ w.WriteHeader(http.StatusOK)
+}
+
+// tusCreateHandler handles POST requests to create new uploads
+func (fs *FilerServer) tusCreateHandler(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+
+ // Parse Upload-Length header (required)
+ uploadLengthStr := r.Header.Get("Upload-Length")
+ if uploadLengthStr == "" {
+ http.Error(w, "Upload-Length header required", http.StatusBadRequest)
+ return
+ }
+ uploadLength, err := strconv.ParseInt(uploadLengthStr, 10, 64)
+ if err != nil || uploadLength < 0 {
+ http.Error(w, "Invalid Upload-Length", http.StatusBadRequest)
+ return
+ }
+ if uploadLength > TusMaxSize {
+ http.Error(w, "Upload-Length exceeds maximum", http.StatusRequestEntityTooLarge)
+ return
+ }
+
+ // Parse Upload-Metadata header (optional)
+ metadata := parseTusMetadata(r.Header.Get("Upload-Metadata"))
+
+ // Get TUS path prefix
+ tusPrefix := fs.option.TusPath
+ if tusPrefix == "" {
+ tusPrefix = ".tus"
+ }
+ if !strings.HasPrefix(tusPrefix, "/") {
+ tusPrefix = "/" + tusPrefix
+ }
+
+ // Determine target path from request URL
+ targetPath := strings.TrimPrefix(r.URL.Path, tusPrefix)
+ if targetPath == "" || targetPath == "/" {
+ http.Error(w, "Target path required", http.StatusBadRequest)
+ return
+ }
+
+ // Generate upload ID
+ uploadID := uuid.New().String()
+
+ // Create upload session
+ session, err := fs.createTusSession(ctx, uploadID, targetPath, uploadLength, metadata)
+ if err != nil {
+ glog.Errorf("Failed to create TUS session: %v", err)
+ http.Error(w, "Failed to create upload", http.StatusInternalServerError)
+ return
+ }
+
+ // Build upload location URL (ensure it starts with single /)
+ uploadLocation := path.Clean(fmt.Sprintf("%s/.uploads/%s", tusPrefix, uploadID))
+ if !strings.HasPrefix(uploadLocation, "/") {
+ uploadLocation = "/" + uploadLocation
+ }
+
+ // Handle creation-with-upload extension
+ // TUS requires Content-Length for uploads; reject chunked encoding
+ if r.Header.Get("Content-Type") == "application/offset+octet-stream" {
+ if r.ContentLength < 0 {
+ fs.deleteTusSession(ctx, uploadID)
+ http.Error(w, "Content-Length header required for creation-with-upload", http.StatusBadRequest)
+ return
+ }
+ if r.ContentLength == 0 {
+ // Empty body is allowed, just skip the upload
+ goto respond
+ }
+ // Upload data in the creation request
+ bytesWritten, uploadErr := fs.tusWriteData(ctx, session, 0, r.Body, r.ContentLength)
+ if uploadErr != nil {
+ // Cleanup session on failure
+ fs.deleteTusSession(ctx, uploadID)
+ glog.Errorf("Failed to write initial TUS data: %v", uploadErr)
+ http.Error(w, "Failed to write data", http.StatusInternalServerError)
+ return
+ }
+
+ // Update offset in response header
+ w.Header().Set("Upload-Offset", strconv.FormatInt(bytesWritten, 10))
+
+ // Check if upload is complete
+ if bytesWritten == session.Size {
+ // Refresh session to get updated chunks
+ session, err = fs.getTusSession(ctx, uploadID)
+ if err != nil {
+ glog.Errorf("Failed to get updated TUS session: %v", err)
+ http.Error(w, "Failed to complete upload", http.StatusInternalServerError)
+ return
+ }
+ if err := fs.completeTusUpload(ctx, session); err != nil {
+ glog.Errorf("Failed to complete TUS upload: %v", err)
+ http.Error(w, "Failed to complete upload", http.StatusInternalServerError)
+ return
+ }
+ }
+ }
+
+respond:
+ w.Header().Set("Location", uploadLocation)
+ w.WriteHeader(http.StatusCreated)
+}
+
+// tusHeadHandler handles HEAD requests to get current upload offset
+func (fs *FilerServer) tusHeadHandler(w http.ResponseWriter, r *http.Request, uploadID string) {
+ ctx := r.Context()
+
+ session, err := fs.getTusSession(ctx, uploadID)
+ if err != nil {
+ http.Error(w, "Upload not found", http.StatusNotFound)
+ return
+ }
+
+ w.Header().Set("Upload-Offset", strconv.FormatInt(session.Offset, 10))
+ w.Header().Set("Upload-Length", strconv.FormatInt(session.Size, 10))
+ w.Header().Set("Cache-Control", "no-store")
+ w.WriteHeader(http.StatusOK)
+}
+
+// tusPatchHandler handles PATCH requests to upload data
+func (fs *FilerServer) tusPatchHandler(w http.ResponseWriter, r *http.Request, uploadID string) {
+ ctx := r.Context()
+
+ // Validate Content-Type
+ contentType := r.Header.Get("Content-Type")
+ if contentType != "application/offset+octet-stream" {
+ http.Error(w, "Content-Type must be application/offset+octet-stream", http.StatusUnsupportedMediaType)
+ return
+ }
+
+ // Get current session
+ session, err := fs.getTusSession(ctx, uploadID)
+ if err != nil {
+ http.Error(w, "Upload not found", http.StatusNotFound)
+ return
+ }
+
+ // Validate Upload-Offset header
+ uploadOffsetStr := r.Header.Get("Upload-Offset")
+ if uploadOffsetStr == "" {
+ http.Error(w, "Upload-Offset header required", http.StatusBadRequest)
+ return
+ }
+ uploadOffset, err := strconv.ParseInt(uploadOffsetStr, 10, 64)
+ if err != nil || uploadOffset < 0 {
+ http.Error(w, "Invalid Upload-Offset", http.StatusBadRequest)
+ return
+ }
+
+ // Check offset matches current position
+ if uploadOffset != session.Offset {
+ http.Error(w, fmt.Sprintf("Offset mismatch: expected %d, got %d", session.Offset, uploadOffset), http.StatusConflict)
+ return
+ }
+
+ // TUS requires Content-Length header for PATCH requests
+ if r.ContentLength < 0 {
+ http.Error(w, "Content-Length header required", http.StatusBadRequest)
+ return
+ }
+
+ // Write data
+ bytesWritten, err := fs.tusWriteData(ctx, session, uploadOffset, r.Body, r.ContentLength)
+ if err != nil {
+ glog.Errorf("Failed to write TUS data: %v", err)
+ http.Error(w, "Failed to write data", http.StatusInternalServerError)
+ return
+ }
+
+ newOffset := uploadOffset + bytesWritten
+
+ // Check if upload is complete
+ if newOffset == session.Size {
+ // Refresh session to get updated chunks
+ session, err = fs.getTusSession(ctx, uploadID)
+ if err != nil {
+ glog.Errorf("Failed to get updated TUS session: %v", err)
+ http.Error(w, "Failed to complete upload", http.StatusInternalServerError)
+ return
+ }
+
+ if err := fs.completeTusUpload(ctx, session); err != nil {
+ glog.Errorf("Failed to complete TUS upload: %v", err)
+ http.Error(w, "Failed to complete upload", http.StatusInternalServerError)
+ return
+ }
+ }
+
+ w.Header().Set("Upload-Offset", strconv.FormatInt(newOffset, 10))
+ w.WriteHeader(http.StatusNoContent)
+}
+
+// tusDeleteHandler handles DELETE requests to cancel uploads
+func (fs *FilerServer) tusDeleteHandler(w http.ResponseWriter, r *http.Request, uploadID string) {
+ ctx := r.Context()
+
+ if err := fs.deleteTusSession(ctx, uploadID); err != nil {
+ glog.Errorf("Failed to delete TUS session: %v", err)
+ http.Error(w, "Failed to delete upload", http.StatusInternalServerError)
+ return
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+}
+
+// tusWriteData uploads data to volume servers and updates session
+func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, offset int64, reader io.Reader, contentLength int64) (int64, error) {
+ if contentLength == 0 {
+ return 0, nil
+ }
+
+ // Limit content length to remaining size
+ remaining := session.Size - offset
+ if contentLength > remaining {
+ contentLength = remaining
+ }
+ if contentLength <= 0 {
+ return 0, nil
+ }
+
+ // Read data into buffer
+ // Determine storage options based on target path
+ so, err := fs.detectStorageOption0(ctx, session.TargetPath, "", "", "", "", "", "", "", "", "")
+ if err != nil {
+ return 0, fmt.Errorf("detect storage option: %w", err)
+ }
+
+ // Assign file ID from master
+ fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
+ if assignErr != nil {
+ return 0, fmt.Errorf("assign volume: %w", assignErr)
+ }
+
+ // Upload to volume server
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil {
+ return 0, fmt.Errorf("create uploader: %w", uploaderErr)
+ }
+
+ // Read first bytes for MIME type detection, respecting contentLength
+ // http.DetectContentType uses at most 512 bytes
+ sniffSize := int64(512)
+ if contentLength < sniffSize {
+ sniffSize = contentLength
+ }
+ sniffBuf := make([]byte, sniffSize)
+ sniffN, sniffErr := io.ReadFull(reader, sniffBuf)
+ if sniffErr != nil && sniffErr != io.EOF && sniffErr != io.ErrUnexpectedEOF {
+ return 0, fmt.Errorf("read data for mime detection: %w", sniffErr)
+ }
+ if sniffN == 0 {
+ return 0, nil
+ }
+ sniffBuf = sniffBuf[:sniffN]
+
+ // Detect MIME type from sniffed bytes
+ mimeType := http.DetectContentType(sniffBuf)
+
+ // Create a reader that combines sniffed bytes with remaining data
+ var dataReader io.Reader
+ if int64(sniffN) >= contentLength {
+ // All data fits in sniff buffer
+ dataReader = bytes.NewReader(sniffBuf)
+ } else {
+ // Combine sniffed bytes with remaining stream
+ dataReader = io.MultiReader(bytes.NewReader(sniffBuf), io.LimitReader(reader, contentLength-int64(sniffN)))
+ }
+
+ uploadResult, uploadErr, _ := uploader.Upload(ctx, dataReader, &operation.UploadOption{
+ UploadUrl: urlLocation,
+ Filename: "",
+ Cipher: fs.option.Cipher,
+ IsInputCompressed: false,
+ MimeType: mimeType,
+ PairMap: nil,
+ Jwt: auth,
+ })
+ if uploadErr != nil {
+ return 0, fmt.Errorf("upload data: %w", uploadErr)
+ }
+
+ // Create chunk info
+ chunk := &TusChunkInfo{
+ Offset: offset,
+ Size: int64(uploadResult.Size),
+ FileId: fileId,
+ UploadAt: time.Now().UnixNano(),
+ }
+
+ // Update session
+ if err := fs.updateTusSessionOffset(ctx, session.ID, offset+int64(uploadResult.Size), chunk); err != nil {
+ // Try to clean up the uploaded chunk
+ fs.filer.DeleteChunks(ctx, util.FullPath(session.TargetPath), []*filer_pb.FileChunk{
+ {FileId: fileId},
+ })
+ return 0, fmt.Errorf("update session: %w", err)
+ }
+
+ stats.FilerHandlerCounter.WithLabelValues("tusUploadChunk").Inc()
+
+ return int64(uploadResult.Size), nil
+}
+
+// parseTusMetadata parses the Upload-Metadata header
+// Format: key1 base64value1,key2 base64value2,...
+func parseTusMetadata(header string) map[string]string {
+ metadata := make(map[string]string)
+ if header == "" {
+ return metadata
+ }
+
+ pairs := strings.Split(header, ",")
+ for _, pair := range pairs {
+ pair = strings.TrimSpace(pair)
+ parts := strings.SplitN(pair, " ", 2)
+ if len(parts) != 2 {
+ continue
+ }
+ key := strings.TrimSpace(parts[0])
+ encodedValue := strings.TrimSpace(parts[1])
+
+ value, err := base64.StdEncoding.DecodeString(encodedValue)
+ if err != nil {
+ glog.V(1).Infof("Failed to decode TUS metadata value for key %s: %v", key, err)
+ continue
+ }
+ metadata[key] = string(value)
+ }
+
+ return metadata
+}
diff --git a/weed/server/filer_server_tus_session.go b/weed/server/filer_server_tus_session.go
new file mode 100644
index 000000000..550574058
--- /dev/null
+++ b/weed/server/filer_server_tus_session.go
@@ -0,0 +1,341 @@
+package weed_server
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "os"
+ "sort"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+const (
+ TusVersion = "1.0.0"
+ TusMaxSize = int64(5 * 1024 * 1024 * 1024) // 5GB default max size
+ TusUploadsFolder = ".uploads.tus"
+ TusInfoFileName = ".info"
+ TusChunkExt = ".chunk"
+ TusExtensions = "creation,creation-with-upload,termination"
+)
+
+// TusSession represents an in-progress TUS upload session
+type TusSession struct {
+ ID string `json:"id"`
+ TargetPath string `json:"target_path"`
+ Size int64 `json:"size"`
+ Offset int64 `json:"offset"`
+ Metadata map[string]string `json:"metadata,omitempty"`
+ CreatedAt time.Time `json:"created_at"`
+ ExpiresAt time.Time `json:"expires_at,omitempty"`
+ Chunks []*TusChunkInfo `json:"chunks,omitempty"`
+}
+
+// TusChunkInfo tracks individual chunk uploads within a session
+type TusChunkInfo struct {
+ Offset int64 `json:"offset"`
+ Size int64 `json:"size"`
+ FileId string `json:"file_id"`
+ UploadAt int64 `json:"upload_at"`
+}
+
+// tusSessionDir returns the directory path for storing TUS upload sessions
+func (fs *FilerServer) tusSessionDir() string {
+ return "/" + TusUploadsFolder
+}
+
+// tusSessionPath returns the path to a specific upload session directory
+func (fs *FilerServer) tusSessionPath(uploadID string) string {
+ return fmt.Sprintf("/%s/%s", TusUploadsFolder, uploadID)
+}
+
+// tusSessionInfoPath returns the path to the session info file
+func (fs *FilerServer) tusSessionInfoPath(uploadID string) string {
+ return fmt.Sprintf("/%s/%s/%s", TusUploadsFolder, uploadID, TusInfoFileName)
+}
+
+// tusChunkPath returns the path to store a chunk info file
+// Format: /{TusUploadsFolder}/{uploadID}/chunk_{offset}_{size}_{fileId}
+func (fs *FilerServer) tusChunkPath(uploadID string, offset, size int64, fileId string) string {
+ // Replace / in fileId with _ to make it a valid filename
+ safeFileId := strings.ReplaceAll(fileId, "/", "_")
+ return fmt.Sprintf("/%s/%s/chunk_%016d_%016d_%s", TusUploadsFolder, uploadID, offset, size, safeFileId)
+}
+
+// parseTusChunkPath parses chunk info from a chunk file name
+func parseTusChunkPath(name string) (*TusChunkInfo, error) {
+ if !strings.HasPrefix(name, "chunk_") {
+ return nil, fmt.Errorf("not a chunk file: %s", name)
+ }
+ parts := strings.SplitN(name[6:], "_", 3) // Skip "chunk_" prefix
+ if len(parts) < 3 {
+ return nil, fmt.Errorf("invalid chunk file name: %s", name)
+ }
+ offset, err := strconv.ParseInt(parts[0], 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("invalid offset in chunk file: %s", name)
+ }
+ size, err := strconv.ParseInt(parts[1], 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("invalid size in chunk file: %s", name)
+ }
+ // Restore / in fileId
+ fileId := strings.ReplaceAll(parts[2], "_", "/")
+ return &TusChunkInfo{
+ Offset: offset,
+ Size: size,
+ FileId: fileId,
+ UploadAt: time.Now().UnixNano(),
+ }, nil
+}
+
+// createTusSession creates a new TUS upload session
+func (fs *FilerServer) createTusSession(ctx context.Context, uploadID, targetPath string, size int64, metadata map[string]string) (*TusSession, error) {
+ session := &TusSession{
+ ID: uploadID,
+ TargetPath: targetPath,
+ Size: size,
+ Offset: 0,
+ Metadata: metadata,
+ CreatedAt: time.Now(),
+ ExpiresAt: time.Now().Add(7 * 24 * time.Hour), // 7 days default expiration
+ Chunks: []*TusChunkInfo{},
+ }
+
+ // Create session directory
+ sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
+ if err := fs.filer.CreateEntry(ctx, &filer.Entry{
+ FullPath: sessionDirPath,
+ Attr: filer.Attr{
+ Mode: os.ModeDir | 0755,
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
+ return nil, fmt.Errorf("create session directory: %w", err)
+ }
+
+ // Save session info
+ if err := fs.saveTusSession(ctx, session); err != nil {
+ // Cleanup the directory on failure
+ fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, true, false, false, nil, 0)
+ return nil, fmt.Errorf("save session info: %w", err)
+ }
+
+ glog.V(2).Infof("Created TUS session %s for %s, size=%d", uploadID, targetPath, size)
+ return session, nil
+}
+
+// saveTusSession saves the session info to the filer
+func (fs *FilerServer) saveTusSession(ctx context.Context, session *TusSession) error {
+ sessionData, err := json.Marshal(session)
+ if err != nil {
+ return fmt.Errorf("marshal session: %w", err)
+ }
+
+ infoPath := util.FullPath(fs.tusSessionInfoPath(session.ID))
+ entry := &filer.Entry{
+ FullPath: infoPath,
+ Attr: filer.Attr{
+ Mode: 0644,
+ Crtime: session.CreatedAt,
+ Mtime: time.Now(),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ Content: sessionData,
+ }
+
+ if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
+ return fmt.Errorf("save session info entry: %w", err)
+ }
+
+ return nil
+}
+
+// getTusSession retrieves a TUS session by upload ID, including chunks from directory listing
+func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*TusSession, error) {
+ infoPath := util.FullPath(fs.tusSessionInfoPath(uploadID))
+ entry, err := fs.filer.FindEntry(ctx, infoPath)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ return nil, fmt.Errorf("session not found: %s", uploadID)
+ }
+ return nil, fmt.Errorf("find session: %w", err)
+ }
+
+ var session TusSession
+ if err := json.Unmarshal(entry.Content, &session); err != nil {
+ return nil, fmt.Errorf("unmarshal session: %w", err)
+ }
+
+ // Load chunks from directory listing (atomic read, no race condition)
+ sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
+ entries, _, err := fs.filer.ListDirectoryEntries(ctx, sessionDirPath, "", false, 10000, "", "", "")
+ if err != nil {
+ return nil, fmt.Errorf("list session directory: %w", err)
+ }
+
+ session.Chunks = nil
+ session.Offset = 0
+ for _, e := range entries {
+ if strings.HasPrefix(e.Name(), "chunk_") {
+ chunk, parseErr := parseTusChunkPath(e.Name())
+ if parseErr != nil {
+ glog.V(1).Infof("Skipping invalid chunk file %s: %v", e.Name(), parseErr)
+ continue
+ }
+ session.Chunks = append(session.Chunks, chunk)
+ }
+ }
+
+ // Sort chunks by offset and compute current offset
+ if len(session.Chunks) > 0 {
+ sort.Slice(session.Chunks, func(i, j int) bool {
+ return session.Chunks[i].Offset < session.Chunks[j].Offset
+ })
+ // Current offset is the end of the last chunk
+ lastChunk := session.Chunks[len(session.Chunks)-1]
+ session.Offset = lastChunk.Offset + lastChunk.Size
+ }
+
+ return &session, nil
+}
+
+// updateTusSessionOffset stores the chunk info as a separate file entry
+// This avoids read-modify-write race conditions across multiple filer instances
+func (fs *FilerServer) updateTusSessionOffset(ctx context.Context, uploadID string, newOffset int64, chunk *TusChunkInfo) error {
+ if chunk == nil {
+ return nil
+ }
+
+ // Store chunk info as a separate file entry (atomic operation)
+ chunkPath := util.FullPath(fs.tusChunkPath(uploadID, chunk.Offset, chunk.Size, chunk.FileId))
+ chunkData, err := json.Marshal(chunk)
+ if err != nil {
+ return fmt.Errorf("marshal chunk info: %w", err)
+ }
+
+ if err := fs.filer.CreateEntry(ctx, &filer.Entry{
+ FullPath: chunkPath,
+ Attr: filer.Attr{
+ Mode: 0644,
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ Content: chunkData,
+ }, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
+ return fmt.Errorf("save chunk info: %w", err)
+ }
+
+ return nil
+}
+
+// deleteTusSession removes a TUS upload session and all its data
+func (fs *FilerServer) deleteTusSession(ctx context.Context, uploadID string) error {
+
+ session, err := fs.getTusSession(ctx, uploadID)
+ if err != nil {
+ // Session might already be deleted or never existed
+ glog.V(1).Infof("TUS session %s not found for deletion: %v", uploadID, err)
+ return nil
+ }
+
+ // Delete any uploaded chunks from volume servers
+ for _, chunk := range session.Chunks {
+ if chunk.FileId != "" {
+ fs.filer.DeleteChunks(ctx, util.FullPath(session.TargetPath), []*filer_pb.FileChunk{
+ {FileId: chunk.FileId},
+ })
+ }
+ }
+
+ // Delete the session directory
+ sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
+ if err := fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, true, false, false, nil, 0); err != nil {
+ return fmt.Errorf("delete session directory: %w", err)
+ }
+
+ glog.V(2).Infof("Deleted TUS session %s", uploadID)
+ return nil
+}
+
+// completeTusUpload assembles all chunks and creates the final file
+func (fs *FilerServer) completeTusUpload(ctx context.Context, session *TusSession) error {
+ if session.Offset != session.Size {
+ return fmt.Errorf("upload incomplete: offset=%d, expected=%d", session.Offset, session.Size)
+ }
+
+ // Sort chunks by offset to ensure correct order
+ sort.Slice(session.Chunks, func(i, j int) bool {
+ return session.Chunks[i].Offset < session.Chunks[j].Offset
+ })
+
+ // Assemble file chunks in order
+ var fileChunks []*filer_pb.FileChunk
+
+ for _, chunk := range session.Chunks {
+ fid, fidErr := filer_pb.ToFileIdObject(chunk.FileId)
+ if fidErr != nil {
+ return fmt.Errorf("invalid file ID %s at offset %d: %w", chunk.FileId, chunk.Offset, fidErr)
+ }
+
+ fileChunk := &filer_pb.FileChunk{
+ FileId: chunk.FileId,
+ Offset: chunk.Offset,
+ Size: uint64(chunk.Size),
+ ModifiedTsNs: chunk.UploadAt,
+ Fid: fid,
+ }
+ fileChunks = append(fileChunks, fileChunk)
+ }
+
+ // Determine content type from metadata
+ contentType := ""
+ if session.Metadata != nil {
+ if ct, ok := session.Metadata["content-type"]; ok {
+ contentType = ct
+ }
+ }
+
+ // Create the final file entry
+ targetPath := util.FullPath(session.TargetPath)
+ entry := &filer.Entry{
+ FullPath: targetPath,
+ Attr: filer.Attr{
+ Mode: 0644,
+ Crtime: session.CreatedAt,
+ Mtime: time.Now(),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ Mime: contentType,
+ },
+ Chunks: fileChunks,
+ }
+
+ // Ensure parent directory exists
+ if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
+ return fmt.Errorf("create final file entry: %w", err)
+ }
+
+ // Delete the session (but keep the chunks since they're now part of the final file)
+ sessionDirPath := util.FullPath(fs.tusSessionPath(session.ID))
+ if err := fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, false, false, false, nil, 0); err != nil {
+ glog.V(1).Infof("Failed to cleanup TUS session directory %s: %v", session.ID, err)
+ }
+
+ glog.V(2).Infof("Completed TUS upload %s -> %s, size=%d, chunks=%d",
+ session.ID, session.TargetPath, session.Size, len(fileChunks))
+
+ return nil
+}