diff options
| author | tnextday <fw2k4@163.com> | 2015-12-01 20:23:50 +0800 |
|---|---|---|
| committer | tnextday <fw2k4@163.com> | 2015-12-01 20:23:50 +0800 |
| commit | 6b0894d80635047479548bb19029300a596b4d55 (patch) | |
| tree | aef605f25524928a3c5aa0092f4aa5cf6e39f7b1 /go/weed | |
| parent | f825d237890c7df70c3d752668ddbc2deab3890e (diff) | |
| download | seaweedfs-6b0894d80635047479548bb19029300a596b4d55.tar.xz seaweedfs-6b0894d80635047479548bb19029300a596b4d55.zip | |
update ChunkedFile to seekable reader, so we can use io.* to read data
Diffstat (limited to 'go/weed')
| -rw-r--r-- | go/weed/weed_server/common.go | 2 | ||||
| -rw-r--r-- | go/weed/weed_server/volume_server_handlers_read.go | 150 |
2 files changed, 151 insertions, 1 deletions
diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index 4ad9824b1..a7fa2de53 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -86,7 +86,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("parsing upload file...") - fname, data, mimeType, isGzipped, lastModified, _, pe := storage.ParseUpload(r) + fname, data, mimeType, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r) if pe != nil { writeJsonError(w, r, http.StatusBadRequest, pe) return diff --git a/go/weed/weed_server/volume_server_handlers_read.go b/go/weed/weed_server/volume_server_handlers_read.go index 9e252d205..febed354b 100644 --- a/go/weed/weed_server/volume_server_handlers_read.go +++ b/go/weed/weed_server/volume_server_handlers_read.go @@ -81,6 +81,11 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } w.Header().Set("Etag", etag) + + if vs.tryHandleChunkedFile(n, filename, w, r) { + return + } + if n.NameSize > 0 && filename == "" { filename = string(n.Name) dotIndex := strings.LastIndex(filename, ".") @@ -215,3 +220,148 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) io.CopyN(w, sendContent, sendSize) } + +func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) { + if !n.IsChunkedFile() { + return false + } + processed = true + raw, _ := strconv.ParseBool(r.FormValue("raw")) + if raw { + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Length", strconv.Itoa(len(n.Data))) + if _, e := w.Write(n.Data); e != nil { + glog.V(0).Infoln("response write error:", e) + } + return true + } + chunkManifest, e := operation.LoadChunkedManifest(n.Data) + if e != nil { + return false + } + ext := "" + if fileName == "" && chunkManifest.Name != "" { + fileName = chunkManifest.Name + dotIndex := strings.LastIndex(fileName, ".") + if dotIndex > 0 { + ext = fileName[dotIndex:] + } + } + mtype := "" + if ext != "" { + mtype = mime.TypeByExtension(ext) + } + if chunkManifest.Mime != "" { + mt := chunkManifest.Mime + if !strings.HasPrefix(mt, "application/octet-stream") { + mtype = mt + } + } + if mtype != "" { + w.Header().Set("Content-Type", mtype) + } + if fileName != "" { + w.Header().Set("Content-Disposition", `filename="`+fileNameEscaper.Replace(fileName)+`"`) + } + w.Header().Set("X-File-Store", "chunked") + w.Header().Set("Accept-Ranges", "bytes") + if r.Method == "HEAD" { + w.Header().Set("Content-Length", strconv.FormatInt(chunkManifest.Size, 10)) + return true + } + + chunkedFileReader := operation.ChunkedFileReader{ + Manifest: chunkManifest, + Master: vs.GetMasterNode(), + } + defer chunkedFileReader.Close() + rangeReq := r.Header.Get("Range") + if rangeReq == "" { + w.Header().Set("Content-Length", strconv.FormatInt(chunkManifest.Size, 10)) + if _, e = io.Copy(w, chunkedFileReader); e != nil { + glog.V(0).Infoln("response write error:", e) + } + return true + } + + //the rest is dealing with partial content request + //mostly copy from src/pkg/net/http/fs.go + size := chunkManifest.Size + ranges, err := parseRange(rangeReq, size) + if err != nil { + http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) + return + } + if sumRangesSize(ranges) > size { + // The total number of bytes in all the ranges + // is larger than the size of the file by + // itself, so this is probably an attack, or a + // dumb client. Ignore the range request. + ranges = nil + return + } + if len(ranges) == 0 { + return + } + if len(ranges) == 1 { + // RFC 2616, Section 14.16: + // "When an HTTP message includes the content of a single + // range (for example, a response to a request for a + // single range, or to a request for a set of ranges + // that overlap without any holes), this content is + // transmitted with a Content-Range header, and a + // Content-Length header showing the number of bytes + // actually transferred. + // ... + // A response to a request for a single range MUST NOT + // be sent using the multipart/byteranges media type." + ra := ranges[0] + w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) + w.Header().Set("Content-Range", ra.contentRange(size)) + w.WriteHeader(http.StatusPartialContent) + if _, e = chunkedFileReader.Seek(ra.start, 0); e != nil { + glog.V(0).Infoln("response write error:", e) + } + if _, e = io.CopyN(w, chunkedFileReader, ra.length); e != nil { + glog.V(0).Infoln("response write error:", e) + } + return + } + // process multiple ranges + for _, ra := range ranges { + if ra.start > size { + http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable) + return + } + } + sendSize := rangesMIMESize(ranges, mtype, size) + pr, pw := io.Pipe() + mw := multipart.NewWriter(pw) + w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary()) + sendContent := pr + defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish. + go func() { + for _, ra := range ranges { + part, err := mw.CreatePart(ra.mimeHeader(mtype, size)) + if err != nil { + pw.CloseWithError(err) + return + } + if _, e = chunkedFileReader.Seek(ra.start, 0); e != nil { + glog.V(0).Infoln("response write error:", e) + } + if _, err = io.CopyN(part, chunkedFileReader, ra.length); err != nil { + pw.CloseWithError(err) + return + } + } + mw.Close() + pw.Close() + }() + if w.Header().Get("Content-Encoding") == "" { + w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10)) + } + w.WriteHeader(http.StatusPartialContent) + io.CopyN(w, sendContent, sendSize) + return +} |
