aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-12-01 12:55:08 -0800
committerchrislu <chris.lu@gmail.com>2025-12-01 13:28:21 -0800
commit3675d9db016c486cffcd77e67eaf7802aefd3f18 (patch)
treedad12f1fd91d696f6de587d8dd7c169488fa1d31
parent5b4fede08e939c30330f4c307dfd39f0eab4f0eb (diff)
downloadseaweedfs-3675d9db016c486cffcd77e67eaf7802aefd3f18.tar.xz
seaweedfs-3675d9db016c486cffcd77e67eaf7802aefd3f18.zip
Add TUS session storage types and utilities
Implements TUS upload session management: - TusSession struct for tracking upload state - Session creation with directory-based storage - Session persistence using filer entries - Session retrieval and offset updates - Session deletion with chunk cleanup - Upload completion with chunk assembly into final file Session data is stored in /.uploads.tus/{upload-id}/ directory, following the pattern used by S3 multipart uploads.
-rw-r--r--weed/server/filer_server_tus_session.go255
1 files changed, 255 insertions, 0 deletions
diff --git a/weed/server/filer_server_tus_session.go b/weed/server/filer_server_tus_session.go
new file mode 100644
index 000000000..e0e811a8c
--- /dev/null
+++ b/weed/server/filer_server_tus_session.go
@@ -0,0 +1,255 @@
+package weed_server
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+const (
+ TusVersion = "1.0.0"
+ TusMaxSize = int64(5 * 1024 * 1024 * 1024) // 5GB default max size
+ TusUploadsFolder = ".uploads.tus"
+ TusInfoFileName = ".info"
+ TusChunkExt = ".chunk"
+ TusExtensions = "creation,creation-with-upload,termination"
+)
+
+// TusSession represents an in-progress TUS upload session
+type TusSession struct {
+ ID string `json:"id"`
+ TargetPath string `json:"target_path"`
+ Size int64 `json:"size"`
+ Offset int64 `json:"offset"`
+ Metadata map[string]string `json:"metadata,omitempty"`
+ CreatedAt time.Time `json:"created_at"`
+ ExpiresAt time.Time `json:"expires_at,omitempty"`
+ Chunks []*TusChunkInfo `json:"chunks,omitempty"`
+}
+
+// TusChunkInfo tracks individual chunk uploads within a session
+type TusChunkInfo struct {
+ Offset int64 `json:"offset"`
+ Size int64 `json:"size"`
+ FileId string `json:"file_id"`
+ UploadAt int64 `json:"upload_at"`
+}
+
+// tusSessionDir returns the directory path for storing TUS upload sessions
+func (fs *FilerServer) tusSessionDir() string {
+ return "/" + TusUploadsFolder
+}
+
+// tusSessionPath returns the path to a specific upload session directory
+func (fs *FilerServer) tusSessionPath(uploadID string) string {
+ return fmt.Sprintf("/%s/%s", TusUploadsFolder, uploadID)
+}
+
+// tusSessionInfoPath returns the path to the session info file
+func (fs *FilerServer) tusSessionInfoPath(uploadID string) string {
+ return fmt.Sprintf("/%s/%s/%s", TusUploadsFolder, uploadID, TusInfoFileName)
+}
+
+// createTusSession creates a new TUS upload session
+func (fs *FilerServer) createTusSession(ctx context.Context, uploadID, targetPath string, size int64, metadata map[string]string) (*TusSession, error) {
+ session := &TusSession{
+ ID: uploadID,
+ TargetPath: targetPath,
+ Size: size,
+ Offset: 0,
+ Metadata: metadata,
+ CreatedAt: time.Now(),
+ ExpiresAt: time.Now().Add(7 * 24 * time.Hour), // 7 days default expiration
+ Chunks: []*TusChunkInfo{},
+ }
+
+ // Create session directory
+ sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
+ if err := fs.filer.CreateEntry(ctx, &filer.Entry{
+ FullPath: sessionDirPath,
+ Attr: filer.Attr{
+ Mode: os.ModeDir | 0755,
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
+ return nil, fmt.Errorf("create session directory: %w", err)
+ }
+
+ // Save session info
+ if err := fs.saveTusSession(ctx, session); err != nil {
+ // Cleanup the directory on failure
+ fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, true, false, false, nil, 0)
+ return nil, fmt.Errorf("save session info: %w", err)
+ }
+
+ glog.V(2).Infof("Created TUS session %s for %s, size=%d", uploadID, targetPath, size)
+ return session, nil
+}
+
+// saveTusSession saves the session info to the filer
+func (fs *FilerServer) saveTusSession(ctx context.Context, session *TusSession) error {
+ sessionData, err := json.Marshal(session)
+ if err != nil {
+ return fmt.Errorf("marshal session: %w", err)
+ }
+
+ infoPath := util.FullPath(fs.tusSessionInfoPath(session.ID))
+ entry := &filer.Entry{
+ FullPath: infoPath,
+ Attr: filer.Attr{
+ Mode: 0644,
+ Crtime: session.CreatedAt,
+ Mtime: time.Now(),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ Content: sessionData,
+ }
+
+ if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
+ return fmt.Errorf("save session info entry: %w", err)
+ }
+
+ return nil
+}
+
+// getTusSession retrieves a TUS session by upload ID
+func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*TusSession, error) {
+ infoPath := util.FullPath(fs.tusSessionInfoPath(uploadID))
+ entry, err := fs.filer.FindEntry(ctx, infoPath)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ return nil, fmt.Errorf("session not found: %s", uploadID)
+ }
+ return nil, fmt.Errorf("find session: %w", err)
+ }
+
+ var session TusSession
+ if err := json.Unmarshal(entry.Content, &session); err != nil {
+ return nil, fmt.Errorf("unmarshal session: %w", err)
+ }
+
+ return &session, nil
+}
+
+// updateTusSessionOffset updates the session offset after a successful chunk upload
+func (fs *FilerServer) updateTusSessionOffset(ctx context.Context, uploadID string, newOffset int64, chunk *TusChunkInfo) error {
+ session, err := fs.getTusSession(ctx, uploadID)
+ if err != nil {
+ return err
+ }
+
+ session.Offset = newOffset
+ if chunk != nil {
+ session.Chunks = append(session.Chunks, chunk)
+ }
+
+ return fs.saveTusSession(ctx, session)
+}
+
+// deleteTusSession removes a TUS upload session and all its data
+func (fs *FilerServer) deleteTusSession(ctx context.Context, uploadID string) error {
+ session, err := fs.getTusSession(ctx, uploadID)
+ if err != nil {
+ // Session might already be deleted or never existed
+ glog.V(1).Infof("TUS session %s not found for deletion: %v", uploadID, err)
+ return nil
+ }
+
+ // Delete any uploaded chunks from volume servers
+ for _, chunk := range session.Chunks {
+ if chunk.FileId != "" {
+ fs.filer.DeleteChunks(ctx, util.FullPath(session.TargetPath), []*filer_pb.FileChunk{
+ {FileId: chunk.FileId},
+ })
+ }
+ }
+
+ // Delete the session directory
+ sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
+ if err := fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, true, false, false, nil, 0); err != nil {
+ return fmt.Errorf("delete session directory: %w", err)
+ }
+
+ glog.V(2).Infof("Deleted TUS session %s", uploadID)
+ return nil
+}
+
+// completeTusUpload assembles all chunks and creates the final file
+func (fs *FilerServer) completeTusUpload(ctx context.Context, session *TusSession) error {
+ if session.Offset != session.Size {
+ return fmt.Errorf("upload incomplete: offset=%d, expected=%d", session.Offset, session.Size)
+ }
+
+ // Assemble file chunks in order
+ var fileChunks []*filer_pb.FileChunk
+ var offset int64 = 0
+
+ for _, chunk := range session.Chunks {
+ fid, fidErr := filer_pb.ToFileIdObject(chunk.FileId)
+ if fidErr != nil {
+ glog.Warningf("Invalid file ID %s: %v", chunk.FileId, fidErr)
+ continue
+ }
+
+ fileChunk := &filer_pb.FileChunk{
+ FileId: chunk.FileId,
+ Offset: offset,
+ Size: uint64(chunk.Size),
+ ModifiedTsNs: chunk.UploadAt,
+ Fid: fid,
+ }
+ fileChunks = append(fileChunks, fileChunk)
+ offset += chunk.Size
+ }
+
+ // Determine content type from metadata
+ contentType := ""
+ if session.Metadata != nil {
+ if ct, ok := session.Metadata["content-type"]; ok {
+ contentType = ct
+ }
+ }
+
+ // Create the final file entry
+ targetPath := util.FullPath(session.TargetPath)
+ entry := &filer.Entry{
+ FullPath: targetPath,
+ Attr: filer.Attr{
+ Mode: 0644,
+ Crtime: session.CreatedAt,
+ Mtime: time.Now(),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ Mime: contentType,
+ },
+ Chunks: fileChunks,
+ }
+
+ // Ensure parent directory exists
+ if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
+ return fmt.Errorf("create final file entry: %w", err)
+ }
+
+ // Delete the session (but keep the chunks since they're now part of the final file)
+ sessionDirPath := util.FullPath(fs.tusSessionPath(session.ID))
+ if err := fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, false, false, false, nil, 0); err != nil {
+ glog.V(1).Infof("Failed to cleanup TUS session directory %s: %v", session.ID, err)
+ }
+
+ glog.V(2).Infof("Completed TUS upload %s -> %s, size=%d, chunks=%d",
+ session.ID, session.TargetPath, session.Size, len(fileChunks))
+
+ return nil
+}
+