aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go8
-rw-r--r--weed/server/filer_grpc_server.go2
-rw-r--r--weed/server/filer_grpc_server_rename.go27
-rw-r--r--weed/server/filer_server.go45
-rw-r--r--weed/server/filer_server_handlers.go39
-rw-r--r--weed/server/filer_server_handlers_read.go16
-rw-r--r--weed/server/filer_server_handlers_read_dir.go3
-rw-r--r--weed/server/filer_server_handlers_write.go4
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go135
-rw-r--r--weed/server/filer_server_handlers_write_upload.go105
-rw-r--r--weed/server/filer_ui/breadcrumb.go2
-rw-r--r--weed/server/filer_ui/templates.go2
-rw-r--r--weed/server/gateway_server.go106
-rw-r--r--weed/server/master_grpc_server.go6
-rw-r--r--weed/server/master_grpc_server_admin.go21
-rw-r--r--weed/server/master_server.go7
-rw-r--r--weed/server/master_server_handlers_ui.go12
-rw-r--r--weed/server/master_ui/templates.go10
-rw-r--r--weed/server/volume_grpc_read_write.go38
-rw-r--r--weed/server/volume_server.go8
-rw-r--r--weed/server/volume_server_handlers.go30
-rw-r--r--weed/server/volume_server_handlers_read.go7
-rw-r--r--weed/server/volume_server_handlers_write.go3
-rw-r--r--weed/server/volume_server_tcp_handlers_write.go137
-rw-r--r--weed/server/volume_server_ui/templates.go2
25 files changed, 582 insertions, 193 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index 9001a3b33..5c5f1b8eb 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -234,12 +234,12 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file
}
}
-func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64, httpStatusCode int) error) {
+func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) {
rangeReq := r.Header.Get("Range")
if rangeReq == "" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
- if err := writeFn(w, 0, totalSize, 0); err != nil {
+ if err := writeFn(w, 0, totalSize); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -279,7 +279,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
- err = writeFn(w, ra.start, ra.length, http.StatusPartialContent)
+ err = writeFn(w, ra.start, ra.length)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -307,7 +307,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
pw.CloseWithError(e)
return
}
- if e = writeFn(part, ra.start, ra.length, 0); e != nil {
+ if e = writeFn(part, ra.start, ra.length); e != nil {
pw.CloseWithError(e)
return
}
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index a4bb721ef..3821de6a9 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -63,7 +63,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
var listErr error
for limit > 0 {
var hasEntries bool
- lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", func(entry *filer.Entry) bool {
+ lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool {
hasEntries = true
if err = stream.Send(&filer_pb.ListEntriesResponse{
Entry: &filer_pb.Entry{
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index 5b68b64de..eadb970d5 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -33,8 +33,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
}
- var events MoveEvents
- moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, &events)
+ moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName)
if moveErr != nil {
fs.filer.RollbackTransaction(ctx)
return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
@@ -48,11 +47,11 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return &filer_pb.AtomicRenameEntryResponse{}, nil
}
-func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
- if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events, func() error {
+ if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
if entry.IsDirectory() {
- if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, events); err != nil {
+ if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName); err != nil {
return err
}
}
@@ -64,7 +63,7 @@ func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, e
return nil
}
-func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
currentDirPath := oldParent.Child(entry.Name())
newDirPath := newParent.Child(newName)
@@ -75,7 +74,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
includeLastFile := false
for {
- entries, hasMore, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "", "")
+ entries, hasMore, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "", "", "")
if err != nil {
return err
}
@@ -85,7 +84,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
for _, item := range entries {
lastFileName = item.Name()
// println("processing", lastFileName)
- err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), events)
+ err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
if err != nil {
return err
}
@@ -97,8 +96,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
return nil
}
-func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents,
- moveFolderSubEntries func() error) error {
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error {
oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
@@ -122,8 +120,6 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
return createErr
}
- events.newEntries = append(events.newEntries, newEntry)
-
if moveFolderSubEntries != nil {
if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
return moveChildrenErr
@@ -136,13 +132,6 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
return deleteErr
}
- events.oldEntries = append(events.oldEntries, entry)
-
return nil
}
-
-type MoveEvents struct {
- oldEntries []*filer.Entry
- newEntries []*filer.Entry
-}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 22474a5e2..2734223ea 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -45,22 +45,23 @@ import (
)
type FilerOption struct {
- Masters []string
- Collection string
- DefaultReplication string
- DisableDirListing bool
- MaxMB int
- DirListingLimit int
- DataCenter string
- Rack string
- DefaultLevelDbDir string
- DisableHttp bool
- Host string
- Port uint32
- recursiveDelete bool
- Cipher bool
- SaveToFilerLimit int
- Filers []string
+ Masters []string
+ Collection string
+ DefaultReplication string
+ DisableDirListing bool
+ MaxMB int
+ DirListingLimit int
+ DataCenter string
+ Rack string
+ DefaultLevelDbDir string
+ DisableHttp bool
+ Host string
+ Port uint32
+ recursiveDelete bool
+ Cipher bool
+ SaveToFilerLimit int64
+ Filers []string
+ ConcurrentUploadLimit int64
}
type FilerServer struct {
@@ -79,14 +80,18 @@ type FilerServer struct {
brokers map[string]map[string]bool
brokersLock sync.Mutex
+
+ inFlightDataSize int64
+ inFlightDataLimitCond *sync.Cond
}
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
fs = &FilerServer{
- option: option,
- grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
- brokers: make(map[string]map[string]bool),
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
+ brokers: make(map[string]map[string]bool),
+ inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
}
fs.listenersCond = sync.NewCond(&fs.listenersLock)
@@ -153,7 +158,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
func (fs *FilerServer) checkWithMaster() {
for _, master := range fs.option.Masters {
- _, err := pb.ParseFilerGrpcAddress(master)
+ _, err := pb.ParseServerToGrpcAddress(master)
if err != nil {
glog.Fatalf("invalid master address %s: %v", master, err)
}
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index 3bc0c5d0d..ed6bbb6f6 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -4,6 +4,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"strings"
+ "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/stats"
@@ -47,18 +48,34 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
fs.DeleteHandler(w, r)
}
stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
- case "PUT":
- stats.FilerRequestCounter.WithLabelValues("put").Inc()
- if _, ok := r.URL.Query()["tagging"]; ok {
- fs.PutTaggingHandler(w, r)
- } else {
- fs.PostHandler(w, r)
+ case "POST", "PUT":
+
+ // wait until in flight data is less than the limit
+ contentLength := getContentLength(r)
+ fs.inFlightDataLimitCond.L.Lock()
+ for atomic.LoadInt64(&fs.inFlightDataSize) > fs.option.ConcurrentUploadLimit {
+ fs.inFlightDataLimitCond.Wait()
+ }
+ atomic.AddInt64(&fs.inFlightDataSize, contentLength)
+ fs.inFlightDataLimitCond.L.Unlock()
+ defer func() {
+ atomic.AddInt64(&fs.inFlightDataSize, -contentLength)
+ fs.inFlightDataLimitCond.Signal()
+ }()
+
+ if r.Method == "PUT" {
+ stats.FilerRequestCounter.WithLabelValues("put").Inc()
+ if _, ok := r.URL.Query()["tagging"]; ok {
+ fs.PutTaggingHandler(w, r)
+ } else {
+ fs.PostHandler(w, r, contentLength)
+ }
+ stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
+ } else { // method == "POST"
+ stats.FilerRequestCounter.WithLabelValues("post").Inc()
+ fs.PostHandler(w, r, contentLength)
+ stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
}
- stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
- case "POST":
- stats.FilerRequestCounter.WithLabelValues("post").Inc()
- fs.PostHandler(w, r)
- stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
case "OPTIONS":
stats.FilerRequestCounter.WithLabelValues("options").Inc()
OptionsHandler(w, r, false)
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index e210f4bdf..6bc09e953 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -79,7 +79,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
w.Header().Set("Last-Modified", entry.Attr.Mtime.UTC().Format(http.TimeFormat))
if r.Header.Get("If-Modified-Since") != "" {
if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil {
- if t.After(entry.Attr.Mtime) {
+ if !t.Before(entry.Attr.Mtime) {
w.WriteHeader(http.StatusNotModified)
return
}
@@ -131,6 +131,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size, true)
+ })
return
}
@@ -150,16 +153,15 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
}
- processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error {
- if httpStatusCode != 0 {
- w.WriteHeader(httpStatusCode)
- }
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
if offset+size <= int64(len(entry.Content)) {
_, err := writer.Write(entry.Content[offset : offset+size])
- glog.Errorf("failed to write entry content: %v", err)
+ if err != nil {
+ glog.Errorf("failed to write entry content: %v", err)
+ }
return err
}
- return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
+ return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size, false)
})
}
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index 9cf79ab41..307c411b6 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -35,8 +35,9 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
lastFileName := r.FormValue("lastFileName")
namePattern := r.FormValue("namePattern")
+ namePatternExclude := r.FormValue("namePatternExclude")
- entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern)
+ entries, shouldDisplayLoadMore, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, int64(limit), "", namePattern, namePatternExclude)
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 0ce3e4a58..95eba9d3d 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -52,7 +52,7 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
return
}
-func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
+func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, contentLength int64) {
ctx := context.Background()
@@ -66,7 +66,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
query.Get("rack"),
)
- fs.autoChunk(ctx, w, r, so)
+ fs.autoChunk(ctx, w, r, contentLength, so)
util.CloseRequest(r)
}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index d3ce7e605..c4f10d94e 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -2,11 +2,8 @@ package weed_server
import (
"context"
- "crypto/md5"
"fmt"
- "hash"
"io"
- "io/ioutil"
"net/http"
"os"
"path"
@@ -19,13 +16,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
- "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
+func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) {
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
query := r.URL.Query()
@@ -38,10 +34,10 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
chunkSize := 1024 * 1024 * maxMB
- stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
+ stats.FilerRequestCounter.WithLabelValues("chunk").Inc()
start := time.Now()
defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
+ stats.FilerRequestHistogram.WithLabelValues("chunk").Observe(time.Since(start).Seconds())
}()
var reply *FilerPostResult
@@ -51,14 +47,16 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
reply, err = fs.mkdir(ctx, w, r)
} else {
- reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, so)
+ reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, contentLength, so)
}
} else {
- reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, so)
+ reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, contentLength, so)
}
if err != nil {
if strings.HasPrefix(err.Error(), "read input:") {
writeJsonError(w, r, 499, err)
+ } else if strings.HasSuffix(err.Error(), "is a file") {
+ writeJsonError(w, r, http.StatusConflict, err)
} else {
writeJsonError(w, r, http.StatusInternalServerError, err)
}
@@ -70,7 +68,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
}
}
-func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
+func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
multipartReader, multipartReaderErr := r.MultipartReader()
if multipartReaderErr != nil {
@@ -91,7 +89,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
contentType = ""
}
- fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, so)
+ fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, part1, chunkSize, fileName, contentType, contentLength, so)
if err != nil {
return nil, nil, err
}
@@ -102,7 +100,7 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
return
}
-func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
+func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, contentLength int64, so *operation.StorageOption) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
fileName := path.Base(r.URL.Path)
contentType := r.Header.Get("Content-Type")
@@ -110,7 +108,7 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
contentType = ""
}
- fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, so)
+ fileChunks, md5Hash, chunkOffset, err, smallContent := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, fileName, contentType, contentLength, so)
if err != nil {
return nil, nil, err
}
@@ -144,6 +142,14 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
if fileName != "" {
path += fileName
}
+ } else {
+ if fileName != "" {
+ if possibleDirEntry, findDirErr := fs.filer.FindEntry(ctx, util.FullPath(path)); findDirErr == nil {
+ if possibleDirEntry.IsDirectory() {
+ path += "/" + fileName
+ }
+ }
+ }
}
var entry *filer.Entry
@@ -212,7 +218,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
entry.Extended = make(map[string][]byte)
}
- fs.saveAmzMetaData(r, entry)
+ SaveAmzMetaData(r, entry.Extended, false)
for k, v := range r.Header {
if len(v) > 0 && strings.HasPrefix(k, needle.PairNamePrefix) {
@@ -229,89 +235,6 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
return filerResult, replyerr
}
-func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
- var fileChunks []*filer_pb.FileChunk
-
- md5Hash := md5.New()
- var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
-
- chunkOffset := int64(0)
- var smallContent []byte
-
- for {
- limitedReader := io.LimitReader(partReader, int64(chunkSize))
-
- data, err := ioutil.ReadAll(limitedReader)
- if err != nil {
- return nil, nil, 0, err, nil
- }
- if chunkOffset == 0 && !isAppend(r) {
- if len(data) < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
- smallContent = data
- chunkOffset += int64(len(data))
- break
- }
- }
- dataReader := util.NewBytesReader(data)
-
- // retry to assign a different file id
- var fileId, urlLocation string
- var auth security.EncodedJwt
- var assignErr, uploadErr error
- var uploadResult *operation.UploadResult
- for i := 0; i < 3; i++ {
- // assign one file id for one chunk
- fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
- if assignErr != nil {
- return nil, nil, 0, assignErr, nil
- }
-
- // upload the chunk to the volume server
- uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
- if uploadErr != nil {
- time.Sleep(251 * time.Millisecond)
- continue
- }
- break
- }
- if uploadErr != nil {
- return nil, nil, 0, uploadErr, nil
- }
-
- // if last chunk exhausted the reader exactly at the border
- if uploadResult.Size == 0 {
- break
- }
-
- // Save to chunk manifest structure
- fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
-
- glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
-
- // reset variables for the next chunk
- chunkOffset = chunkOffset + int64(uploadResult.Size)
-
- // if last chunk was not at full chunk size, but already exhausted the reader
- if int64(uploadResult.Size) < int64(chunkSize) {
- break
- }
- }
-
- return fileChunks, md5Hash, chunkOffset, nil, smallContent
-}
-
-func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
-
- stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc()
- start := time.Now()
- defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds())
- }()
-
- uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
- return uploadResult, err, data
-}
-
func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) {
@@ -380,17 +303,24 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http
return filerResult, replyerr
}
-func (fs *FilerServer) saveAmzMetaData(r *http.Request, entry *filer.Entry) {
+func SaveAmzMetaData(r *http.Request, existing map[string][]byte, isReplace bool) (metadata map[string][]byte) {
+
+ metadata = make(map[string][]byte)
+ if !isReplace {
+ for k, v := range existing {
+ metadata[k] = v
+ }
+ }
if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" {
- entry.Extended[xhttp.AmzStorageClass] = []byte(sc)
+ metadata[xhttp.AmzStorageClass] = []byte(sc)
}
if tags := r.Header.Get(xhttp.AmzObjectTagging); tags != "" {
for _, v := range strings.Split(tags, "&") {
tag := strings.Split(v, "=")
if len(tag) == 2 {
- entry.Extended[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1])
+ metadata[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1])
}
}
}
@@ -398,8 +328,11 @@ func (fs *FilerServer) saveAmzMetaData(r *http.Request, entry *filer.Entry) {
for header, values := range r.Header {
if strings.HasPrefix(header, xhttp.AmzUserMetaPrefix) {
for _, value := range values {
- entry.Extended[header] = []byte(value)
+ metadata[header] = []byte(value)
}
}
}
+
+ return
+
}
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
new file mode 100644
index 000000000..3ab45453e
--- /dev/null
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -0,0 +1,105 @@
+package weed_server
+
+import (
+ "crypto/md5"
+ "hash"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
+ var fileChunks []*filer_pb.FileChunk
+
+ md5Hash := md5.New()
+ var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
+
+ chunkOffset := int64(0)
+ var smallContent []byte
+
+ for {
+ limitedReader := io.LimitReader(partReader, int64(chunkSize))
+
+ data, err := ioutil.ReadAll(limitedReader)
+ if err != nil {
+ return nil, nil, 0, err, nil
+ }
+ if chunkOffset == 0 && !isAppend(r) {
+ if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
+ smallContent = data
+ chunkOffset += int64(len(data))
+ break
+ }
+ }
+ dataReader := util.NewBytesReader(data)
+
+ // retry to assign a different file id
+ var fileId, urlLocation string
+ var auth security.EncodedJwt
+ var assignErr, uploadErr error
+ var uploadResult *operation.UploadResult
+ for i := 0; i < 3; i++ {
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
+ if assignErr != nil {
+ return nil, nil, 0, assignErr, nil
+ }
+
+ // upload the chunk to the volume server
+ uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
+ if uploadErr != nil {
+ time.Sleep(251 * time.Millisecond)
+ continue
+ }
+ break
+ }
+ if uploadErr != nil {
+ return nil, nil, 0, uploadErr, nil
+ }
+
+ // if last chunk exhausted the reader exactly at the border
+ if uploadResult.Size == 0 {
+ break
+ }
+
+ // Save to chunk manifest structure
+ fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
+
+ glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
+
+ // reset variables for the next chunk
+ chunkOffset = chunkOffset + int64(uploadResult.Size)
+
+ // if last chunk was not at full chunk size, but already exhausted the reader
+ if int64(uploadResult.Size) < int64(chunkSize) {
+ break
+ }
+ }
+
+ return fileChunks, md5Hash, chunkOffset, nil, smallContent
+}
+
+func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
+
+ stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
+ }()
+
+ uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
+ if uploadResult != nil && uploadResult.RetryCount > 0 {
+ stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
+ }
+ return uploadResult, err, data
+}
diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go
index f21cce7d1..5016117a8 100644
--- a/weed/server/filer_ui/breadcrumb.go
+++ b/weed/server/filer_ui/breadcrumb.go
@@ -1,4 +1,4 @@
-package master_ui
+package filer_ui
import (
"strings"
diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go
index 3f0647119..648b97f22 100644
--- a/weed/server/filer_ui/templates.go
+++ b/weed/server/filer_ui/templates.go
@@ -1,4 +1,4 @@
-package master_ui
+package filer_ui
import (
"github.com/dustin/go-humanize"
diff --git a/weed/server/gateway_server.go b/weed/server/gateway_server.go
new file mode 100644
index 000000000..608217ed7
--- /dev/null
+++ b/weed/server/gateway_server.go
@@ -0,0 +1,106 @@
+package weed_server
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "google.golang.org/grpc"
+ "math/rand"
+ "net/http"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+
+ _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/log"
+ "github.com/chrislusf/seaweedfs/weed/security"
+)
+
+type GatewayOption struct {
+ Masters []string
+ Filers []string
+ MaxMB int
+ IsSecure bool
+}
+
+type GatewayServer struct {
+ option *GatewayOption
+ secret security.SigningKey
+ grpcDialOption grpc.DialOption
+}
+
+func NewGatewayServer(defaultMux *http.ServeMux, option *GatewayOption) (fs *GatewayServer, err error) {
+
+ fs = &GatewayServer{
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
+ }
+
+ if len(option.Masters) == 0 {
+ glog.Fatal("master list is required!")
+ }
+
+ defaultMux.HandleFunc("/blobs/", fs.blobsHandler)
+ defaultMux.HandleFunc("/files/", fs.filesHandler)
+ defaultMux.HandleFunc("/topics/", fs.topicsHandler)
+
+ return fs, nil
+}
+
+func (fs *GatewayServer) getMaster() string {
+ randMaster := rand.Intn(len(fs.option.Masters))
+ return fs.option.Masters[randMaster]
+}
+
+func (fs *GatewayServer) blobsHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "DELETE":
+ chunkId := r.URL.Path[len("/blobs/"):]
+ fullUrl, err := operation.LookupFileId(fs.getMaster, chunkId)
+ if err != nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+ var jwtAuthorization security.EncodedJwt
+ if fs.option.IsSecure {
+ jwtAuthorization = operation.LookupJwt(fs.getMaster(), chunkId)
+ }
+ body, statusCode, err := util.DeleteProxied(fullUrl, string(jwtAuthorization))
+ if err != nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+ w.WriteHeader(statusCode)
+ w.Write(body)
+ case "POST":
+ submitForClientHandler(w, r, fs.getMaster, fs.grpcDialOption)
+ }
+}
+
+func (fs *GatewayServer) filesHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "DELETE":
+ case "POST":
+ }
+}
+
+func (fs *GatewayServer) topicsHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "POST":
+ }
+}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 0f0b7f101..3e6d9bb9e 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -80,10 +80,14 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts)
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
+ var dataCenter string
+ if dc := dn.GetDataCenter(); dc != nil {
+ dataCenter = string(dc.Id())
+ }
message := &master_pb.VolumeLocation{
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
- DataCenter: string(dn.GetDataCenter().Id()),
+ DataCenter: dataCenter,
}
if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
// process delta volume ids if exists for fast volume id updates
diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go
index 7e7dcb36b..93c9e4e4e 100644
--- a/weed/server/master_grpc_server_admin.go
+++ b/weed/server/master_grpc_server_admin.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"math/rand"
"sync"
"time"
@@ -60,6 +61,7 @@ const (
type AdminLock struct {
accessSecret int64
accessLockTime time.Time
+ lastClient string
}
type AdminLocks struct {
@@ -73,14 +75,15 @@ func NewAdminLocks() *AdminLocks {
}
}
-func (locks *AdminLocks) isLocked(lockName string) bool {
+func (locks *AdminLocks) isLocked(lockName string) (clientName string, isLocked bool) {
locks.RLock()
defer locks.RUnlock()
adminLock, found := locks.locks[lockName]
if !found {
- return false
+ return "", false
}
- return adminLock.accessLockTime.Add(LockDuration).After(time.Now())
+ glog.V(4).Infof("isLocked %v", adminLock.lastClient)
+ return adminLock.lastClient, adminLock.accessLockTime.Add(LockDuration).After(time.Now())
}
func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64) bool {
@@ -93,12 +96,13 @@ func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64
return adminLock.accessLockTime.Equal(ts) && adminLock.accessSecret == token
}
-func (locks *AdminLocks) generateToken(lockName string) (ts time.Time, token int64) {
+func (locks *AdminLocks) generateToken(lockName string, clientName string) (ts time.Time, token int64) {
locks.Lock()
defer locks.Unlock()
lock := &AdminLock{
accessSecret: rand.Int63(),
accessLockTime: time.Now(),
+ lastClient: clientName,
}
locks.locks[lockName] = lock
return lock.accessLockTime, lock.accessSecret
@@ -113,18 +117,19 @@ func (locks *AdminLocks) deleteLock(lockName string) {
func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.LeaseAdminTokenRequest) (*master_pb.LeaseAdminTokenResponse, error) {
resp := &master_pb.LeaseAdminTokenResponse{}
- if ms.adminLocks.isLocked(req.LockName) {
+ if lastClient, isLocked := ms.adminLocks.isLocked(req.LockName); isLocked {
+ glog.V(4).Infof("LeaseAdminToken %v", lastClient)
if req.PreviousToken != 0 && ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
// for renew
- ts, token := ms.adminLocks.generateToken(req.LockName)
+ ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName)
resp.Token, resp.LockTsNs = token, ts.UnixNano()
return resp, nil
}
// refuse since still locked
- return resp, fmt.Errorf("already locked")
+ return resp, fmt.Errorf("already locked by " + lastClient)
}
// for fresh lease request
- ts, token := ms.adminLocks.generateToken(req.LockName)
+ ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName)
resp.Token, resp.LockTsNs = token, ts.UnixNano()
return resp, nil
}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 9404081b4..e2b2df18d 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -277,6 +277,13 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
glog.Error(err)
seq = nil
}
+ case "snowflake":
+ var err error
+ seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port))
+ if err != nil {
+ glog.Error(err)
+ seq = nil
+ }
default:
seq = sequence.NewMemorySequencer()
}
diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go
index 9cd58158b..3822c6113 100644
--- a/weed/server/master_server_handlers_ui.go
+++ b/weed/server/master_server_handlers_ui.go
@@ -14,17 +14,19 @@ func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
infos := make(map[string]interface{})
infos["Up Time"] = time.Now().Sub(startTime).String()
args := struct {
- Version string
- Topology interface{}
- RaftServer raft.Server
- Stats map[string]interface{}
- Counters *stats.ServerStats
+ Version string
+ Topology interface{}
+ RaftServer raft.Server
+ Stats map[string]interface{}
+ Counters *stats.ServerStats
+ VolumeSizeLimitMB uint
}{
util.Version(),
ms.Topo.ToMap(),
ms.Topo.RaftServer,
infos,
serverStats,
+ ms.option.VolumeSizeLimitMB,
}
ui.StatusTpl.Execute(w, args)
}
diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go
index 60873f6aa..31b6353e9 100644
--- a/weed/server/master_ui/templates.go
+++ b/weed/server/master_ui/templates.go
@@ -22,9 +22,13 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<div class="row">
<div class="col-sm-6">
<h2>Cluster status</h2>
- <table class="table">
+ <table class="table table-condensed table-striped">
<tbody>
<tr>
+ <th>Volume Size Limit</th>
+ <td>{{ .VolumeSizeLimitMB }}MB</td>
+ </tr>
+ <tr>
<th>Free</th>
<td>{{ .Topology.Free }}</td>
</tr>
@@ -38,8 +42,8 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td>
</tr>
<tr>
- <td class="col-sm-2 field-label"><label>Other Masters:</label></td>
- <td class="col-sm-10"><ul class="list-unstyled">
+ <th>Other Masters</th>
+ <td class="col-sm-5"><ul class="list-unstyled">
{{ range $k, $p := .Peers }}
<li><a href="http://{{ $p.Name }}/ui/index.html">{{ $p.Name }}</a></li>
{{ end }}
diff --git a/weed/server/volume_grpc_read_write.go b/weed/server/volume_grpc_read_write.go
new file mode 100644
index 000000000..988e9e145
--- /dev/null
+++ b/weed/server/volume_grpc_read_write.go
@@ -0,0 +1,38 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func (vs *VolumeServer) ReadNeedleBlob(ctx context.Context, req *volume_server_pb.ReadNeedleBlobRequest) (resp *volume_server_pb.ReadNeedleBlobResponse, err error) {
+ resp = &volume_server_pb.ReadNeedleBlobResponse{}
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ resp.NeedleBlob, err = v.ReadNeedleBlob(req.Offset, types.Size(req.Size))
+ if err != nil {
+ return nil, fmt.Errorf("read needle blob offset %d size %d: %v", req.Offset, req.Size, err)
+ }
+
+ return resp, nil
+}
+
+func (vs *VolumeServer) WriteNeedleBlob(ctx context.Context, req *volume_server_pb.WriteNeedleBlobRequest) (resp *volume_server_pb.WriteNeedleBlobResponse, err error) {
+ resp = &volume_server_pb.WriteNeedleBlobResponse{}
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ if err = v.WriteNeedleBlob(types.NeedleId(req.NeedleId), req.NeedleBlob, types.Size(req.Size)); err != nil {
+ return nil, fmt.Errorf("write blob needle %d size %d: %v", req.NeedleId, req.Size, err)
+ }
+
+ return resp, nil
+}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index e496b1ce2..e11d607a4 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -4,6 +4,7 @@ import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"net/http"
+ "sync"
"google.golang.org/grpc"
@@ -34,6 +35,10 @@ type VolumeServer struct {
fileSizeLimitBytes int64
isHeartbeating bool
stopChan chan bool
+
+ inFlightDataSize int64
+ inFlightDataLimitCond *sync.Cond
+ concurrentUploadLimit int64
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -48,6 +53,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
readRedirect bool,
compactionMBPerSecond int,
fileSizeLimitMB int,
+ concurrentUploadLimit int64,
) *VolumeServer {
v := util.GetViper()
@@ -72,6 +78,8 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
isHeartbeating: true,
stopChan: make(chan bool),
+ inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
+ concurrentUploadLimit: concurrentUploadLimit,
}
vs.SeedMasterNodes = masterNodes
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 7852c950a..4527add44 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -2,7 +2,9 @@ package weed_server
import (
"net/http"
+ "strconv"
"strings"
+ "sync/atomic"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -40,8 +42,24 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
stats.DeleteRequest()
vs.guard.WhiteList(vs.DeleteHandler)(w, r)
case "PUT", "POST":
+
+ // wait until in flight data is less than the limit
+ contentLength := getContentLength(r)
+ vs.inFlightDataLimitCond.L.Lock()
+ for atomic.LoadInt64(&vs.inFlightDataSize) > vs.concurrentUploadLimit {
+ vs.inFlightDataLimitCond.Wait()
+ }
+ atomic.AddInt64(&vs.inFlightDataSize, contentLength)
+ vs.inFlightDataLimitCond.L.Unlock()
+ defer func() {
+ atomic.AddInt64(&vs.inFlightDataSize, -contentLength)
+ vs.inFlightDataLimitCond.Signal()
+ }()
+
+ // processs uploads
stats.WriteRequest()
vs.guard.WhiteList(vs.PostHandler)(w, r)
+
case "OPTIONS":
stats.ReadRequest()
w.Header().Add("Access-Control-Allow-Methods", "PUT, POST, GET, DELETE, OPTIONS")
@@ -49,6 +67,18 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
}
}
+func getContentLength(r *http.Request) int64 {
+ contentLength := r.Header.Get("Content-Length")
+ if contentLength != "" {
+ length, err := strconv.ParseInt(contentLength, 10, 64)
+ if err != nil {
+ return 0
+ }
+ return length
+ }
+ return 0
+}
+
func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
if r.Header.Get("Origin") != "" {
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 2db46ac9b..3e977cfd4 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -27,7 +27,7 @@ var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
- // println(r.Method + " " + r.URL.Path)
+ glog.V(9).Info(r.Method + " " + r.URL.Path + " " + r.Header.Get("Range"))
stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
start := time.Now()
@@ -261,13 +261,10 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
return nil
}
- processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64, httpStatusCode int) error {
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
if _, e = rs.Seek(offset, 0); e != nil {
return e
}
- if httpStatusCode != 0 {
- w.WriteHeader(httpStatusCode)
- }
_, e = io.CopyN(writer, rs, size)
return e
})
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 602b147e1..3d752eda6 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -13,7 +13,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
- "github.com/chrislusf/seaweedfs/weed/util"
)
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
@@ -68,7 +67,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ret.Name = string(reqNeedle.Name)
}
ret.Size = uint32(originalSize)
- ret.ETag = fmt.Sprintf("%x", util.Base64Md5ToBytes(contentMd5))
+ ret.ETag = reqNeedle.Etag()
ret.Mime = string(reqNeedle.Mime)
setEtag(w, ret.ETag)
w.Header().Set("Content-MD5", contentMd5)
diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go
new file mode 100644
index 000000000..a009611da
--- /dev/null
+++ b/weed/server/volume_server_tcp_handlers_write.go
@@ -0,0 +1,137 @@
+package weed_server
+
+import (
+ "bufio"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "net"
+ "strings"
+)
+
+func (vs *VolumeServer) HandleTcpConnection(c net.Conn) {
+ defer c.Close()
+
+ glog.V(0).Infof("Serving writes from %s", c.RemoteAddr().String())
+
+ bufReader := bufio.NewReaderSize(c, 1024*1024)
+ bufWriter := bufio.NewWriterSize(c, 1024*1024)
+
+ for {
+ cmd, err := bufReader.ReadString('\n')
+ if err != nil {
+ if err != io.EOF {
+ glog.Errorf("read command from %s: %v", c.RemoteAddr().String(), err)
+ }
+ return
+ }
+ cmd = cmd[:len(cmd)-1]
+ switch cmd[0] {
+ case '+':
+ fileId := cmd[1:]
+ err = vs.handleTcpPut(fileId, bufReader)
+ if err == nil {
+ bufWriter.Write([]byte("+OK\n"))
+ } else {
+ bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
+ }
+ case '-':
+ fileId := cmd[1:]
+ err = vs.handleTcpDelete(fileId)
+ if err == nil {
+ bufWriter.Write([]byte("+OK\n"))
+ } else {
+ bufWriter.Write([]byte("-ERR " + string(err.Error()) + "\n"))
+ }
+ case '?':
+ fileId := cmd[1:]
+ err = vs.handleTcpGet(fileId, bufWriter)
+ case '!':
+ bufWriter.Flush()
+ }
+
+ }
+
+}
+
+func (vs *VolumeServer) handleTcpGet(fileId string, writer *bufio.Writer) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ volume := vs.store.GetVolume(volumeId)
+ if volume == nil {
+ return fmt.Errorf("volume %d not found", volumeId)
+ }
+
+ err = volume.StreamRead(n, writer)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) handleTcpPut(fileId string, bufReader *bufio.Reader) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ volume := vs.store.GetVolume(volumeId)
+ if volume == nil {
+ return fmt.Errorf("volume %d not found", volumeId)
+ }
+
+ sizeBuf := make([]byte, 4)
+ if _, err = bufReader.Read(sizeBuf); err != nil {
+ return err
+ }
+ dataSize := util.BytesToUint32(sizeBuf)
+
+ err = volume.StreamWrite(n, bufReader, dataSize)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) handleTcpDelete(fileId string) (err error) {
+
+ volumeId, n, err2 := vs.parseFileId(fileId)
+ if err2 != nil {
+ return err2
+ }
+
+ _, err = vs.store.DeleteVolumeNeedle(volumeId, n)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func (vs *VolumeServer) parseFileId(fileId string) (needle.VolumeId, *needle.Needle, error) {
+
+ commaIndex := strings.LastIndex(fileId, ",")
+ if commaIndex <= 0 {
+ return 0, nil, fmt.Errorf("unknown fileId %s", fileId)
+ }
+
+ vid, fid := fileId[0:commaIndex], fileId[commaIndex+1:]
+
+ volumeId, ve := needle.NewVolumeId(vid)
+ if ve != nil {
+ return 0, nil, fmt.Errorf("unknown volume id in fileId %s", fileId)
+ }
+
+ n := new(needle.Needle)
+ n.ParsePath(fid)
+ return volumeId, n, nil
+}
diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go
index 6a8bb6f55..ee4c2e31d 100644
--- a/weed/server/volume_server_ui/templates.go
+++ b/weed/server/volume_server_ui/templates.go
@@ -1,4 +1,4 @@
-package master_ui
+package volume_server_ui
import (
"fmt"