diff options
Diffstat (limited to 'weed/server/filer_server_handlers_write_upload.go')
| -rw-r--r-- | weed/server/filer_server_handlers_write_upload.go | 156 |
1 files changed, 105 insertions, 51 deletions
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index cf8cdf3d8..21af6a109 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -5,8 +5,10 @@ import ( "hash" "io" "io/ioutil" + "math/rand" "net/http" "strings" + "sync" "time" "github.com/chrislusf/seaweedfs/weed/filer" @@ -18,75 +20,127 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) { - var fileChunks []*filer_pb.FileChunk +func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) { - md5Hash := md5.New() + md5Hash = md5.New() var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash)) - chunkOffset := int64(0) - var smallContent []byte + // save small content directly + if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) { + smallContent, err = ioutil.ReadAll(partReader) + dataSize = int64(len(smallContent)) + return + } - for { - limitedReader := io.LimitReader(partReader, int64(chunkSize)) + resultsChan := make(chan *ChunkCreationResult, operation.ConcurrentUploadLimit) - data, err := ioutil.ReadAll(limitedReader) - if err != nil { - return nil, nil, 0, err, nil - } - if chunkOffset == 0 && !isAppend(r) { - if len(data) < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 { - smallContent = data - chunkOffset += int64(len(data)) - break + var waitForAllData sync.WaitGroup + waitForAllData.Add(1) + go func() { + // process upload results + defer waitForAllData.Done() + for result := range resultsChan { + if result.err != nil { + err = result.err + continue } + + // Save to chunk manifest structure + fileChunks = append(fileChunks, result.chunk) } - dataReader := util.NewBytesReader(data) - - // retry to assign a different file id - var fileId, urlLocation string - var auth security.EncodedJwt - var assignErr, uploadErr error - var uploadResult *operation.UploadResult - for i := 0; i < 3; i++ { - // assign one file id for one chunk - fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so) - if assignErr != nil { - return nil, nil, 0, assignErr, nil + }() + + var lock sync.Mutex + readOffset := int64(0) + var wg sync.WaitGroup + var readErr error + + for readErr == nil { + + wg.Add(1) + operation.AsyncOutOfOrderProcess(rand.Uint32(), func() { + defer wg.Done() + + var localOffset int64 + var data []byte + // read from the input + lock.Lock() + localOffset = readOffset + limitedReader := io.LimitReader(partReader, int64(chunkSize)) + data, readErr = ioutil.ReadAll(limitedReader) + readOffset += int64(len(data)) + lock.Unlock() + // handle read errors + if readErr != nil { + if readErr != io.EOF { + resultsChan <- &ChunkCreationResult{ + err: readErr, + } + } + return + } + if len(data) == 0 { + readErr = io.EOF + return } - // upload the chunk to the volume server - uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth) + // upload + dataReader := util.NewBytesReader(data) + fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType) if uploadErr != nil { - time.Sleep(251 * time.Millisecond) - continue + resultsChan <- &ChunkCreationResult{ + err: uploadErr, + } + return } - break - } - if uploadErr != nil { - return nil, nil, 0, uploadErr, nil - } - // if last chunk exhausted the reader exactly at the border - if uploadResult.Size == 0 { - break - } + glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size)) - // Save to chunk manifest structure - fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) + // send back uploaded file chunk + resultsChan <- &ChunkCreationResult{ + chunk: uploadResult.ToPbFileChunk(fileId, localOffset), + } + + }) + } + + go func() { + wg.Wait() + close(resultsChan) + }() + + waitForAllData.Wait() - glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size)) + return fileChunks, md5Hash, readOffset, err, nil +} - // reset variables for the next chunk - chunkOffset = chunkOffset + int64(uploadResult.Size) +type ChunkCreationResult struct { + chunk *filer_pb.FileChunk + err error +} - // if last chunk was not at full chunk size, but already exhausted the reader - if int64(uploadResult.Size) < int64(chunkSize) { - break +func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) { + // retry to assign a different file id + var fileId, urlLocation string + var auth security.EncodedJwt + var assignErr, uploadErr error + var uploadResult *operation.UploadResult + for i := 0; i < 3; i++ { + // assign one file id for one chunk + fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so) + if assignErr != nil { + return "", nil, assignErr } - } - return fileChunks, md5Hash, chunkOffset, nil, smallContent + // upload the chunk to the volume server + uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth) + if uploadErr != nil { + time.Sleep(251 * time.Millisecond) + continue + } + break + } + return fileId, uploadResult, uploadErr } func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { |
