author    chrislu <chris.lu@gmail.com>    2025-12-01 16:15:30 -0800
committer chrislu <chris.lu@gmail.com>    2025-12-01 16:15:30 -0800
commit    870e32c4ad154dac83c14fc150b6ef3a5842f1ab (patch)
tree      1379ac27b0d6163720337a30bbb264cf0aad3d73
parent    21d1a2d167f4bda4cdbe08aa31a3f6e1910fa6b6 (diff)
Address critical and high-priority review comments
- Add per-session locking to prevent race conditions in updateTusSessionOffset
- Stream data directly to volume server instead of buffering entire chunk
- Only buffer 512 bytes for MIME type detection, then stream remaining data
- Clean up session locks when session is deleted
-rw-r--r--  weed/server/filer_server_tus_handlers.go  36
-rw-r--r--  weed/server/filer_server_tus_session.go    42
2 files changed, 66 insertions(+), 12 deletions(-)
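
The handler change below replaces full-chunk buffering with a sniff-then-stream approach: at most 512 bytes are read for http.DetectContentType, and the sniffed prefix is then stitched back onto the remaining request body with io.MultiReader, bounded by io.LimitReader. The standalone Go sketch that follows illustrates the same pattern outside the filer server; sniffAndStream and its variable names are illustrative and not part of the SeaweedFS code.

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"strings"
)

// sniffAndStream returns the detected MIME type and a reader that yields the
// full body: the sniffed prefix followed by the rest of the original reader.
// (Illustrative helper; not part of the SeaweedFS code.)
func sniffAndStream(r io.Reader, contentLength int64) (string, io.Reader, error) {
	sniff := make([]byte, 512)
	n, err := io.ReadFull(r, sniff)
	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
		return "", nil, fmt.Errorf("read data for mime detection: %w", err)
	}
	sniff = sniff[:n]
	mimeType := http.DetectContentType(sniff)

	if int64(n) >= contentLength {
		// The whole body fit into the sniff buffer.
		return mimeType, bytes.NewReader(sniff), nil
	}
	// Prepend the sniffed bytes to the remaining stream, bounded by contentLength.
	rest := io.LimitReader(r, contentLength-int64(n))
	return mimeType, io.MultiReader(bytes.NewReader(sniff), rest), nil
}

func main() {
	body := strings.NewReader("<html><body>hello</body></html>")
	mime, full, err := sniffAndStream(body, int64(body.Len()))
	if err != nil {
		panic(err)
	}
	data, _ := io.ReadAll(full)
	fmt.Println(mime, len(data)) // text/html; charset=utf-8 31
}

The upshot is that only the 512-byte sniff buffer is held in memory; the rest of the chunk streams straight through the uploader to the volume server.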
diff --git a/weed/server/filer_server_tus_handlers.go b/weed/server/filer_server_tus_handlers.go
index 7411b2527..f0b47f6d0 100644
--- a/weed/server/filer_server_tus_handlers.go
+++ b/weed/server/filer_server_tus_handlers.go
@@ -292,15 +292,6 @@ func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, of
}
// Read data into buffer
- buf := new(bytes.Buffer)
- n, err := io.CopyN(buf, reader, contentLength)
- if err != nil && err != io.EOF {
- return 0, fmt.Errorf("read data: %w", err)
- }
- if n == 0 {
- return 0, nil
- }
-
// Determine storage options based on target path
so, err := fs.detectStorageOption0(ctx, session.TargetPath, "", "", "", "", "", "", "", "", "")
if err != nil {
@@ -319,10 +310,31 @@ func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, of
return 0, fmt.Errorf("create uploader: %w", uploaderErr)
}
- // Detect MIME type from data
- mimeType := http.DetectContentType(buf.Bytes())
+ // Read first 512 bytes for MIME type detection, then stream the rest
+ sniffBuf := make([]byte, 512)
+ sniffN, sniffErr := io.ReadFull(reader, sniffBuf)
+ if sniffErr != nil && sniffErr != io.EOF && sniffErr != io.ErrUnexpectedEOF {
+ return 0, fmt.Errorf("read data for mime detection: %w", sniffErr)
+ }
+ if sniffN == 0 {
+ return 0, nil
+ }
+ sniffBuf = sniffBuf[:sniffN]
+
+ // Detect MIME type from sniffed bytes
+ mimeType := http.DetectContentType(sniffBuf)
+
+ // Create a reader that combines sniffed bytes with remaining data
+ var dataReader io.Reader
+ if int64(sniffN) >= contentLength {
+ // All data fits in sniff buffer
+ dataReader = bytes.NewReader(sniffBuf)
+ } else {
+ // Combine sniffed bytes with remaining stream
+ dataReader = io.MultiReader(bytes.NewReader(sniffBuf), io.LimitReader(reader, contentLength-int64(sniffN)))
+ }
- uploadResult, uploadErr, _ := uploader.Upload(ctx, bytes.NewReader(buf.Bytes()), &operation.UploadOption{
+ uploadResult, uploadErr, _ := uploader.Upload(ctx, dataReader, &operation.UploadOption{
UploadUrl: urlLocation,
Filename: "",
Cipher: fs.option.Cipher,
diff --git a/weed/server/filer_server_tus_session.go b/weed/server/filer_server_tus_session.go
index 685488e10..c8ae7c1cd 100644
--- a/weed/server/filer_server_tus_session.go
+++ b/weed/server/filer_server_tus_session.go
@@ -6,6 +6,7 @@ import (
"fmt"
"os"
"sort"
+ "sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -14,6 +15,39 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
+// tusSessionLocks provides per-session locking to prevent race conditions
+var tusSessionLocks = struct {
+ sync.RWMutex
+ locks map[string]*sync.Mutex
+}{locks: make(map[string]*sync.Mutex)}
+
+// getTusSessionLock returns a lock for the given upload ID
+func getTusSessionLock(uploadID string) *sync.Mutex {
+ tusSessionLocks.RLock()
+ lock, exists := tusSessionLocks.locks[uploadID]
+ tusSessionLocks.RUnlock()
+ if exists {
+ return lock
+ }
+
+ tusSessionLocks.Lock()
+ defer tusSessionLocks.Unlock()
+ // Double-check after acquiring write lock
+ if lock, exists = tusSessionLocks.locks[uploadID]; exists {
+ return lock
+ }
+ lock = &sync.Mutex{}
+ tusSessionLocks.locks[uploadID] = lock
+ return lock
+}
+
+// removeTusSessionLock removes the lock for the given upload ID
+func removeTusSessionLock(uploadID string) {
+ tusSessionLocks.Lock()
+ defer tusSessionLocks.Unlock()
+ delete(tusSessionLocks.locks, uploadID)
+}
+
const (
TusVersion = "1.0.0"
TusMaxSize = int64(5 * 1024 * 1024 * 1024) // 5GB default max size
@@ -145,6 +179,11 @@ func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*Tus
// updateTusSessionOffset updates the session offset after a successful chunk upload
func (fs *FilerServer) updateTusSessionOffset(ctx context.Context, uploadID string, newOffset int64, chunk *TusChunkInfo) error {
+ // Lock the session to prevent concurrent modifications
+ lock := getTusSessionLock(uploadID)
+ lock.Lock()
+ defer lock.Unlock()
+
session, err := fs.getTusSession(ctx, uploadID)
if err != nil {
return err
@@ -160,6 +199,9 @@ func (fs *FilerServer) updateTusSessionOffset(ctx context.Context, uploadID stri
// deleteTusSession removes a TUS upload session and all its data
func (fs *FilerServer) deleteTusSession(ctx context.Context, uploadID string) error {
+ // Clean up the session lock
+ defer removeTusSessionLock(uploadID)
+
session, err := fs.getTusSession(ctx, uploadID)
if err != nil {
// Session might already be deleted or never existed
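
The lock registry added in filer_server_tus_session.go follows a read-mostly, double-checked pattern: an RLock fast path looks up an existing mutex, and the write-locked slow path re-checks before creating one, so each upload ID ends up with exactly one mutex. Below is a minimal standalone sketch of the same pattern, paired with a toy read-modify-write workload that is illustrative only and not part of the SeaweedFS code.

package main

import (
	"fmt"
	"sync"
)

// sessionLocks mirrors the shape of the tusSessionLocks registry above.
var sessionLocks = struct {
	sync.RWMutex
	locks map[string]*sync.Mutex
}{locks: make(map[string]*sync.Mutex)}

func getSessionLock(uploadID string) *sync.Mutex {
	sessionLocks.RLock()
	lock, ok := sessionLocks.locks[uploadID]
	sessionLocks.RUnlock()
	if ok {
		return lock
	}
	sessionLocks.Lock()
	defer sessionLocks.Unlock()
	// Re-check: another goroutine may have created the lock while we upgraded.
	if lock, ok = sessionLocks.locks[uploadID]; ok {
		return lock
	}
	lock = &sync.Mutex{}
	sessionLocks.locks[uploadID] = lock
	return lock
}

func main() {
	var wg sync.WaitGroup
	offset := int64(0)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l := getSessionLock("upload-1")
			l.Lock()
			offset += 1024 // the read-modify-write is serialized per upload ID
			l.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(offset) // always 10240
}

A sync.Map with LoadOrStore would be an alternative way to get the same create-once behavior without the explicit double check.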