aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorKonstantin Lebedev <lebedev_k@tochka.com>2021-04-06 13:50:33 +0500
committerKonstantin Lebedev <lebedev_k@tochka.com>2021-04-06 13:50:33 +0500
commit011e6e90ee8a3aeff6f845fec90331ad4714b514 (patch)
treeb661a90a1cc8c77b2085f120420b0bdd537bcf0d /weed/server
parented79baa30fe5687a35a9a61e2dcf3b4750064d36 (diff)
parent100ed773870b8826352f25e0cd72f60a591ecfa8 (diff)
downloadseaweedfs-011e6e90ee8a3aeff6f845fec90331ad4714b514.tar.xz
seaweedfs-011e6e90ee8a3aeff6f845fec90331ad4714b514.zip
Merge branch 'upstreamMaster' into iamapipr
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go8
-rw-r--r--weed/server/filer_grpc_server_rename.go25
-rw-r--r--weed/server/filer_server.go45
-rw-r--r--weed/server/filer_server_handlers.go39
-rw-r--r--weed/server/filer_server_handlers_read.go13
-rw-r--r--weed/server/filer_server_handlers_write.go4
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go127
-rw-r--r--weed/server/filer_server_handlers_write_upload.go175
-rw-r--r--weed/server/master_grpc_server.go6
-rw-r--r--weed/server/master_grpc_server_volume.go7
-rw-r--r--weed/server/master_server.go7
-rw-r--r--weed/server/volume_grpc_read_write.go38
-rw-r--r--weed/server/volume_server.go8
-rw-r--r--weed/server/volume_server_handlers.go30
-rw-r--r--weed/server/volume_server_handlers_read.go7
-rw-r--r--weed/server/volume_server_tcp_handlers_write.go137
16 files changed, 504 insertions, 172 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index 9001a3b33..5c5f1b8eb 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -234,12 +234,12 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file
}
}
-func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64, httpStatusCode int) error) {
+func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) {
rangeReq := r.Header.Get("Range")
if rangeReq == "" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
- if err := writeFn(w, 0, totalSize, 0); err != nil {
+ if err := writeFn(w, 0, totalSize); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -279,7 +279,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
- err = writeFn(w, ra.start, ra.length, http.StatusPartialContent)
+ err = writeFn(w, ra.start, ra.length)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -307,7 +307,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
pw.CloseWithError(e)
return
}
- if e = writeFn(part, ra.start, ra.length, 0); e != nil {
+ if e = writeFn(part, ra.start, ra.length); e != nil {
pw.CloseWithError(e)
return
}
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index 5b68b64de..c1e5bc789 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -33,8 +33,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
}
- var events MoveEvents
- moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, &events)
+ moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName)
if moveErr != nil {
fs.filer.RollbackTransaction(ctx)
return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
@@ -48,11 +47,11 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return &filer_pb.AtomicRenameEntryResponse{}, nil
}
-func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
- if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events, func() error {
+ if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
if entry.IsDirectory() {
- if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, events); err != nil {
+ if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName); err != nil {
return err
}
}
@@ -64,7 +63,7 @@ func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, e
return nil
}
-func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
currentDirPath := oldParent.Child(entry.Name())
newDirPath := newParent.Child(newName)
@@ -85,7 +84,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
for _, item := range entries {
lastFileName = item.Name()
// println("processing", lastFileName)
- err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), events)
+ err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
if err != nil {
return err
}
@@ -97,8 +96,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
return nil
}
-func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents,
- moveFolderSubEntries func() error) error {
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error {
oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
@@ -122,8 +120,6 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
return createErr
}
- events.newEntries = append(events.newEntries, newEntry)
-
if moveFolderSubEntries != nil {
if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
return moveChildrenErr
@@ -136,13 +132,6 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
return deleteErr
}
- events.oldEntries = append(events.oldEntries, entry)
-
return nil
}
-
-type MoveEvents struct {
- oldEntries []*filer.Entry
- newEntries []*filer.Entry
-}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 22474a5e2..2734223ea 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -45,22 +45,23 @@ import (
)
type FilerOption struct {
- Masters []string
- Collection string
- DefaultReplication string
- DisableDirListing bool
- MaxMB int
- DirListingLimit int
- DataCenter string
- Rack string
- DefaultLevelDbDir string
- DisableHttp bool
- Host string
- Port uint32
- recursiveDelete bool
- Cipher bool
- SaveToFilerLimit int
- Filers []string
+ Masters []string
+ Collection string
+ DefaultReplication string
+ DisableDirListing bool
+ MaxMB int
+ DirListingLimit int
+ DataCenter string
+ Rack string
+ DefaultLevelDbDir string
+ DisableHttp bool
+ Host string
+ Port uint32
+ recursiveDelete bool
+ Cipher bool
+ SaveToFilerLimit int64
+ Filers []string
+ ConcurrentUploadLimit int64
}
type FilerServer struct {
@@ -79,14 +80,18 @@ type FilerServer struct {
brokers map[string]map[string]bool
brokersLock sync.Mutex
+
+ inFlightDataSize int64
+ inFlightDataLimitCond *sync.Cond
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
fs = &FilerServer{
- option: option,
- grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
- brokers: make(map[string]map[string]bool),
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
+ brokers: make(map[string]map[string]bool),
+ inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
}
fs.listenersCond = sync.NewCond(&fs.listenersLock)
@@ -153,7 +158,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
func (fs *FilerServer) checkWithMaster() {
for _, master := range fs.option.Masters {
- _, err := pb.ParseFilerGrpcAddress(master)
+ _, err := pb.ParseServerToGrpcAddress(master)
if err != nil {
glog.Fatalf("invalid master address %s: %v", master, err)
}
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index 3bc0c5d0d..ed6bbb6f6 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -4,6 +4,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"strings"
+ "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/stats"
@@ -47,18 +48,34 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
fs.DeleteHandler(w, r)
}
stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
- case "PUT":
- stats.FilerRequestCounter.WithLabelValues("put").Inc()
- if _, ok := r.URL.Query()["tagging"]; ok {
- fs.PutTaggingHandler(w, r)
- } else {
- fs.PostHandler(w, r)
+ case "POST", "PUT":
+
+ // wait until in flight data is less than the limit
+ contentLength := getContentLength(r)
+ fs.inFlightDataLimitCond.L.Lock()
+ for atomic.LoadInt64(&fs.inFlightDataSize) > fs.option.ConcurrentUploadLimit {
+ fs.inFlightDataLimitCond.Wait()
+ }
+ atomic.AddInt64(&fs.inFlightDataSize, contentLength)
+ fs.inFlightDataLimitCond.L.Unlock()
+ defer func() {
+ atomic.AddInt64(&fs.inFlightDataSize, -contentLength)
+ fs.inFlightDataLimitCond.Signal()
+ }()
+
+ if r.Method == "PUT" {
+ stats.FilerRequestCounter.WithLabelValues("put").Inc()
+ if _, ok := r.URL.Query()["tagging"]; ok {
+ fs.PutTaggingHandler(w, r)
+ } else {
+ fs.PostHandler(w, r, contentLength)
+ }
+ stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
+ } else { // method == "POST"
+ stats.FilerRequestCounter.WithLabelValues("post").Inc()
+ fs.PostHandler(w, r, contentLength)
+ stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
}
- stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
- case "POST":
- stats.FilerRequestCounter.WithLabelValues("post").Inc()
- fs.PostHandler(w, r)
- stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
case "OPTIONS":
stats.FilerRequestCounter.WithLabelValues("options").Inc()
OptionsHandler(w, r, false)
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 160ee9d97..f90b070a2 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -131,6 +131,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size, true)
+ })
return
}
@@ -150,15 +153,15 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
}
- processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error {
- if httpStatusCode != 0 {
- w.WriteHeader(httpStatusCode)
- }
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
if offset+size <= int64(len(entry.Content)) {
_, err := writer.Write(entry.Content[offset : offset+size])
+ if err != nil {
+ glog.Errorf("failed to write entry content: %v", err)
+ }
return err
}
- return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
+ return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size, false)
})
}
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 0ce3e4a58..95eba9d3d 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -52,7 +52,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
return
}
-func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
+func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
ctx := context.Background()
@@ -66,7 +66,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
query.Get("rack"),
)
- fs.autoChunk(ctx, w, r, so)
+ fs.autoChunk(ctx, w, r, contentLength, so)
util.CloseRequest(r)
}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index d3ce7e605..2808042c7 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -2,11 +2,8 @@ package weed_server
import (
"context"
- "crypto/md5"
"fmt"
- "hash"
"io"
- "io/ioutil"
"net/http"
"os"
"path"
@@ -19,13 +16,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
+func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) {
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
query := r.URL.Query()
@@ -38,10 +34,10 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
chunkSize := 1024 * 1024 * maxMB
- stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
+ stats.FilerRequestCounter.WithLabelValues("chunk").Inc()
start := time.Now()
defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
+ stats.FilerRequestHistogram.WithLabelValues("chunk").Observe(time.Since(start).Seconds())
}()
var reply *FilerPostResult
@@ -51,14 +47,16 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
reply, err = fs.mkdir(ctx, w, r)
} else {
- reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, so)
+ reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, contentLength, so)
}
} else {
- reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, so)
+ reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so)
}
if err != nil {
if strings.HasPrefix(err.Error(), "read input:") {
writeJsonError(w, r, 499, err)
+ } else if strings.HasSuffix(err.Error(), "is a file") {
+ writeJsonError(w, r, http.StatusConflict, err)
} else {
writeJsonError(w, r, http.StatusInternalServerError, err)
}
@@ -70,7 +68,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
}
}
-func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
+func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
multipartReader, multipartReaderErr := r.MultipartReader()
if multipartReaderErr != nil {
@@ -91,7 +89,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
contentType = ""
}
- fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, so)
+ fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
if err != nil {
return nil, nil, err
}
@@ -102,7 +100,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
return
}
-func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
+func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
fileName := path.Base(r.URL.Path)
contentType := r.Header.Get("Content-Type")
@@ -110,7 +108,7 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
contentType = ""
}
- fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, so)
+ fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
if err != nil {
return nil, nil, err
}
@@ -212,7 +210,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
entry.Extended = make(map[string][]byte)
}
- fs.saveAmzMetaData(r, entry)
+ SaveAmzMetaData(r, entry.Extended, false)
for k, v := range r.Header {
if len(v) > 0 && strings.HasPrefix(k, needle.PairNamePrefix) {
@@ -229,89 +227,6 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
return filerResult, replyerr
}
-func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
- var fileChunks []*filer_pb.FileChunk
-
- md5Hash := md5.New()
- var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
-
- chunkOffset := int64(0)
- var smallContent []byte
-
- for {
- limitedReader := io.LimitReader(partReader, int64(chunkSize))
-
- data, err := ioutil.ReadAll(limitedReader)
- if err != nil {
- return nil, nil, 0, err, nil
- }
- if chunkOffset == 0 && !isAppend(r) {
- if len(data) < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
- smallContent = data
- chunkOffset += int64(len(data))
- break
- }
- }
- dataReader := util.NewBytesReader(data)
-
- // retry to assign a different file id
- var fileId, urlLocation string
- var auth security.EncodedJwt
- var assignErr, uploadErr error
- var uploadResult *operation.UploadResult
- for i := 0; i < 3; i++ {
- // assign one file id for one chunk
- fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
- if assignErr != nil {
- return nil, nil, 0, assignErr, nil
- }
-
- // upload the chunk to the volume server
- uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
- if uploadErr != nil {
- time.Sleep(251 * time.Millisecond)
- continue
- }
- break
- }
- if uploadErr != nil {
- return nil, nil, 0, uploadErr, nil
- }
-
- // if last chunk exhausted the reader exactly at the border
- if uploadResult.Size == 0 {
- break
- }
-
- // Save to chunk manifest structure
- fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
-
- glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
-
- // reset variables for the next chunk
- chunkOffset = chunkOffset + int64(uploadResult.Size)
-
- // if last chunk was not at full chunk size, but already exhausted the reader
- if int64(uploadResult.Size) < int64(chunkSize) {
- break
- }
- }
-
- return fileChunks, md5Hash, chunkOffset, nil, smallContent
-}
-
-func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
-
- stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc()
- start := time.Now()
- defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds())
- }()
-
- uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
- return uploadResult, err, data
-}
-
func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) {
@@ -380,17 +295,24 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http
return filerResult, replyerr
}
-func (fs *FilerServer) saveAmzMetaData(r *http.Request, entry *filer.Entry) {
+func SaveAmzMetaData(r *http.Request, existing map[string][]byte, isReplace bool) (metadata map[string][]byte) {
+
+ metadata = make(map[string][]byte)
+ if !isReplace {
+ for k, v := range existing {
+ metadata[k] = v
+ }
+ }
if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" {
- entry.Extended[xhttp.AmzStorageClass] = []byte(sc)
+ metadata[xhttp.AmzStorageClass] = []byte(sc)
}
if tags := r.Header.Get(xhttp.AmzObjectTagging); tags != "" {
for _, v := range strings.Split(tags, "&") {
tag := strings.Split(v, "=")
if len(tag) == 2 {
- entry.Extended[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1])
+ metadata[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1])
}
}
}
@@ -398,8 +320,11 @@ func (fs *FilerServer) saveAmzMetaData(r *http.Request, entry *filer.Entry) {
for header, values := range r.Header {
if strings.HasPrefix(header, xhttp.AmzUserMetaPrefix) {
for _, value := range values {
- entry.Extended[header] = []byte(value)
+ metadata[header] = []byte(value)
}
}
}
+
+ return
+
}
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
new file mode 100644
index 000000000..81b2ce1b0
--- /dev/null
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -0,0 +1,175 @@
+package weed_server
+
+import (
+ "crypto/md5"
+ "hash"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "runtime"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU()))
+)
+
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) {
+
+ md5Hash = md5.New()
+ var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
+
+ // save small content directly
+ if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) {
+ smallContent, err = ioutil.ReadAll(partReader)
+ dataSize = int64(len(smallContent))
+ return
+ }
+
+ resultsChan := make(chan *ChunkCreationResult, 2)
+
+ var waitForAllData sync.WaitGroup
+ waitForAllData.Add(1)
+ go func() {
+ // process upload results
+ defer waitForAllData.Done()
+ for result := range resultsChan {
+ if result.err != nil {
+ err = result.err
+ continue
+ }
+
+ // Save to chunk manifest structure
+ fileChunks = append(fileChunks, result.chunk)
+ }
+ }()
+
+ var lock sync.Mutex
+ readOffset := int64(0)
+ var wg sync.WaitGroup
+
+ for err == nil {
+
+ wg.Add(1)
+ request := func() {
+ defer wg.Done()
+
+ var localOffset int64
+ // read from the input
+ lock.Lock()
+ localOffset = readOffset
+ limitedReader := io.LimitReader(partReader, int64(chunkSize))
+ data, readErr := ioutil.ReadAll(limitedReader)
+ readOffset += int64(len(data))
+ lock.Unlock()
+ // handle read errors
+ if readErr != nil {
+ if err == nil {
+ err = readErr
+ }
+ if readErr != io.EOF {
+ resultsChan <- &ChunkCreationResult{
+ err: readErr,
+ }
+ }
+ return
+ }
+ if len(data) == 0 {
+ readErr = io.EOF
+ if err == nil {
+ err = readErr
+ }
+ return
+ }
+
+ // upload
+ dataReader := util.NewBytesReader(data)
+ fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType)
+ if uploadErr != nil {
+ if err == nil {
+ err = uploadErr
+ }
+ resultsChan <- &ChunkCreationResult{
+ err: uploadErr,
+ }
+ return
+ }
+
+ glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size))
+
+ // send back uploaded file chunk
+ resultsChan <- &ChunkCreationResult{
+ chunk: uploadResult.ToPbFileChunk(fileId, localOffset),
+ }
+
+ }
+ limitedUploadProcessor.Execute(request)
+ }
+
+ go func() {
+ wg.Wait()
+ close(resultsChan)
+ }()
+
+ waitForAllData.Wait()
+
+ if err == io.EOF {
+ err = nil
+ }
+
+ return fileChunks, md5Hash, readOffset, err, nil
+}
+
+type ChunkCreationResult struct {
+ chunk *filer_pb.FileChunk
+ err error
+}
+
+func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) {
+ // retry to assign a different file id
+ var fileId, urlLocation string
+ var auth security.EncodedJwt
+ var assignErr, uploadErr error
+ var uploadResult *operation.UploadResult
+ for i := 0; i < 3; i++ {
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
+ if assignErr != nil {
+ return "", nil, assignErr
+ }
+
+ // upload the chunk to the volume server
+ uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
+ if uploadErr != nil {
+ time.Sleep(251 * time.Millisecond)
+ continue
+ }
+ break
+ }
+ return fileId, uploadResult, uploadErr
+}
+
+func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
+
+ stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
+ }()
+
+ uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
+ if uploadResult != nil && uploadResult.RetryCount > 0 {
+ stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
+ }
+ return uploadResult, err, data
+}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 0f0b7f101..3e6d9bb9e 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -80,10 +80,14 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts)
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
+ var dataCenter string
+ if dc := dn.GetDataCenter(); dc != nil {
+ dataCenter = string(dc.Id())
+ }
message := &master_pb.VolumeLocation{
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
- DataCenter: string(dn.GetDataCenter().Id()),
+ DataCenter: dataCenter,
}
if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
// process delta volume ids if exists for fast volume id updates
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 29aff5c0b..156afd4a1 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -77,7 +77,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if !ms.Topo.HasWritableVolume(option) {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
- return nil, fmt.Errorf("no free volumes left for "+option.String())
+ return nil, fmt.Errorf("no free volumes left for " + option.String())
}
ms.vgLock.Lock()
if !ms.Topo.HasWritableVolume(option) {
@@ -122,11 +122,8 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl, types.ToDiskType(req.DiskType))
stats := volumeLayout.Stats()
-
- totalSize := ms.Topo.GetDiskUsages().GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024
-
resp := &master_pb.StatisticsResponse{
- TotalSize: uint64(totalSize),
+ TotalSize: stats.TotalSize,
UsedSize: stats.UsedSize,
FileCount: stats.FileCount,
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 9404081b4..e2b2df18d 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -277,6 +277,13 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
glog.Error(err)
seq = nil
}
+ case "snowflake":
+ var err error
+ seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port))
+ if err != nil {
+ glog.Error(err)
+ seq = nil
+ }
default:
seq = sequence.NewMemorySequencer()
}
diff --git a/weed/server/volume_grpc_read_write.go b/weed/server/volume_grpc_read_write.go
new file mode 100644
index 000000000..988e9e145
--- /dev/null
+++ b/weed/server/volume_grpc_read_write.go
@@ -0,0 +1,38 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func (vs *VolumeServer) ReadNeedleBlob(ctx context.Context, req *volume_server_pb.ReadNeedleBlobRequest) (resp *volume_server_pb.ReadNeedleBlobResponse, err error) {
+ resp = &volume_server_pb.ReadNeedleBlobResponse{}
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ resp.NeedleBlob, err = v.ReadNeedleBlob(req.Offset, types.Size(req.Size))
+ if err != nil {
+ return nil, fmt.Errorf("read needle blob offset %d size %d: %v", req.Offset, req.Size, err)
+ }
+
+ return resp, nil
+}
+
+func (vs *VolumeServer) WriteNeedleBlob(ctx context.Context, req *volume_server_pb.WriteNeedleBlobRequest) (resp *volume_server_pb.WriteNeedleBlobResponse, err error) {
+ resp = &volume_server_pb.WriteNeedleBlobResponse{}
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ if err = v.WriteNeedleBlob(types.NeedleId(req.NeedleId), req.NeedleBlob, types.Size(req.Size)); err != nil {
+ return nil, fmt.Errorf("write blob needle %d size %d: %v", req.NeedleId, req.Size, err)
+ }
+
+ return resp, nil
+}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index e496b1ce2..e11d607a4 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"net/http"
+ "sync"
"google.golang.org/grpc"
@@ -34,6 +35,10 @@ type VolumeServer struct {
fileSizeLimitBytes int64
isHeartbeating bool
stopChan chan bool
+
+ inFlightDataSize int64
+ inFlightDataLimitCond *sync.Cond
+ concurrentUploadLimit int64
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -48,6 +53,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
readRedirect bool,
compactionMBPerSecond int,
fileSizeLimitMB int,
+ concurrentUploadLimit int64,
) *VolumeServer {
v := util.GetViper()
@@ -72,6 +78,8 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
isHeartbeating: true,
stopChan: make(chan bool),
+ inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
+ concurrentUploadLimit: concurrentUploadLimit,
}
vs.SeedMasterNodes = masterNodes
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 7852c950a..4527add44 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -2,7 +2,9 @@ package weed_server
import (
"net/http"
+ "strconv"
"strings"
+ "sync/atomic"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -40,8 +42,24 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
stats.DeleteRequest()
vs.guard.WhiteList(vs.DeleteHandler)(w, r)
case "PUT", "POST":
+
+ // wait until in flight data is less than the limit
+ contentLength := getContentLength(r)
+ vs.inFlightDataLimitCond.L.Lock()
+ for atomic.LoadInt64(&vs.inFlightDataSize) > vs.concurrentUploadLimit {
+ vs.inFlightDataLimitCond.Wait()
+ }
+ atomic.AddInt64(&vs.inFlightDataSize, contentLength)
+ vs.inFlightDataLimitCond.L.Unlock()
+ defer func() {
+ atomic.AddInt64(&vs.inFlightDataSize, -contentLength)
+ vs.inFlightDataLimitCond.Signal()
+ }()
+
+ // process uploads
stats.WriteRequest()
vs.guard.WhiteList(vs.PostHandler)(w, r)
+
case "OPTIONS":
stats.ReadRequest()
w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
@@ -49,6 +67,18 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
}
}
+func getContentLength(r *http.Request) int64 {
+ contentLength := r.Header.Get("Content-Length")
+ if contentLength != "" {
+ length, err := strconv.ParseInt(contentLength, 10, 64)
+ if err != nil {
+ return 0
+ }
+ return length
+ }
+ return 0
+}
+
func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
if r.Header.Get("Origin") != "" {
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 2db46ac9b..3e977cfd4 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -27,7 +27,7 @@ var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
- // println(r.Method + " " + r.URL.Path)
+ glog.V(9).Info(r.Method + " " + r.URL.Path + " " + r.Header.Get("Range"))
stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
start := time.Now()
@@ -261,13 +261,10 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
return nil
}
- processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error {
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
if _, e = rs.Seek(offset, 0); e != nil {
return e
}
- if httpStatusCode != 0 {
- w.WriteHeader(httpStatusCode)
- }
_, e = io.CopyN(writer, rs, size)
return e
})
diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go
new file mode 100644
index 000000000..a009611da
--- /dev/null
+++ b/weed/server/volume_server_tcp_handlers_write.go
@@ -0,0 +1,137 @@
+package weed_server
+
+import (
+ "bufio"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "net"
+ "strings"
+)
+
+func (vs *VolumeServer) HandleTcpConnection(c net.Conn) {
+ defer c.Close()
+
+ glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String())
+
+ bufReader := bufio.NewReaderSize(c, 1024*1024)
+ bufWriter := bufio.NewWriterSize(c, 1024*1024)
+
+ for {
+ cmd, err := bufReader.ReadString('\n')
+ if err != nil {
+ if err != io.EOF {
+ glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err)
+ }
+ return
+ }
+ cmd = cmd[:len(cmd)-1]
+ switch cmd[0] {
+ case '+':
+ fileId := cmd[1:]
+ err = vs.handleTcpPut(fileId, bufReader)
+ if err == nil {
+ bufWriter.Write([]byte("+OK\n"))
+ } else {
+ bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
+ }
+ case '-':
+ fileId := cmd[1:]
+ err = vs.handleTcpDelete(fileId)
+ if err == nil {
+ bufWriter.Write([]byte("+OK\n"))
+ } else {
+ bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
+ }
+ case '?':
+ fileId := cmd[1:]
+ err = vs.handleTcpGet(fileId, bufWriter)
+ case '!':
+ bufWriter.Flush()
+ }
+
+ }
+
+}
+
+func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ volume := vs.store.GetVolume(volumeId)
+ if volume == nil {
+ return fmt.Errorf("volume %d not found", volumeId)
+ }
+
+ err = volume.StreamRead(n, writer)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ volume := vs.store.GetVolume(volumeId)
+ if volume == nil {
+ return fmt.Errorf("volume %d not found", volumeId)
+ }
+
+ sizeBuf := make([]byte, 4)
+ if _, err = bufReader.Read(sizeBuf); err != nil {
+ return err
+ }
+ dataSize := util.BytesToUint32(sizeBuf)
+
+ err = volume.StreamWrite(n, bufReader, dataSize)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ _, err = vs.store.DeleteVolumeNeedle(volumeId, n)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) {
+
+ commaIndex := strings.LastIndex(fileId, ",")
+ if commaIndex <= 0 {
+ return 0, nil, fmt.Errorf("unknown fileId %s", fileId)
+ }
+
+ vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:]
+
+ volumeId, ve := needle.NewVolumeId(vid)
+ if ve != nil {
+ return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId)
+ }
+
+ n := new(needle.Needle)
+ n.ParsePath(fid)
+ return volumeId, n, nil
+}