author    Chris Lu <chris.lu@gmail.com>  2020-08-08 10:45:37 -0700
committer Chris Lu <chris.lu@gmail.com>  2020-08-08 10:45:37 -0700
commit    ae00cce4bd673363f5f7c36905fbff138bcbd772 (patch)
tree      715520c280dbc1a1d5c307e5625267bca0251916
parent    bd8bfdae0716a4071c55b05b06f943267a35c4b9 (diff)
support POST and PUT auto chunking
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go | 131
1 file changed, 86 insertions(+), 45 deletions(-)
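For context before the diff: this commit lets a client stream a file to the filer with a plain HTTP PUT and have it auto-chunked server-side, matching the existing multipart POST path. A minimal client sketch in Go, assuming a filer on localhost:8888 and a destination path of /path/to/big.dat (host, port, and path are illustrative, not taken from this commit):

package main

import (
	"log"
	"net/http"
	"os"
)

func main() {
	f, err := os.Open("big.dat")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// PUT the raw body; the filer splits it into chunks server-side.
	req, err := http.NewRequest(http.MethodPut, "http://localhost:8888/path/to/big.dat", f)
	if err != nil {
		log.Fatal(err)
	}
	// The chunking loop is driven by the declared content length, so
	// set it explicitly rather than relying on chunked encoding.
	fi, err := f.Stat()
	if err != nil {
		log.Fatal(err)
	}
	req.ContentLength = fi.Size()

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println("status:", resp.Status)
}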
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index c3ce09af6..77ae10f19 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"crypto/md5"
+ "hash"
"io"
"io/ioutil"
"net/http"
@@ -22,10 +23,6 @@ import (
func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool {
- if r.Method != "POST" {
- glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
- return false
- }
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
query := r.URL.Query()
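The comment in the hunk above describes the precedence: a query parameter overrides the command-line default. A standalone sketch of that resolution, assuming the parameter is named maxMB as in the filer's documented options (treat the name as an assumption here):

import (
	"net/http"
	"strconv"
)

// chunkSizeMB resolves the effective chunk size in MB: a "maxMB"
// query parameter, when present and positive, overrides the
// server-wide default set on the command line.
func chunkSizeMB(r *http.Request, defaultMB int) int {
	if v := r.URL.Query().Get("maxMB"); v != "" {
		if n, err := strconv.Atoi(v); err == nil && n > 0 {
			return n
		}
	}
	return defaultMB
}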
@@ -57,7 +54,19 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
return false
}
- reply, err := fs.doPostAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
+ stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
+ }()
+
+ var reply *FilerPostResult
+ var err error
+ if r.Method == "POST" {
+ reply, err = fs.doPostAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
+ } else {
+ reply, err = fs.doPutAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
+ }
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil {
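Hoisting the counter increment and the deferred histogram observation out of doPostAutoChunk means both the POST and the PUT branches are measured under the same "postAutoChunk" label. The pattern as a standalone sketch with the prometheus client library (metric names here are illustrative, not the project's):

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

var (
	requestCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{Name: "filer_request_total", Help: "filer requests"},
		[]string{"type"},
	)
	requestHistogram = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{Name: "filer_request_seconds", Help: "request latency"},
		[]string{"type"},
	)
)

func handleAutoChunk() {
	requestCounter.WithLabelValues("postAutoChunk").Inc()
	start := time.Now()
	// The deferred call runs after the handler body returns, so the
	// histogram records full wall-clock time for whichever branch ran.
	defer func() {
		requestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
	}()

	// ... dispatch to the POST or PUT implementation here ...
}

func main() {
	prometheus.MustRegister(requestCounter, requestHistogram)
	handleAutoChunk()
}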
@@ -69,12 +78,6 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) {
- stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
- start := time.Now()
- defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
- }()
-
multipartReader, multipartReaderErr := r.MultipartReader()
if multipartReaderErr != nil {
return nil, multipartReaderErr
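Note that doPostAutoChunk consumes only the first part of the multipart body (part1 in the next hunk) and streams it; any further parts are ignored. A sketch of that read, assuming a standard multipart/form-data request:

import (
	"mime/multipart"
	"net/http"
)

// firstPart returns the first part of a multipart request body.
// The POST auto-chunk path reads just this one part and streams it.
func firstPart(r *http.Request) (*multipart.Part, error) {
	mr, err := r.MultipartReader()
	if err != nil {
		return nil, err
	}
	return mr.NextPart() // io.EOF here means the body had no parts
}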
@@ -90,46 +93,36 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
fileName = path.Base(fileName)
}
contentType := part1.Header.Get("Content-Type")
+ if contentType == "application/octet-stream" {
+ contentType = ""
+ }
- var fileChunks []*filer_pb.FileChunk
-
- md5Hash := md5.New()
- var partReader = ioutil.NopCloser(io.TeeReader(part1, md5Hash))
-
- chunkOffset := int64(0)
-
- for chunkOffset < contentLength {
- limitedReader := io.LimitReader(partReader, int64(chunkSize))
+ fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, contentLength, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync)
+ if err != nil {
+ return nil, err
+ }
- // assign one file id for one chunk
- fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
- if assignErr != nil {
- return nil, assignErr
- }
+ fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
+ if replyerr != nil {
+ glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
+ return
+ }
- // upload the chunk to the volume server
- uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
- if uploadErr != nil {
- return nil, uploadErr
- }
+ filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5Hash, fileChunks, chunkOffset)
- // if last chunk exhausted the reader exactly at the border
- if uploadResult.Size == 0 {
- break
- }
+ return
+}
- // Save to chunk manifest structure
- fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
- glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength)
+func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
+ contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) {
- // reset variables for the next chunk
- chunkOffset = chunkOffset + int64(uploadResult.Size)
+ fileName := ""
+ contentType := ""
- // if last chunk was not at full chunk size, but already exhausted the reader
- if int64(uploadResult.Size) < int64(chunkSize) {
- break
- }
+ fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, contentLength, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync)
+ if err != nil {
+ return nil, err
}
fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
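The heart of the refactor is uploadReaderToChunks, shown in full in the last hunk: a single md5 hash is fed by an io.TeeReader wrapped around the source, and each chunk is carved off with an io.LimitReader, so the digest covers the whole stream no matter how many chunks it is split into. A self-contained sketch of the same read pattern, decoupled from the filer types (the per-chunk upload is replaced by a print):

package main

import (
	"crypto/md5"
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

func main() {
	src := strings.NewReader("example payload that will be split into chunks")
	contentLength := int64(src.Len())
	chunkSize := int64(16)

	md5Hash := md5.New()
	// Every byte read through partReader is also written into md5Hash.
	partReader := io.TeeReader(src, md5Hash)

	var offset int64
	for offset < contentLength {
		// LimitReader hands out at most one chunk's worth of bytes.
		chunk, err := ioutil.ReadAll(io.LimitReader(partReader, chunkSize))
		if err != nil {
			panic(err)
		}
		if len(chunk) == 0 {
			break // reader exhausted exactly on a chunk boundary
		}
		fmt.Printf("chunk [%d,%d)\n", offset, offset+int64(len(chunk)))
		offset += int64(len(chunk))
		if int64(len(chunk)) < chunkSize {
			break // short read: the stream ended mid-chunk
		}
	}
	fmt.Printf("md5 %x over %d bytes\n", md5Hash.Sum(nil), offset)
}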
@@ -138,6 +131,12 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
return
}
+ filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5Hash, fileChunks, chunkOffset)
+
+ return
+}
+
+func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5Hash hash.Hash, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) {
path := r.URL.Path
if strings.HasSuffix(path, "/") {
if fileName != "" {
@@ -173,10 +172,52 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
- return
}
+ return filerResult, replyerr
+}
- return
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) {
+ var fileChunks []*filer_pb.FileChunk
+
+ md5Hash := md5.New()
+ var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
+
+ chunkOffset := int64(0)
+
+ for chunkOffset < contentLength {
+ limitedReader := io.LimitReader(partReader, int64(chunkSize))
+
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
+ if assignErr != nil {
+ return nil, nil, 0, assignErr
+ }
+
+ // upload the chunk to the volume server
+ uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
+ if uploadErr != nil {
+ return nil, nil, 0, uploadErr
+ }
+
+ // if last chunk exhausted the reader exactly at the border
+ if uploadResult.Size == 0 {
+ break
+ }
+
+ // Save to chunk manifest structure
+ fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
+
+ glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength)
+
+ // reset variables for the next chunk
+ chunkOffset = chunkOffset + int64(uploadResult.Size)
+
+ // if last chunk was not at full chunk size, but already exhausted the reader
+ if int64(uploadResult.Size) < int64(chunkSize) {
+ break
+ }
+ }
+ return fileChunks, md5Hash, chunkOffset, nil
}
func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error) {
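The hash.Hash returned by uploadReaderToChunks is finalized later in saveMetaData, in a hunk not shown here. The usual pattern is a single Sum(nil) once the stream has been fully consumed; a sketch of how such a digest is typically rendered for a Content-MD5-style value (an assumption, not the commit's exact code):

import (
	"encoding/base64"
	"hash"
)

// contentMD5 snapshots the digest of everything the TeeReader has
// already fed through the hash, base64-encoded as Content-MD5 expects.
func contentMD5(md5Hash hash.Hash) string {
	return base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
}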