diff options
| author | Mike Tolman <mike.tolman@fidelissecurity.com> | 2016-08-05 15:47:46 -0600 |
|---|---|---|
| committer | Mike Tolman <mike.tolman@fidelissecurity.com> | 2016-08-05 15:47:46 -0600 |
| commit | a89a3c86d0bfa20ead98fec1d286cdc6018c3bde (patch) | |
| tree | 801566226da8e0e5880b43644aae0f3124b3c8ac /weed/server | |
| parent | 0d331c1e3ae0d038ae972279a63d2ff9a70e25f4 (diff) | |
| download | seaweedfs-a89a3c86d0bfa20ead98fec1d286cdc6018c3bde.tar.xz seaweedfs-a89a3c86d0bfa20ead98fec1d286cdc6018c3bde.zip | |
Revert "Add AutoChunking to the Filer API, so that you can upload really large files through the filer API."
This reverts commit 09059bfdccdeff1a588ee1326318075adb068b0f.
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_server.go | 2 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 208 |
2 files changed, 0 insertions, 210 deletions
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index c9bc0e021..b99bbd7c9 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -28,7 +28,6 @@ type FilerServer struct { disableDirListing bool secret security.Secret filer filer.Filer - maxMB int masterNodes *storage.MasterNodes } @@ -44,7 +43,6 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st defaultReplication: replication, redirectOnRead: redirectOnRead, disableDirListing: disableDirListing, - maxMB: maxMB, port: ip + ":" + strconv.Itoa(port), } diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 872d8c4b9..e2d40f532 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -20,8 +20,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" "github.com/syndtr/goleveldb/leveldb" - "path" - "strconv" ) type FilerPostResult struct { @@ -219,7 +217,6 @@ func (fs *FilerServer) monolithicUploadAnalyzer(w http.ResponseWriter, r *http.R } func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { - query := r.URL.Query() replication := query.Get("replication") if replication == "" { @@ -230,10 +227,6 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { collection = fs.collection } - if autoChunked := fs.autoChunk(w, r, replication, collection); autoChunked { - return - } - var fileId, urlLocation string var err error @@ -250,17 +243,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } u, _ := url.Parse(urlLocation) - - // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off - // because they need to provide FIDs instead of file paths... - cm, _ := strconv.ParseBool(query.Get("cm")) - if cm { - q := u.Query() - q.Set("cm", "true") - u.RawQuery = q.Encode() - } glog.V(4).Infoln("post to", u) - request := &http.Request{ Method: r.Method, URL: u, @@ -336,197 +319,6 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { writeJsonQuiet(w, r, http.StatusCreated, reply) } -func (fs *FilerServer) autoChunk(w http.ResponseWriter, r *http.Request, replication string, collection string) bool { - if r.Method != "POST" { - glog.V(4).Infoln("AutoChunking not supported for method", r.Method) - return false - } - - // autoChunking can be set at the command-line level or as a query param. Query param overrides command-line - query := r.URL.Query() - - parsedMaxMB, _ := strconv.ParseInt(query.Get("maxMB"), 10, 32) - maxMB := int32(parsedMaxMB) - if maxMB <= 0 && fs.maxMB > 0 { - maxMB = int32(fs.maxMB) - } - if maxMB <= 0 { - glog.V(4).Infoln("AutoChunking not enabled") - return false - } - glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)") - - chunkSize := 1024 * 1024 * maxMB - - contentLength := int64(0) - if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 { - contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64) - if contentLength <= int64(chunkSize) { - glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.") - return false - } - } - - if contentLength <= 0 { - glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.") - return false - } - - reply, err := fs.doAutoChunk(w, r, contentLength, chunkSize, replication, collection) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, err) - } else if reply != nil { - writeJsonQuiet(w, r, http.StatusCreated, reply) - } - return true -} - -func (fs *FilerServer) doAutoChunk(w http.ResponseWriter, r *http.Request, contentLength int64, chunkSize int32, replication string, collection string) (filerResult *FilerPostResult, replyerr error) { - - multipartReader, multipartReaderErr := r.MultipartReader() - if multipartReaderErr != nil { - return nil, multipartReaderErr - } - - part1, part1Err := multipartReader.NextPart() - if part1Err != nil { - return nil, part1Err - } - - fileName := part1.FileName() - if fileName != "" { - fileName = path.Base(fileName) - } - - chunks := (int64(contentLength) / int64(chunkSize)) + 1 - cm := operation.ChunkManifest{ - Name: fileName, - Size: 0, // don't know yet - Mime: "application/octet-stream", - Chunks: make([]*operation.ChunkInfo, 0, chunks), - } - - totalBytesRead := int64(0) - tmpBufferSize := int32(1024 * 1024) - tmpBuffer := bytes.NewBuffer(make([]byte, 0, tmpBufferSize)) - chunkBuf := make([]byte, chunkSize+tmpBufferSize, chunkSize+tmpBufferSize) // chunk size plus a little overflow - chunkBufOffset := int32(0) - chunkOffset := int64(0) - writtenChunks := 0 - - filerResult = &FilerPostResult{ - Name: fileName, - } - - for totalBytesRead < contentLength { - tmpBuffer.Reset() - bytesRead, readErr := io.CopyN(tmpBuffer, part1, int64(tmpBufferSize)) - readFully := readErr != nil && readErr == io.EOF - tmpBuf := tmpBuffer.Bytes() - bytesToCopy := tmpBuf[0:int(bytesRead)] - - copy(chunkBuf[chunkBufOffset:chunkBufOffset+int32(bytesRead)], bytesToCopy) - chunkBufOffset = chunkBufOffset + int32(bytesRead) - - if chunkBufOffset >= chunkSize || readFully || (chunkBufOffset > 0 && bytesRead == 0) { - writtenChunks = writtenChunks + 1 - fileId, urlLocation, assignErr := fs.assignNewFileInfo(w, r, replication, collection) - if assignErr != nil { - return nil, assignErr - } - - // upload the chunk to the volume server - chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(cm.Chunks.Len()+1), 10) - uploadErr := fs.doUpload(urlLocation, w, r, chunkBuf[0:chunkBufOffset], chunkName, "application/octet-stream", fileId) - if uploadErr != nil { - return nil, uploadErr - } - - // Save to chunk manifest structure - cm.Chunks = append(cm.Chunks, - &operation.ChunkInfo{ - Offset: chunkOffset, - Size: int64(chunkBufOffset), - Fid: fileId, - }, - ) - - // reset variables for the next chunk - chunkBufOffset = 0 - chunkOffset = totalBytesRead + int64(bytesRead) - } - - totalBytesRead = totalBytesRead + int64(bytesRead) - - if bytesRead == 0 || readFully { - break - } - - if readErr != nil { - return nil, readErr - } - } - - cm.Size = totalBytesRead - manifestBuf, marshalErr := cm.Marshal() - if marshalErr != nil { - return nil, marshalErr - } - - manifestStr := string(manifestBuf) - glog.V(4).Infoln("Generated chunk manifest: ", manifestStr) - - manifestFileId, manifestUrlLocation, manifestAssignmentErr := fs.assignNewFileInfo(w, r, replication, collection) - if manifestAssignmentErr != nil { - return nil, manifestAssignmentErr - } - glog.V(4).Infoln("Manifest uploaded to:", manifestUrlLocation, "Fid:", manifestFileId) - filerResult.Fid = manifestFileId - - u, _ := url.Parse(manifestUrlLocation) - q := u.Query() - q.Set("cm", "true") - u.RawQuery = q.Encode() - - manifestUploadErr := fs.doUpload(u.String(), w, r, manifestBuf, fileName+"_manifest", "application/json", manifestFileId) - if manifestUploadErr != nil { - return nil, manifestUploadErr - } - - path := r.URL.Path - // also delete the old fid unless PUT operation - if r.Method != "PUT" { - if oldFid, err := fs.filer.FindFile(path); err == nil { - operation.DeleteFile(fs.getMasterNode(), oldFid, fs.jwt(oldFid)) - } - } - - glog.V(4).Infoln("saving", path, "=>", manifestFileId) - if db_err := fs.filer.CreateFile(path, manifestFileId); db_err != nil { - replyerr = db_err - filerResult.Error = db_err.Error() - operation.DeleteFile(fs.getMasterNode(), manifestFileId, fs.jwt(manifestFileId)) //clean up - glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err) - return - } - - return -} - -func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, chunkBuf []byte, fileName string, contentType string, fileId string) (err error) { - err = nil - - ioReader := ioutil.NopCloser(bytes.NewBuffer(chunkBuf)) - uploadResult, uploadError := operation.Upload(urlLocation, fileName, ioReader, false, contentType, fs.jwt(fileId)) - if uploadResult != nil { - glog.V(0).Infoln("Chunk upload result. Name:", uploadResult.Name, "Fid:", fileId, "Size:", uploadResult.Size) - } - if uploadError != nil { - err = uploadError - } - return -} - // curl -X DELETE http://localhost:8888/path/to // curl -X DELETE http://localhost:8888/path/to?recursive=true func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { |
