aboutsummaryrefslogtreecommitdiff
path: root/go/weed
diff options
context:
space:
mode:
authortnextday <fw2k4@163.com>2015-12-01 20:23:50 +0800
committertnextday <fw2k4@163.com>2015-12-01 20:23:50 +0800
commit6b0894d80635047479548bb19029300a596b4d55 (patch)
treeaef605f25524928a3c5aa0092f4aa5cf6e39f7b1 /go/weed
parentf825d237890c7df70c3d752668ddbc2deab3890e (diff)
downloadseaweedfs-6b0894d80635047479548bb19029300a596b4d55.tar.xz
seaweedfs-6b0894d80635047479548bb19029300a596b4d55.zip
update ChunkedFile to seekable reader, so we can use io.* to read data
Diffstat (limited to 'go/weed')
-rw-r--r--go/weed/weed_server/common.go2
-rw-r--r--go/weed/weed_server/volume_server_handlers_read.go150
2 files changed, 151 insertions, 1 deletions
diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go
index 4ad9824b1..a7fa2de53 100644
--- a/go/weed/weed_server/common.go
+++ b/go/weed/weed_server/common.go
@@ -86,7 +86,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
}
debug("parsing upload file...")
- fname, data, mimeType, isGzipped, lastModified, _, pe := storage.ParseUpload(r)
+ fname, data, mimeType, isGzipped, lastModified, _, _, pe := storage.ParseUpload(r)
if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe)
return
diff --git a/go/weed/weed_server/volume_server_handlers_read.go b/go/weed/weed_server/volume_server_handlers_read.go
index 9e252d205..febed354b 100644
--- a/go/weed/weed_server/volume_server_handlers_read.go
+++ b/go/weed/weed_server/volume_server_handlers_read.go
@@ -81,6 +81,11 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
w.Header().Set("Etag", etag)
+
+ if vs.tryHandleChunkedFile(n, filename, w, r) {
+ return
+ }
+
if n.NameSize > 0 && filename == "" {
filename = string(n.Name)
dotIndex := strings.LastIndex(filename, ".")
@@ -215,3 +220,148 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
io.CopyN(w, sendContent, sendSize)
}
+
+func (vs *VolumeServer) tryHandleChunkedFile(n *storage.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) {
+ if !n.IsChunkedFile() {
+ return false
+ }
+ processed = true
+ raw, _ := strconv.ParseBool(r.FormValue("raw"))
+ if raw {
+ w.Header().Set("Content-Type", "application/json")
+ w.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
+ if _, e := w.Write(n.Data); e != nil {
+ glog.V(0).Infoln("response write error:", e)
+ }
+ return true
+ }
+ chunkManifest, e := operation.LoadChunkedManifest(n.Data)
+ if e != nil {
+ return false
+ }
+ ext := ""
+ if fileName == "" && chunkManifest.Name != "" {
+ fileName = chunkManifest.Name
+ dotIndex := strings.LastIndex(fileName, ".")
+ if dotIndex > 0 {
+ ext = fileName[dotIndex:]
+ }
+ }
+ mtype := ""
+ if ext != "" {
+ mtype = mime.TypeByExtension(ext)
+ }
+ if chunkManifest.Mime != "" {
+ mt := chunkManifest.Mime
+ if !strings.HasPrefix(mt, "application/octet-stream") {
+ mtype = mt
+ }
+ }
+ if mtype != "" {
+ w.Header().Set("Content-Type", mtype)
+ }
+ if fileName != "" {
+ w.Header().Set("Content-Disposition", `filename="`+fileNameEscaper.Replace(fileName)+`"`)
+ }
+ w.Header().Set("X-File-Store", "chunked")
+ w.Header().Set("Accept-Ranges", "bytes")
+ if r.Method == "HEAD" {
+ w.Header().Set("Content-Length", strconv.FormatInt(chunkManifest.Size, 10))
+ return true
+ }
+
+ chunkedFileReader := operation.ChunkedFileReader{
+ Manifest: chunkManifest,
+ Master: vs.GetMasterNode(),
+ }
+ defer chunkedFileReader.Close()
+ rangeReq := r.Header.Get("Range")
+ if rangeReq == "" {
+ w.Header().Set("Content-Length", strconv.FormatInt(chunkManifest.Size, 10))
+ if _, e = io.Copy(w, chunkedFileReader); e != nil {
+ glog.V(0).Infoln("response write error:", e)
+ }
+ return true
+ }
+
+ //the rest is dealing with partial content request
+ //mostly copy from src/pkg/net/http/fs.go
+ size := chunkManifest.Size
+ ranges, err := parseRange(rangeReq, size)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
+ return
+ }
+ if sumRangesSize(ranges) > size {
+ // The total number of bytes in all the ranges
+ // is larger than the size of the file by
+ // itself, so this is probably an attack, or a
+ // dumb client. Ignore the range request.
+ ranges = nil
+ return
+ }
+ if len(ranges) == 0 {
+ return
+ }
+ if len(ranges) == 1 {
+ // RFC 2616, Section 14.16:
+ // "When an HTTP message includes the content of a single
+ // range (for example, a response to a request for a
+ // single range, or to a request for a set of ranges
+ // that overlap without any holes), this content is
+ // transmitted with a Content-Range header, and a
+ // Content-Length header showing the number of bytes
+ // actually transferred.
+ // ...
+ // A response to a request for a single range MUST NOT
+ // be sent using the multipart/byteranges media type."
+ ra := ranges[0]
+ w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
+ w.Header().Set("Content-Range", ra.contentRange(size))
+ w.WriteHeader(http.StatusPartialContent)
+ if _, e = chunkedFileReader.Seek(ra.start, 0); e != nil {
+ glog.V(0).Infoln("response write error:", e)
+ }
+ if _, e = io.CopyN(w, chunkedFileReader, ra.length); e != nil {
+ glog.V(0).Infoln("response write error:", e)
+ }
+ return
+ }
+ // process multiple ranges
+ for _, ra := range ranges {
+ if ra.start > size {
+ http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable)
+ return
+ }
+ }
+ sendSize := rangesMIMESize(ranges, mtype, size)
+ pr, pw := io.Pipe()
+ mw := multipart.NewWriter(pw)
+ w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary())
+ sendContent := pr
+ defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish.
+ go func() {
+ for _, ra := range ranges {
+ part, err := mw.CreatePart(ra.mimeHeader(mtype, size))
+ if err != nil {
+ pw.CloseWithError(err)
+ return
+ }
+ if _, e = chunkedFileReader.Seek(ra.start, 0); e != nil {
+ glog.V(0).Infoln("response write error:", e)
+ }
+ if _, err = io.CopyN(part, chunkedFileReader, ra.length); err != nil {
+ pw.CloseWithError(err)
+ return
+ }
+ }
+ mw.Close()
+ pw.Close()
+ }()
+ if w.Header().Get("Content-Encoding") == "" {
+ w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
+ }
+ w.WriteHeader(http.StatusPartialContent)
+ io.CopyN(w, sendContent, sendSize)
+ return
+}