aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2025-12-01 12:56:42 -0800
committerchrislu <chris.lu@gmail.com>2025-12-01 13:28:21 -0800
commit44d0925d7bd932e8836e8884e751191231b6a298 (patch)
tree2f003b7bf7996da9efd1231a44e3856057dd0d20
parent3675d9db016c486cffcd77e67eaf7802aefd3f18 (diff)
downloadseaweedfs-44d0925d7bd932e8836e8884e751191231b6a298.tar.xz
seaweedfs-44d0925d7bd932e8836e8884e751191231b6a298.zip
Add TUS HTTP handlers
Implements TUS protocol HTTP handlers: - tusHandler: Main entry point routing requests - tusOptionsHandler: Capability discovery (OPTIONS) - tusCreateHandler: Create new upload (POST) - tusHeadHandler: Get upload offset (HEAD) - tusPatchHandler: Upload data at offset (PATCH) - tusDeleteHandler: Cancel upload (DELETE) - tusWriteData: Upload data to volume servers Features: - Supports creation-with-upload extension - Validates TUS protocol headers - Offset conflict detection - Automatic upload completion when size is reached - Metadata parsing from Upload-Metadata header
-rw-r--r--weed/server/filer_server_tus_handlers.go352
1 files changed, 352 insertions, 0 deletions
diff --git a/weed/server/filer_server_tus_handlers.go b/weed/server/filer_server_tus_handlers.go
new file mode 100644
index 000000000..8a426941e
--- /dev/null
+++ b/weed/server/filer_server_tus_handlers.go
@@ -0,0 +1,352 @@
+package weed_server
+
+import (
+ "bytes"
+ "context"
+ "encoding/base64"
+ "fmt"
+ "io"
+ "net/http"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+)
+
+// tusHandler is the main entry point for TUS protocol requests
+func (fs *FilerServer) tusHandler(w http.ResponseWriter, r *http.Request) {
+ // Set common TUS response headers
+ w.Header().Set("Tus-Resumable", TusVersion)
+
+ // Check Tus-Resumable header for non-OPTIONS requests
+ if r.Method != http.MethodOptions {
+ tusVersion := r.Header.Get("Tus-Resumable")
+ if tusVersion != TusVersion {
+ http.Error(w, "Unsupported TUS version", http.StatusPreconditionFailed)
+ return
+ }
+ }
+
+ // Route based on method and path
+ path := r.URL.Path
+ tusPrefix := "/.tus"
+
+ // Check if this is an upload location (contains upload ID after /.tus/.uploads/)
+ if strings.HasPrefix(path, tusPrefix+"/.uploads/") {
+ uploadID := strings.TrimPrefix(path, tusPrefix+"/.uploads/")
+ uploadID = strings.Split(uploadID, "/")[0] // Get just the ID, not any trailing path
+
+ switch r.Method {
+ case http.MethodHead:
+ fs.tusHeadHandler(w, r, uploadID)
+ case http.MethodPatch:
+ fs.tusPatchHandler(w, r, uploadID)
+ case http.MethodDelete:
+ fs.tusDeleteHandler(w, r, uploadID)
+ default:
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ }
+ return
+ }
+
+ // Handle creation endpoints (POST to /.tus/{path})
+ switch r.Method {
+ case http.MethodOptions:
+ fs.tusOptionsHandler(w, r)
+ case http.MethodPost:
+ fs.tusCreateHandler(w, r)
+ default:
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ }
+}
+
+// tusOptionsHandler handles OPTIONS requests for capability discovery
+func (fs *FilerServer) tusOptionsHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Tus-Version", TusVersion)
+ w.Header().Set("Tus-Extension", TusExtensions)
+ w.Header().Set("Tus-Max-Size", strconv.FormatInt(TusMaxSize, 10))
+ w.WriteHeader(http.StatusOK)
+}
+
+// tusCreateHandler handles POST requests to create new uploads
+func (fs *FilerServer) tusCreateHandler(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+
+ // Parse Upload-Length header (required)
+ uploadLengthStr := r.Header.Get("Upload-Length")
+ if uploadLengthStr == "" {
+ http.Error(w, "Upload-Length header required", http.StatusBadRequest)
+ return
+ }
+ uploadLength, err := strconv.ParseInt(uploadLengthStr, 10, 64)
+ if err != nil || uploadLength < 0 {
+ http.Error(w, "Invalid Upload-Length", http.StatusBadRequest)
+ return
+ }
+ if uploadLength > TusMaxSize {
+ http.Error(w, "Upload-Length exceeds maximum", http.StatusRequestEntityTooLarge)
+ return
+ }
+
+ // Parse Upload-Metadata header (optional)
+ metadata := parseTusMetadata(r.Header.Get("Upload-Metadata"))
+
+ // Determine target path from request URL
+ targetPath := strings.TrimPrefix(r.URL.Path, "/.tus")
+ if targetPath == "" || targetPath == "/" {
+ http.Error(w, "Target path required", http.StatusBadRequest)
+ return
+ }
+
+ // Generate upload ID
+ uploadID := uuid.New().String()
+
+ // Create upload session
+ session, err := fs.createTusSession(ctx, uploadID, targetPath, uploadLength, metadata)
+ if err != nil {
+ glog.Errorf("Failed to create TUS session: %v", err)
+ http.Error(w, "Failed to create upload", http.StatusInternalServerError)
+ return
+ }
+
+ // Build upload location URL
+ uploadLocation := fmt.Sprintf("/.tus/.uploads/%s", uploadID)
+
+ // Handle creation-with-upload extension
+ if r.ContentLength > 0 && r.Header.Get("Content-Type") == "application/offset+octet-stream" {
+ // Upload data in the creation request
+ bytesWritten, uploadErr := fs.tusWriteData(ctx, session, 0, r.Body, r.ContentLength)
+ if uploadErr != nil {
+ // Cleanup session on failure
+ fs.deleteTusSession(ctx, uploadID)
+ glog.Errorf("Failed to write initial TUS data: %v", uploadErr)
+ http.Error(w, "Failed to write data", http.StatusInternalServerError)
+ return
+ }
+
+ // Update offset in response header
+ w.Header().Set("Upload-Offset", strconv.FormatInt(bytesWritten, 10))
+
+ // Check if upload is complete
+ if bytesWritten == session.Size {
+ if err := fs.completeTusUpload(ctx, session); err != nil {
+ glog.Errorf("Failed to complete TUS upload: %v", err)
+ http.Error(w, "Failed to complete upload", http.StatusInternalServerError)
+ return
+ }
+ }
+ }
+
+ w.Header().Set("Location", uploadLocation)
+ w.WriteHeader(http.StatusCreated)
+}
+
+// tusHeadHandler handles HEAD requests to get current upload offset
+func (fs *FilerServer) tusHeadHandler(w http.ResponseWriter, r *http.Request, uploadID string) {
+ ctx := r.Context()
+
+ session, err := fs.getTusSession(ctx, uploadID)
+ if err != nil {
+ http.Error(w, "Upload not found", http.StatusNotFound)
+ return
+ }
+
+ w.Header().Set("Upload-Offset", strconv.FormatInt(session.Offset, 10))
+ w.Header().Set("Upload-Length", strconv.FormatInt(session.Size, 10))
+ w.Header().Set("Cache-Control", "no-store")
+ w.WriteHeader(http.StatusOK)
+}
+
+// tusPatchHandler handles PATCH requests to upload data
+func (fs *FilerServer) tusPatchHandler(w http.ResponseWriter, r *http.Request, uploadID string) {
+ ctx := r.Context()
+
+ // Validate Content-Type
+ contentType := r.Header.Get("Content-Type")
+ if contentType != "application/offset+octet-stream" {
+ http.Error(w, "Content-Type must be application/offset+octet-stream", http.StatusUnsupportedMediaType)
+ return
+ }
+
+ // Get current session
+ session, err := fs.getTusSession(ctx, uploadID)
+ if err != nil {
+ http.Error(w, "Upload not found", http.StatusNotFound)
+ return
+ }
+
+ // Validate Upload-Offset header
+ uploadOffsetStr := r.Header.Get("Upload-Offset")
+ if uploadOffsetStr == "" {
+ http.Error(w, "Upload-Offset header required", http.StatusBadRequest)
+ return
+ }
+ uploadOffset, err := strconv.ParseInt(uploadOffsetStr, 10, 64)
+ if err != nil || uploadOffset < 0 {
+ http.Error(w, "Invalid Upload-Offset", http.StatusBadRequest)
+ return
+ }
+
+ // Check offset matches current position
+ if uploadOffset != session.Offset {
+ http.Error(w, fmt.Sprintf("Offset mismatch: expected %d, got %d", session.Offset, uploadOffset), http.StatusConflict)
+ return
+ }
+
+ // Write data
+ bytesWritten, err := fs.tusWriteData(ctx, session, uploadOffset, r.Body, r.ContentLength)
+ if err != nil {
+ glog.Errorf("Failed to write TUS data: %v", err)
+ http.Error(w, "Failed to write data", http.StatusInternalServerError)
+ return
+ }
+
+ newOffset := uploadOffset + bytesWritten
+
+ // Check if upload is complete
+ if newOffset == session.Size {
+ // Refresh session to get updated chunks
+ session, err = fs.getTusSession(ctx, uploadID)
+ if err != nil {
+ glog.Errorf("Failed to get updated TUS session: %v", err)
+ http.Error(w, "Failed to complete upload", http.StatusInternalServerError)
+ return
+ }
+
+ if err := fs.completeTusUpload(ctx, session); err != nil {
+ glog.Errorf("Failed to complete TUS upload: %v", err)
+ http.Error(w, "Failed to complete upload", http.StatusInternalServerError)
+ return
+ }
+ }
+
+ w.Header().Set("Upload-Offset", strconv.FormatInt(newOffset, 10))
+ w.WriteHeader(http.StatusNoContent)
+}
+
+// tusDeleteHandler handles DELETE requests to cancel uploads
+func (fs *FilerServer) tusDeleteHandler(w http.ResponseWriter, r *http.Request, uploadID string) {
+ ctx := r.Context()
+
+ if err := fs.deleteTusSession(ctx, uploadID); err != nil {
+ glog.Errorf("Failed to delete TUS session: %v", err)
+ http.Error(w, "Failed to delete upload", http.StatusInternalServerError)
+ return
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+}
+
+// tusWriteData uploads data to volume servers and updates session
+func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, offset int64, reader io.Reader, contentLength int64) (int64, error) {
+ if contentLength == 0 {
+ return 0, nil
+ }
+
+ // Limit content length to remaining size
+ remaining := session.Size - offset
+ if contentLength > remaining {
+ contentLength = remaining
+ }
+ if contentLength <= 0 {
+ return 0, nil
+ }
+
+ // Read data into buffer
+ buf := new(bytes.Buffer)
+ n, err := io.CopyN(buf, reader, contentLength)
+ if err != nil && err != io.EOF {
+ return 0, fmt.Errorf("read data: %w", err)
+ }
+ if n == 0 {
+ return 0, nil
+ }
+
+ // Determine storage options based on target path
+ so, err := fs.detectStorageOption0(ctx, session.TargetPath, "", "", "", "", "", "", "", "", "")
+ if err != nil {
+ return 0, fmt.Errorf("detect storage option: %w", err)
+ }
+
+ // Assign file ID from master
+ fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
+ if assignErr != nil {
+ return 0, fmt.Errorf("assign volume: %w", assignErr)
+ }
+
+ // Upload to volume server
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil {
+ return 0, fmt.Errorf("create uploader: %w", uploaderErr)
+ }
+
+ uploadResult, uploadErr, _ := uploader.Upload(ctx, bytes.NewReader(buf.Bytes()), &operation.UploadOption{
+ UploadUrl: urlLocation,
+ Filename: "",
+ Cipher: false,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: auth,
+ })
+ if uploadErr != nil {
+ return 0, fmt.Errorf("upload data: %w", uploadErr)
+ }
+
+ // Create chunk info
+ chunk := &TusChunkInfo{
+ Offset: offset,
+ Size: int64(uploadResult.Size),
+ FileId: fileId,
+ UploadAt: time.Now().UnixNano(),
+ }
+
+ // Update session
+ if err := fs.updateTusSessionOffset(ctx, session.ID, offset+int64(uploadResult.Size), chunk); err != nil {
+ // Try to clean up the uploaded chunk
+ fs.filer.DeleteChunks(ctx, util.FullPath(session.TargetPath), []*filer_pb.FileChunk{
+ {FileId: fileId},
+ })
+ return 0, fmt.Errorf("update session: %w", err)
+ }
+
+ stats.FilerHandlerCounter.WithLabelValues("tusUploadChunk").Inc()
+
+ return int64(uploadResult.Size), nil
+}
+
+// parseTusMetadata parses the Upload-Metadata header
+// Format: key1 base64value1,key2 base64value2,...
+func parseTusMetadata(header string) map[string]string {
+ metadata := make(map[string]string)
+ if header == "" {
+ return metadata
+ }
+
+ pairs := strings.Split(header, ",")
+ for _, pair := range pairs {
+ pair = strings.TrimSpace(pair)
+ parts := strings.SplitN(pair, " ", 2)
+ if len(parts) != 2 {
+ continue
+ }
+ key := strings.TrimSpace(parts[0])
+ encodedValue := strings.TrimSpace(parts[1])
+
+ value, err := base64.StdEncoding.DecodeString(encodedValue)
+ if err != nil {
+ glog.V(1).Infof("Failed to decode TUS metadata value for key %s: %v", key, err)
+ continue
+ }
+ metadata[key] = string(value)
+ }
+
+ return metadata
+}
+