aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-12-01 16:21:29 -0800
committerchrislu <chris.lu@gmail.com>2025-12-01 16:21:29 -0800
commitb49ee254814d6ecea7937147f30862c154f25667 (patch)
treea8f55f7c25d6c390b3dfd416b50017f0f857b5cb
parent870e32c4ad154dac83c14fc150b6ef3a5842f1ab (diff)
downloadseaweedfs-b49ee254814d6ecea7937147f30862c154f25667.tar.xz
seaweedfs-b49ee254814d6ecea7937147f30862c154f25667.zip
Fix race condition to work across multiple filer instances
- Store each chunk as a separate file entry instead of updating session JSON - Chunk file names encode offset, size, and fileId for atomic storage - getTusSession loads chunks from directory listing (atomic read) - Eliminates read-modify-write race condition across multiple filers - Remove in-memory mutex that only worked for single filer instance
-rw-r--r--weed/server/filer_server_tus_session.go138
1 files changed, 90 insertions, 48 deletions
diff --git a/weed/server/filer_server_tus_session.go b/weed/server/filer_server_tus_session.go
index c8ae7c1cd..550574058 100644
--- a/weed/server/filer_server_tus_session.go
+++ b/weed/server/filer_server_tus_session.go
@@ -6,7 +6,8 @@ import (
"fmt"
"os"
"sort"
- "sync"
+ "strconv"
+ "strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/filer"
@@ -15,39 +16,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
-// tusSessionLocks provides per-session locking to prevent race conditions
-var tusSessionLocks = struct {
- sync.RWMutex
- locks map[string]*sync.Mutex
-}{locks: make(map[string]*sync.Mutex)}
-
-// getTusSessionLock returns a lock for the given upload ID
-func getTusSessionLock(uploadID string) *sync.Mutex {
- tusSessionLocks.RLock()
- lock, exists := tusSessionLocks.locks[uploadID]
- tusSessionLocks.RUnlock()
- if exists {
- return lock
- }
-
- tusSessionLocks.Lock()
- defer tusSessionLocks.Unlock()
- // Double-check after acquiring write lock
- if lock, exists = tusSessionLocks.locks[uploadID]; exists {
- return lock
- }
- lock = &sync.Mutex{}
- tusSessionLocks.locks[uploadID] = lock
- return lock
-}
-
-// removeTusSessionLock removes the lock for the given upload ID
-func removeTusSessionLock(uploadID string) {
- tusSessionLocks.Lock()
- defer tusSessionLocks.Unlock()
- delete(tusSessionLocks.locks, uploadID)
-}
-
const (
TusVersion = "1.0.0"
TusMaxSize = int64(5 * 1024 * 1024 * 1024) // 5GB default max size
@@ -92,6 +60,41 @@ func (fs *FilerServer) tusSessionInfoPath(uploadID string) string {
return fmt.Sprintf("/%s/%s/%s", TusUploadsFolder, uploadID, TusInfoFileName)
}
+// tusChunkPath returns the path to store a chunk info file
+// Format: /{TusUploadsFolder}/{uploadID}/chunk_{offset}_{size}_{fileId}
+func (fs *FilerServer) tusChunkPath(uploadID string, offset, size int64, fileId string) string {
+ // Replace / in fileId with _ to make it a valid filename
+ safeFileId := strings.ReplaceAll(fileId, "/", "_")
+ return fmt.Sprintf("/%s/%s/chunk_%016d_%016d_%s", TusUploadsFolder, uploadID, offset, size, safeFileId)
+}
+
+// parseTusChunkPath parses chunk info from a chunk file name
+func parseTusChunkPath(name string) (*TusChunkInfo, error) {
+ if !strings.HasPrefix(name, "chunk_") {
+ return nil, fmt.Errorf("not a chunk file: %s", name)
+ }
+ parts := strings.SplitN(name[6:], "_", 3) // Skip "chunk_" prefix
+ if len(parts) < 3 {
+ return nil, fmt.Errorf("invalid chunk file name: %s", name)
+ }
+ offset, err := strconv.ParseInt(parts[0], 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("invalid offset in chunk file: %s", name)
+ }
+ size, err := strconv.ParseInt(parts[1], 10, 64)
+ if err != nil {
+ return nil, fmt.Errorf("invalid size in chunk file: %s", name)
+ }
+ // Restore / in fileId
+ fileId := strings.ReplaceAll(parts[2], "_", "/")
+ return &TusChunkInfo{
+ Offset: offset,
+ Size: size,
+ FileId: fileId,
+ UploadAt: time.Now().UnixNano(),
+ }, nil
+}
+
// createTusSession creates a new TUS upload session
func (fs *FilerServer) createTusSession(ctx context.Context, uploadID, targetPath string, size int64, metadata map[string]string) (*TusSession, error) {
session := &TusSession{
@@ -158,7 +161,7 @@ func (fs *FilerServer) saveTusSession(ctx context.Context, session *TusSession)
return nil
}
-// getTusSession retrieves a TUS session by upload ID
+// getTusSession retrieves a TUS session by upload ID, including chunks from directory listing
func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*TusSession, error) {
infoPath := util.FullPath(fs.tusSessionInfoPath(uploadID))
entry, err := fs.filer.FindEntry(ctx, infoPath)
@@ -174,33 +177,72 @@ func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*Tus
return nil, fmt.Errorf("unmarshal session: %w", err)
}
+ // Load chunks from directory listing (atomic read, no race condition)
+ sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
+ entries, _, err := fs.filer.ListDirectoryEntries(ctx, sessionDirPath, "", false, 10000, "", "", "")
+ if err != nil {
+ return nil, fmt.Errorf("list session directory: %w", err)
+ }
+
+ session.Chunks = nil
+ session.Offset = 0
+ for _, e := range entries {
+ if strings.HasPrefix(e.Name(), "chunk_") {
+ chunk, parseErr := parseTusChunkPath(e.Name())
+ if parseErr != nil {
+ glog.V(1).Infof("Skipping invalid chunk file %s: %v", e.Name(), parseErr)
+ continue
+ }
+ session.Chunks = append(session.Chunks, chunk)
+ }
+ }
+
+ // Sort chunks by offset and compute current offset
+ if len(session.Chunks) > 0 {
+ sort.Slice(session.Chunks, func(i, j int) bool {
+ return session.Chunks[i].Offset < session.Chunks[j].Offset
+ })
+ // Current offset is the end of the last chunk
+ lastChunk := session.Chunks[len(session.Chunks)-1]
+ session.Offset = lastChunk.Offset + lastChunk.Size
+ }
+
return &session, nil
}
-// updateTusSessionOffset updates the session offset after a successful chunk upload
+// updateTusSessionOffset stores the chunk info as a separate file entry
+// This avoids read-modify-write race conditions across multiple filer instances
func (fs *FilerServer) updateTusSessionOffset(ctx context.Context, uploadID string, newOffset int64, chunk *TusChunkInfo) error {
- // Lock the session to prevent concurrent modifications
- lock := getTusSessionLock(uploadID)
- lock.Lock()
- defer lock.Unlock()
+ if chunk == nil {
+ return nil
+ }
- session, err := fs.getTusSession(ctx, uploadID)
+ // Store chunk info as a separate file entry (atomic operation)
+ chunkPath := util.FullPath(fs.tusChunkPath(uploadID, chunk.Offset, chunk.Size, chunk.FileId))
+ chunkData, err := json.Marshal(chunk)
if err != nil {
- return err
+ return fmt.Errorf("marshal chunk info: %w", err)
}
- session.Offset = newOffset
- if chunk != nil {
- session.Chunks = append(session.Chunks, chunk)
+ if err := fs.filer.CreateEntry(ctx, &filer.Entry{
+ FullPath: chunkPath,
+ Attr: filer.Attr{
+ Mode: 0644,
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ Content: chunkData,
+ }, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
+ return fmt.Errorf("save chunk info: %w", err)
}
- return fs.saveTusSession(ctx, session)
+ return nil
}
// deleteTusSession removes a TUS upload session and all its data
func (fs *FilerServer) deleteTusSession(ctx context.Context, uploadID string) error {
- // Clean up the session lock
- defer removeTusSessionLock(uploadID)
session, err := fs.getTusSession(ctx, uploadID)
if err != nil {