aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/server/filer_server_handlers_write_upload.go170
1 files changed, 50 insertions, 120 deletions
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 81b2ce1b0..3ab45453e 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -6,9 +6,7 @@ import (
"io"
"io/ioutil"
"net/http"
- "runtime"
"strings"
- "sync"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -20,143 +18,75 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-var (
- limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU()))
-)
-
-func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) {
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
+ var fileChunks []*filer_pb.FileChunk
- md5Hash = md5.New()
+ md5Hash := md5.New()
var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
- // save small content directly
- if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) {
- smallContent, err = ioutil.ReadAll(partReader)
- dataSize = int64(len(smallContent))
- return
- }
+ chunkOffset := int64(0)
+ var smallContent []byte
- resultsChan := make(chan *ChunkCreationResult, 2)
+ for {
+ limitedReader := io.LimitReader(partReader, int64(chunkSize))
- var waitForAllData sync.WaitGroup
- waitForAllData.Add(1)
- go func() {
- // process upload results
- defer waitForAllData.Done()
- for result := range resultsChan {
- if result.err != nil {
- err = result.err
- continue
- }
-
- // Save to chunk manifest structure
- fileChunks = append(fileChunks, result.chunk)
+ data, err := ioutil.ReadAll(limitedReader)
+ if err != nil {
+ return nil, nil, 0, err, nil
}
- }()
-
- var lock sync.Mutex
- readOffset := int64(0)
- var wg sync.WaitGroup
-
- for err == nil {
-
- wg.Add(1)
- request := func() {
- defer wg.Done()
-
- var localOffset int64
- // read from the input
- lock.Lock()
- localOffset = readOffset
- limitedReader := io.LimitReader(partReader, int64(chunkSize))
- data, readErr := ioutil.ReadAll(limitedReader)
- readOffset += int64(len(data))
- lock.Unlock()
- // handle read errors
- if readErr != nil {
- if err == nil {
- err = readErr
- }
- if readErr != io.EOF {
- resultsChan <- &ChunkCreationResult{
- err: readErr,
- }
- }
- return
+ if chunkOffset == 0 && !isAppend(r) {
+ if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
+ smallContent = data
+ chunkOffset += int64(len(data))
+ break
}
- if len(data) == 0 {
- readErr = io.EOF
- if err == nil {
- err = readErr
- }
- return
+ }
+ dataReader := util.NewBytesReader(data)
+
+ // retry to assign a different file id
+ var fileId, urlLocation string
+ var auth security.EncodedJwt
+ var assignErr, uploadErr error
+ var uploadResult *operation.UploadResult
+ for i := 0; i < 3; i++ {
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
+ if assignErr != nil {
+ return nil, nil, 0, assignErr, nil
}
- // upload
- dataReader := util.NewBytesReader(data)
- fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType)
+ // upload the chunk to the volume server
+ uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
- if err == nil {
- err = uploadErr
- }
- resultsChan <- &ChunkCreationResult{
- err: uploadErr,
- }
- return
- }
-
- glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size))
-
- // send back uploaded file chunk
- resultsChan <- &ChunkCreationResult{
- chunk: uploadResult.ToPbFileChunk(fileId, localOffset),
+ time.Sleep(251 * time.Millisecond)
+ continue
}
-
+ break
+ }
+ if uploadErr != nil {
+ return nil, nil, 0, uploadErr, nil
}
- limitedUploadProcessor.Execute(request)
- }
-
- go func() {
- wg.Wait()
- close(resultsChan)
- }()
-
- waitForAllData.Wait()
- if err == io.EOF {
- err = nil
- }
+ // if last chunk exhausted the reader exactly at the border
+ if uploadResult.Size == 0 {
+ break
+ }
- return fileChunks, md5Hash, readOffset, err, nil
-}
+ // Save to chunk manifest structure
+ fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
-type ChunkCreationResult struct {
- chunk *filer_pb.FileChunk
- err error
-}
+ glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
-func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) {
- // retry to assign a different file id
- var fileId, urlLocation string
- var auth security.EncodedJwt
- var assignErr, uploadErr error
- var uploadResult *operation.UploadResult
- for i := 0; i < 3; i++ {
- // assign one file id for one chunk
- fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
- if assignErr != nil {
- return "", nil, assignErr
- }
+ // reset variables for the next chunk
+ chunkOffset = chunkOffset + int64(uploadResult.Size)
- // upload the chunk to the volume server
- uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
- if uploadErr != nil {
- time.Sleep(251 * time.Millisecond)
- continue
+ // if last chunk was not at full chunk size, but already exhausted the reader
+ if int64(uploadResult.Size) < int64(chunkSize) {
+ break
}
- break
}
- return fileId, uploadResult, uploadErr
+
+ return fileChunks, md5Hash, chunkOffset, nil, smallContent
}
func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {