Diffstat (limited to 'weed/server')
-rw-r--r--  weed/server/common.go                                 |   8
-rw-r--r--  weed/server/filer_grpc_server_rename.go               |  25
-rw-r--r--  weed/server/filer_server.go                           |  43
-rw-r--r--  weed/server/filer_server_handlers.go                  |  39
-rw-r--r--  weed/server/filer_server_handlers_read.go             |  10
-rw-r--r--  weed/server/filer_server_handlers_write.go            |   4
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go  | 126
-rw-r--r--  weed/server/filer_server_handlers_write_upload.go     | 168
-rw-r--r--  weed/server/master_grpc_server.go                     |   6
-rw-r--r--  weed/server/master_server.go                          |   7
-rw-r--r--  weed/server/volume_grpc_read_write.go                 |  38
-rw-r--r--  weed/server/volume_server.go                          |   8
-rw-r--r--  weed/server/volume_server_handlers.go                 |  30
-rw-r--r--  weed/server/volume_server_handlers_read.go            |   7
14 files changed, 352 insertions, 167 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index 9001a3b33..5c5f1b8eb 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -234,12 +234,12 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file
 	}
 }
 
-func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64, httpStatusCode int) error) {
+func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) {
 	rangeReq := r.Header.Get("Range")
 
 	if rangeReq == "" {
 		w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
-		if err := writeFn(w, 0, totalSize, 0); err != nil {
+		if err := writeFn(w, 0, totalSize); err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
 		}
@@ -279,7 +279,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
 		w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
 		w.Header().Set("Content-Range", ra.contentRange(totalSize))
 
-		err = writeFn(w, ra.start, ra.length, http.StatusPartialContent)
+		err = writeFn(w, ra.start, ra.length)
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
@@ -307,7 +307,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
 				pw.CloseWithError(e)
 				return
 			}
-			if e = writeFn(part, ra.start, ra.length, 0); e != nil {
+			if e = writeFn(part, ra.start, ra.length); e != nil {
 				pw.CloseWithError(e)
 				return
 			}
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index 5b68b64de..c1e5bc789 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -33,8 +33,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
 		return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
 	}
 
-	var events MoveEvents
-	moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, &events)
+	moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName)
 	if moveErr != nil {
 		fs.filer.RollbackTransaction(ctx)
 		return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
@@ -48,11 +47,11 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
 	return &filer_pb.AtomicRenameEntryResponse{}, nil
 }
 
-func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
 
-	if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events, func() error {
+	if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
 		if entry.IsDirectory() {
-			if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, events); err != nil {
+			if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName); err != nil {
 				return err
 			}
 		}
@@ -64,7 +63,7 @@ func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, e
 	return nil
 }
 
-func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
 
 	currentDirPath := oldParent.Child(entry.Name())
 	newDirPath := newParent.Child(newName)
@@ -85,7 +84,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
 		for _, item := range entries {
 			lastFileName = item.Name()
 			// println("processing", lastFileName)
-			err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), events)
+			err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
 			if err != nil {
 				return err
 			}
@@ -97,8 +96,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
 	return nil
 }
 
-func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents,
-	moveFolderSubEntries func() error) error {
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error {
 
 	oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
 
@@ -122,8 +120,6 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
 		return createErr
 	}
 
-	events.newEntries = append(events.newEntries, newEntry)
-
 	if moveFolderSubEntries != nil {
 		if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
 			return moveChildrenErr
@@ -136,13 +132,6 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
 		return deleteErr
 	}
 
-	events.oldEntries = append(events.oldEntries, entry)
-
 	return nil
 }
-
-type MoveEvents struct {
-	oldEntries []*filer.Entry
-	newEntries []*filer.Entry
-}
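Note: with the httpStatusCode parameter gone from writeFn, callers of processRangeRequest no longer write the HTTP status line themselves; the callback only positions the source and streams bytes for the requested range. A caller-side sketch of the new shape, mirroring the writeResponseContent change further down (rs, totalSize, and mimeType are assumed to be in scope):

    // the callback now only seeks and copies `size` bytes for one range
    processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
        if _, err := rs.Seek(offset, io.SeekStart); err != nil {
            return err
        }
        _, err := io.CopyN(writer, rs, size)
        return err
    })
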
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 9e0770afa..2734223ea 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -45,22 +45,23 @@ import (
 )
 
 type FilerOption struct {
-	Masters            []string
-	Collection         string
-	DefaultReplication string
-	DisableDirListing  bool
-	MaxMB              int
-	DirListingLimit    int
-	DataCenter         string
-	Rack               string
-	DefaultLevelDbDir  string
-	DisableHttp        bool
-	Host               string
-	Port               uint32
-	recursiveDelete    bool
-	Cipher             bool
-	SaveToFilerLimit   int
-	Filers             []string
+	Masters               []string
+	Collection            string
+	DefaultReplication    string
+	DisableDirListing     bool
+	MaxMB                 int
+	DirListingLimit       int
+	DataCenter            string
+	Rack                  string
+	DefaultLevelDbDir     string
+	DisableHttp           bool
+	Host                  string
+	Port                  uint32
+	recursiveDelete       bool
+	Cipher                bool
+	SaveToFilerLimit      int64
+	Filers                []string
+	ConcurrentUploadLimit int64
 }
 
 type FilerServer struct {
@@ -79,14 +80,18 @@
 
 	brokers     map[string]map[string]bool
 	brokersLock sync.Mutex
+
+	inFlightDataSize      int64
+	inFlightDataLimitCond *sync.Cond
 }
 
 func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
 
 	fs = &FilerServer{
-		option:         option,
-		grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
-		brokers:        make(map[string]map[string]bool),
+		option:                option,
+		grpcDialOption:        security.LoadClientTLS(util.GetViper(), "grpc.filer"),
+		brokers:               make(map[string]map[string]bool),
+		inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
 	}
 	fs.listenersCond = sync.NewCond(&fs.listenersLock)
 
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index 3bc0c5d0d..ed6bbb6f6 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -4,6 +4,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/util"
 	"net/http"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/stats"
@@ -47,18 +48,34 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
 			fs.DeleteHandler(w, r)
 		}
 		stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
-	case "PUT":
-		stats.FilerRequestCounter.WithLabelValues("put").Inc()
-		if _, ok := r.URL.Query()["tagging"]; ok {
-			fs.PutTaggingHandler(w, r)
-		} else {
-			fs.PostHandler(w, r)
+	case "POST", "PUT":
+
+		// wait until in flight data is less than the limit
+		contentLength := getContentLength(r)
+		fs.inFlightDataLimitCond.L.Lock()
+		for atomic.LoadInt64(&fs.inFlightDataSize) > fs.option.ConcurrentUploadLimit {
+			fs.inFlightDataLimitCond.Wait()
+		}
+		atomic.AddInt64(&fs.inFlightDataSize, contentLength)
+		fs.inFlightDataLimitCond.L.Unlock()
+		defer func() {
+			atomic.AddInt64(&fs.inFlightDataSize, -contentLength)
+			fs.inFlightDataLimitCond.Signal()
+		}()
+
+		if r.Method == "PUT" {
+			stats.FilerRequestCounter.WithLabelValues("put").Inc()
+			if _, ok := r.URL.Query()["tagging"]; ok {
+				fs.PutTaggingHandler(w, r)
+			} else {
+				fs.PostHandler(w, r, contentLength)
+			}
+			stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
+		} else { // method == "POST"
+			stats.FilerRequestCounter.WithLabelValues("post").Inc()
+			fs.PostHandler(w, r, contentLength)
+			stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
 		}
-		stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
-	case "POST":
-		stats.FilerRequestCounter.WithLabelValues("post").Inc()
-		fs.PostHandler(w, r)
-		stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
 	case "OPTIONS":
 		stats.FilerRequestCounter.WithLabelValues("options").Inc()
 		OptionsHandler(w, r, false)
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 892a732f7..f90b070a2 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -131,6 +131,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
 
 	if r.Method == "HEAD" {
 		w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
+		processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+			return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size, true)
+		})
 		return
 	}
 
@@ -150,10 +153,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
 		}
 	}
 
-	processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error {
-		if httpStatusCode != 0 {
-			w.WriteHeader(httpStatusCode)
-		}
+	processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
 		if offset+size <= int64(len(entry.Content)) {
 			_, err := writer.Write(entry.Content[offset : offset+size])
 			if err != nil {
@@ -161,7 +161,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
 			}
 			return err
 		}
-		return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
+		return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size, false)
 	})
 }
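Note: the new POST/PUT branch is a simple admission gate. A request parks on a sync.Cond until the in-flight byte count drops to the configured ConcurrentUploadLimit (the counters are the FilerServer fields added in filer_server.go above), adds its own Content-Length, and signals one waiter on the way out. The same pattern extracted into a standalone type, as a sketch only; the limiter type and method names here are hypothetical, since the real counters live directly on FilerServer and VolumeServer:

    package limiter

    import (
        "sync"
        "sync/atomic"
    )

    // inFlightLimiter bounds the total bytes of concurrently admitted uploads.
    type inFlightLimiter struct {
        inFlight int64
        limit    int64
        cond     *sync.Cond
    }

    func newInFlightLimiter(limit int64) *inFlightLimiter {
        return &inFlightLimiter{limit: limit, cond: sync.NewCond(new(sync.Mutex))}
    }

    func (l *inFlightLimiter) acquire(n int64) {
        l.cond.L.Lock()
        for atomic.LoadInt64(&l.inFlight) > l.limit { // wait until below the limit
            l.cond.Wait()
        }
        atomic.AddInt64(&l.inFlight, n)
        l.cond.L.Unlock()
    }

    func (l *inFlightLimiter) release(n int64) {
        atomic.AddInt64(&l.inFlight, -n)
        l.cond.Signal()
    }

Two properties follow from the `>` comparison: a request is always admitted once in-flight bytes fall to the limit, so a single upload larger than the limit still makes progress; and a request whose Content-Length header is missing or unparsable counts as zero bytes (see getContentLength in volume_server_handlers.go below), so it is never throttled.
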
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 0ce3e4a58..95eba9d3d 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -52,7 +52,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
 	return
 }
 
-func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
+func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
 
 	ctx := context.Background()
 
@@ -66,7 +66,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 		query.Get("rack"),
 	)
 
-	fs.autoChunk(ctx, w, r, so)
+	fs.autoChunk(ctx, w, r, contentLength, so)
 	util.CloseRequest(r)
 }
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 318399281..2808042c7 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -2,11 +2,8 @@ package weed_server
 
 import (
 	"context"
-	"crypto/md5"
 	"fmt"
-	"hash"
 	"io"
-	"io/ioutil"
 	"net/http"
 	"os"
 	"path"
@@ -19,13 +16,12 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
-	"github.com/chrislusf/seaweedfs/weed/security"
 	"github.com/chrislusf/seaweedfs/weed/stats"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
-func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
+func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) {
 
 	// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
 	query := r.URL.Query()
@@ -51,14 +47,16 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
 		if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
 			reply, err = fs.mkdir(ctx, w, r)
 		} else {
-			reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, so)
+			reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, contentLength, so)
 		}
 	} else {
-		reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, so)
+		reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so)
 	}
 	if err != nil {
 		if strings.HasPrefix(err.Error(), "read input:") {
 			writeJsonError(w, r, 499, err)
+		} else if strings.HasSuffix(err.Error(), "is a file") {
+			writeJsonError(w, r, http.StatusConflict, err)
 		} else {
 			writeJsonError(w, r, http.StatusInternalServerError, err)
 		}
@@ -70,7 +68,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
 	}
 }
 
-func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
+func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
 
 	multipartReader, multipartReaderErr := r.MultipartReader()
 	if multipartReaderErr != nil {
@@ -91,7 +89,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
 		contentType = ""
 	}
 
-	fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, so)
+	fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -102,7 +100,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
 	return
 }
 
-func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
+func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
 
 	fileName := path.Base(r.URL.Path)
 	contentType := r.Header.Get("Content-Type")
@@ -110,7 +108,7 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
 		contentType = ""
 	}
 
-	fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, so)
+	fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -212,7 +210,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
 		entry.Extended = make(map[string][]byte)
 	}
 
-	fs.saveAmzMetaData(r, entry)
+	SaveAmzMetaData(r, entry.Extended, false)
 
 	for k, v := range r.Header {
 		if len(v) > 0 && strings.HasPrefix(k, needle.PairNamePrefix) {
@@ -229,92 +227,6 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
 	return filerResult, replyerr
 }
 
-func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
-	var fileChunks []*filer_pb.FileChunk
-
-	md5Hash := md5.New()
-	var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
-
-	chunkOffset := int64(0)
-	var smallContent []byte
-
-	for {
-		limitedReader := io.LimitReader(partReader, int64(chunkSize))
-
-		data, err := ioutil.ReadAll(limitedReader)
-		if err != nil {
-			return nil, nil, 0, err, nil
-		}
-		if chunkOffset == 0 && !isAppend(r) {
-			if len(data) < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
-				smallContent = data
-				chunkOffset += int64(len(data))
-				break
-			}
-		}
-		dataReader := util.NewBytesReader(data)
-
-		// retry to assign a different file id
-		var fileId, urlLocation string
-		var auth security.EncodedJwt
-		var assignErr, uploadErr error
-		var uploadResult *operation.UploadResult
-		for i := 0; i < 3; i++ {
-			// assign one file id for one chunk
-			fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
-			if assignErr != nil {
-				return nil, nil, 0, assignErr, nil
-			}
-
-			// upload the chunk to the volume server
-			uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
-			if uploadErr != nil {
-				time.Sleep(251 * time.Millisecond)
-				continue
-			}
-			break
-		}
-		if uploadErr != nil {
-			return nil, nil, 0, uploadErr, nil
-		}
-
-		// if last chunk exhausted the reader exactly at the border
-		if uploadResult.Size == 0 {
-			break
-		}
-
-		// Save to chunk manifest structure
-		fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
-
-		glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
-
-		// reset variables for the next chunk
-		chunkOffset = chunkOffset + int64(uploadResult.Size)
-
-		// if last chunk was not at full chunk size, but already exhausted the reader
-		if int64(uploadResult.Size) < int64(chunkSize) {
-			break
-		}
-	}
-
-	return fileChunks, md5Hash, chunkOffset, nil, smallContent
-}
-
-func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
-
-	stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
-	start := time.Now()
-	defer func() {
-		stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
-	}()
-
-	uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
-	if uploadResult != nil && uploadResult.RetryCount > 0 {
-		stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
-	}
-	return uploadResult, err, data
-}
-
 func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
 
 	return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) {
@@ -383,17 +295,24 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http
 	return filerResult, replyerr
 }
 
-func (fs *FilerServer) saveAmzMetaData(r *http.Request, entry *filer.Entry) {
+func SaveAmzMetaData(r *http.Request, existing map[string][]byte, isReplace bool) (metadata map[string][]byte) {
+
+	metadata = make(map[string][]byte)
+	if !isReplace {
+		for k, v := range existing {
+			metadata[k] = v
+		}
+	}
 
 	if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" {
-		entry.Extended[xhttp.AmzStorageClass] = []byte(sc)
+		metadata[xhttp.AmzStorageClass] = []byte(sc)
 	}
 
 	if tags := r.Header.Get(xhttp.AmzObjectTagging); tags != "" {
 		for _, v := range strings.Split(tags, "&") {
 			tag := strings.Split(v, "=")
			if len(tag) == 2 {
-				entry.Extended[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1])
+				metadata[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1])
 			}
 		}
 	}
@@ -401,8 +320,11 @@ func (fs *FilerServer) saveAmzMetaData(r *http.Request, entry *filer.Entry) {
 	for header, values := range r.Header {
 		if strings.HasPrefix(header, xhttp.AmzUserMetaPrefix) {
 			for _, value := range values {
-				entry.Extended[header] = []byte(value)
+				metadata[header] = []byte(value)
 			}
 		}
 	}
+
+	return
+
 }
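Note: saveAmzMetaData is promoted to an exported, side-effect-free helper. SaveAmzMetaData builds and returns a fresh metadata map instead of mutating entry.Extended, and the isReplace flag decides whether keys from the existing map are carried over first. A usage sketch (illustrative, not from this diff; a caller that wants the result applied would assign the returned map back):

    // merge semantics: keep current extended attributes, overlay S3 headers from r
    entry.Extended = SaveAmzMetaData(r, entry.Extended, false)

    // replace semantics: rebuild the map from the request headers alone
    entry.Extended = SaveAmzMetaData(r, entry.Extended, true)
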
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
new file mode 100644
index 000000000..03db942c6
--- /dev/null
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -0,0 +1,168 @@
+package weed_server
+
+import (
+	"crypto/md5"
+	"hash"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"runtime"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/operation"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+	"github.com/chrislusf/seaweedfs/weed/security"
+	"github.com/chrislusf/seaweedfs/weed/stats"
+	"github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+	limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU()))
+)
+
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) {
+
+	md5Hash = md5.New()
+	var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
+
+	// save small content directly
+	if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) {
+		smallContent, err = ioutil.ReadAll(partReader)
+		dataSize = int64(len(smallContent))
+		return
+	}
+
+	resultsChan := make(chan *ChunkCreationResult, 2)
+
+	var waitForAllData sync.WaitGroup
+	waitForAllData.Add(1)
+	go func() {
+		// process upload results
+		defer waitForAllData.Done()
+		for result := range resultsChan {
+			if result.err != nil {
+				err = result.err
+				continue
+			}
+
+			// Save to chunk manifest structure
+			fileChunks = append(fileChunks, result.chunk)
+		}
+	}()
+
+	var lock sync.Mutex
+	readOffset := int64(0)
+	var wg sync.WaitGroup
+
+	for err == nil {
+
+		wg.Add(1)
+		request := func() {
+			defer wg.Done()
+
+			var localOffset int64
+			// read from the input
+			lock.Lock()
+			localOffset = readOffset
+			limitedReader := io.LimitReader(partReader, int64(chunkSize))
+			data, readErr := ioutil.ReadAll(limitedReader)
+			readOffset += int64(len(data))
+			lock.Unlock()
+			// handle read errors
+			if readErr != nil {
+				if readErr != io.EOF {
+					if err == nil {
+						err = readErr
+					}
+					resultsChan <- &ChunkCreationResult{
+						err: readErr,
+					}
+				}
+				return
+			}
+			if len(data) == 0 {
+				readErr = io.EOF
+				return
+			}
+
+			// upload
+			dataReader := util.NewBytesReader(data)
+			fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType)
+			if uploadErr != nil {
+				if err == nil {
+					err = uploadErr
+				}
+				resultsChan <- &ChunkCreationResult{
+					err: uploadErr,
+				}
+				return
+			}
+
+			glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size))
+
+			// send back uploaded file chunk
+			resultsChan <- &ChunkCreationResult{
+				chunk: uploadResult.ToPbFileChunk(fileId, localOffset),
+			}
+
+		}
+		limitedUploadProcessor.Execute(request)
+	}
+
+	go func() {
+		wg.Wait()
+		close(resultsChan)
+	}()
+
+	waitForAllData.Wait()
+
+	return fileChunks, md5Hash, readOffset, err, nil
+}
+
+type ChunkCreationResult struct {
+	chunk *filer_pb.FileChunk
+	err   error
+}
+
+func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) {
+	// retry to assign a different file id
+	var fileId, urlLocation string
+	var auth security.EncodedJwt
+	var assignErr, uploadErr error
+	var uploadResult *operation.UploadResult
+	for i := 0; i < 3; i++ {
+		// assign one file id for one chunk
+		fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
+		if assignErr != nil {
+			return "", nil, assignErr
+		}
+
+		// upload the chunk to the volume server
+		uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
+		if uploadErr != nil {
+			time.Sleep(251 * time.Millisecond)
+			continue
+		}
+		break
+	}
+	return fileId, uploadResult, uploadErr
+}
+
+func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
+
+	stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
+	start := time.Now()
+	defer func() {
+		stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
+	}()
+
+	uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
+	if uploadResult != nil && uploadResult.RetryCount > 0 {
+		stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
+	}
+	return uploadResult, err, data
+}
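Note: the rewritten uploadReaderToChunks separates reading from uploading. Each scheduled job takes the mutex, records its localOffset, reads one chunk, then uploads outside the lock, so uploads overlap while reads stay ordered; completed chunks come back on resultsChan in arbitrary order, which is why each one is stamped with ToPbFileChunk(fileId, localOffset) rather than a shared running offset. The only thing this file relies on from util.NewLimitedOutOfOrderProcessor is an Execute(func()) method with bounded concurrency; a minimal stand-in with those semantics might look like this (hypothetical type, for illustration only):

    package executor

    import "sync"

    // boundedExecutor runs at most cap(tokens) jobs at once, in no particular order.
    type boundedExecutor struct {
        tokens chan struct{}
        wg     sync.WaitGroup
    }

    func newBoundedExecutor(limit int32) *boundedExecutor {
        return &boundedExecutor{tokens: make(chan struct{}, limit)}
    }

    func (e *boundedExecutor) Execute(job func()) {
        e.tokens <- struct{}{} // blocks while `limit` jobs are in flight
        e.wg.Add(1)
        go func() {
            defer func() {
                <-e.tokens
                e.wg.Done()
            }()
            job()
        }()
    }
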
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 0f0b7f101..3e6d9bb9e 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -80,10 +80,14 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 		dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts)
 
 		glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
+		var dataCenter string
+		if dc := dn.GetDataCenter(); dc != nil {
+			dataCenter = string(dc.Id())
+		}
 		message := &master_pb.VolumeLocation{
 			Url:        dn.Url(),
 			PublicUrl:  dn.PublicUrl,
-			DataCenter: string(dn.GetDataCenter().Id()),
+			DataCenter: dataCenter,
 		}
 		if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
 			// process delta volume ids if exists for fast volume id updates
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 9404081b4..e2b2df18d 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -277,6 +277,13 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
 			glog.Error(err)
 			seq = nil
 		}
+	case "snowflake":
+		var err error
+		seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port))
+		if err != nil {
+			glog.Error(err)
+			seq = nil
+		}
 	default:
 		seq = sequence.NewMemorySequencer()
 	}
diff --git a/weed/server/volume_grpc_read_write.go b/weed/server/volume_grpc_read_write.go
new file mode 100644
index 000000000..988e9e145
--- /dev/null
+++ b/weed/server/volume_grpc_read_write.go
@@ -0,0 +1,38 @@
+package weed_server
+
+import (
+	"context"
+	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	"github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func (vs *VolumeServer) ReadNeedleBlob(ctx context.Context, req *volume_server_pb.ReadNeedleBlobRequest) (resp *volume_server_pb.ReadNeedleBlobResponse, err error) {
+	resp = &volume_server_pb.ReadNeedleBlobResponse{}
+	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+	if v == nil {
+		return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+	}
+
+	resp.NeedleBlob, err = v.ReadNeedleBlob(req.Offset, types.Size(req.Size))
+	if err != nil {
+		return nil, fmt.Errorf("read needle blob offset %d size %d: %v", req.Offset, req.Size, err)
+	}
+
+	return resp, nil
+}
+
+func (vs *VolumeServer) WriteNeedleBlob(ctx context.Context, req *volume_server_pb.WriteNeedleBlobRequest) (resp *volume_server_pb.WriteNeedleBlobResponse, err error) {
+	resp = &volume_server_pb.WriteNeedleBlobResponse{}
+	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+	if v == nil {
+		return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+	}
+
+	if err = v.WriteNeedleBlob(types.NeedleId(req.NeedleId), req.NeedleBlob, types.Size(req.Size)); err != nil {
+		return nil, fmt.Errorf("write blob needle %d size %d: %v", req.NeedleId, req.Size, err)
+	}
+
+	return resp, nil
+}
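Note: the two new volume-server RPCs expose raw needle I/O over gRPC. A hedged client-side sketch for ReadNeedleBlob; it assumes the standard generated volume_server_pb.NewVolumeServerClient constructor and an illustrative server address, and relies only on the request and response fields used by the handler above:

    package main

    import (
        "context"
        "log"

        "google.golang.org/grpc"

        "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
    )

    func main() {
        // address is hypothetical; use the volume server's gRPC endpoint
        conn, err := grpc.Dial("volume-server:18080", grpc.WithInsecure())
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        client := volume_server_pb.NewVolumeServerClient(conn)
        resp, err := client.ReadNeedleBlob(context.Background(), &volume_server_pb.ReadNeedleBlobRequest{
            VolumeId: 1,
            Offset:   0,    // needle offset within the volume data file
            Size:     1024, // needle size as recorded in the index
        })
        if err != nil {
            log.Fatal(err)
        }
        _ = resp.NeedleBlob // raw needle bytes, reusable via WriteNeedleBlob
    }
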
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index e496b1ce2..e11d607a4 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
 	"net/http"
+	"sync"
 
 	"google.golang.org/grpc"
 
@@ -34,6 +35,10 @@ type VolumeServer struct {
 	fileSizeLimitBytes int64
 	isHeartbeating     bool
 	stopChan           chan bool
+
+	inFlightDataSize      int64
+	inFlightDataLimitCond *sync.Cond
+	concurrentUploadLimit int64
 }
 
 func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -48,6 +53,7 @@
 	readRedirect bool,
 	compactionMBPerSecond int,
 	fileSizeLimitMB int,
+	concurrentUploadLimit int64,
 ) *VolumeServer {
 
 	v := util.GetViper()
@@ -72,6 +78,8 @@
 		fileSizeLimitBytes:    int64(fileSizeLimitMB) * 1024 * 1024,
 		isHeartbeating:        true,
 		stopChan:              make(chan bool),
+		inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
+		concurrentUploadLimit: concurrentUploadLimit,
 	}
 
 	vs.SeedMasterNodes = masterNodes
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 7852c950a..4527add44 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -2,7 +2,9 @@ package weed_server
 
 import (
 	"net/http"
+	"strconv"
 	"strings"
+	"sync/atomic"
 
 	"github.com/chrislusf/seaweedfs/weed/util"
 
@@ -40,8 +42,24 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
 		stats.DeleteRequest()
 		vs.guard.WhiteList(vs.DeleteHandler)(w, r)
 	case "PUT", "POST":
+
+		// wait until in flight data is less than the limit
+		contentLength := getContentLength(r)
+		vs.inFlightDataLimitCond.L.Lock()
+		for atomic.LoadInt64(&vs.inFlightDataSize) > vs.concurrentUploadLimit {
+			vs.inFlightDataLimitCond.Wait()
+		}
+		atomic.AddInt64(&vs.inFlightDataSize, contentLength)
+		vs.inFlightDataLimitCond.L.Unlock()
+		defer func() {
+			atomic.AddInt64(&vs.inFlightDataSize, -contentLength)
+			vs.inFlightDataLimitCond.Signal()
+		}()
+
+		// process uploads
 		stats.WriteRequest()
 		vs.guard.WhiteList(vs.PostHandler)(w, r)
+
 	case "OPTIONS":
 		stats.ReadRequest()
 		w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
@@ -49,6 +67,18 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
 	}
 }
 
+func getContentLength(r *http.Request) int64 {
+	contentLength := r.Header.Get("Content-Length")
+	if contentLength != "" {
+		length, err := strconv.ParseInt(contentLength, 10, 64)
+		if err != nil {
+			return 0
+		}
+		return length
+	}
+	return 0
+}
+
 func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
 	if r.Header.Get("Origin") != "" {
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 2db46ac9b..3e977cfd4 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -27,7 +27,7 @@ var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
 
 func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
 
-	// println(r.Method + " " + r.URL.Path)
+	glog.V(9).Info(r.Method + " " + r.URL.Path + " " + r.Header.Get("Range"))
 
 	stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
 	start := time.Now()
@@ -261,13 +261,10 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
 		return nil
 	}
 
-	processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error {
+	processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
 		if _, e = rs.Seek(offset, 0); e != nil {
 			return e
 		}
-		if httpStatusCode != 0 {
-			w.WriteHeader(httpStatusCode)
-		}
 		_, e = io.CopyN(writer, rs, size)
 		return e
 	})
