Diffstat (limited to 'weed/server')
-rw-r--r--  weed/server/common.go                                 |  9
-rw-r--r--  weed/server/filer_grpc_server.go                      |  6
-rw-r--r--  weed/server/filer_grpc_server_rename.go               | 18
-rw-r--r--  weed/server/filer_server.go                           |  6
-rw-r--r--  weed/server/filer_server_handlers_proxy.go            |  6
-rw-r--r--  weed/server/filer_server_handlers_read.go             | 21
-rw-r--r--  weed/server/filer_server_handlers_tagging.go          |  8
-rw-r--r--  weed/server/filer_server_handlers_write.go            | 70
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go  | 25
-rw-r--r--  weed/server/filer_server_handlers_write_cipher.go     |  2
-rw-r--r--  weed/server/filer_server_handlers_write_upload.go     | 22
-rw-r--r--  weed/server/master_server.go                          |  2
-rw-r--r--  weed/server/raft_server.go                            | 76
-rw-r--r--  weed/server/volume_server.go                          |  3
-rw-r--r--  weed/server/volume_server_handlers.go                 | 25
-rw-r--r--  weed/server/volume_server_handlers_admin.go           | 19
-rw-r--r--  weed/server/volume_server_handlers_read.go            |  5
-rw-r--r--  weed/server/webdav_server.go                          |  5
18 files changed, 238 insertions, 90 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index ba4d13456..0d458c9c3 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "bufio"
"bytes"
"encoding/json"
"errors"
@@ -277,10 +278,12 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file
func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) {
rangeReq := r.Header.Get("Range")
+ bufferedWriter := bufio.NewWriterSize(w, 128*1024)
+ defer bufferedWriter.Flush()
if rangeReq == "" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
- if err := writeFn(w, 0, totalSize); err != nil {
+ if err := writeFn(bufferedWriter, 0, totalSize); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -321,7 +324,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.Header().Set("Content-Range", ra.contentRange(totalSize))
w.WriteHeader(http.StatusPartialContent)
- err = writeFn(w, ra.start, ra.length)
+ err = writeFn(bufferedWriter, ra.start, ra.length)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@@ -361,7 +364,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
}
w.WriteHeader(http.StatusPartialContent)
- if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
+ if _, err := io.CopyN(bufferedWriter, sendContent, sendSize); err != nil {
http.Error(w, "Internal Error", http.StatusInternalServerError)
return
}
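
The hunk above wraps the ResponseWriter in a 128 KiB bufio.Writer so the many small chunk writes of a range response coalesce before reaching the socket, with a deferred Flush to drain the tail. A minimal standalone sketch of the same pattern (the handler and loop are illustrative, not from the patch):

```go
package main

import (
	"bufio"
	"fmt"
	"net/http"
)

func handler(w http.ResponseWriter, r *http.Request) {
	// Coalesce many small writes into 128 KiB flushes to the underlying connection.
	bw := bufio.NewWriterSize(w, 128*1024)
	defer bw.Flush() // flush whatever is still buffered before the handler returns

	for i := 0; i < 1000; i++ {
		fmt.Fprintf(bw, "line %d\n", i) // lands in the buffer, not directly on the wire
	}
}

func main() {
	http.HandleFunc("/demo", handler)
	http.ListenAndServe(":8080", nil)
}
```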
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 8e6cd8451..5a5714156 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -148,7 +148,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
newEntry := filer.FromPbEntry(req.Directory, req.Entry)
newEntry.Chunks = chunks
- createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures)
+ createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory)
if createErr == nil {
fs.filer.DeleteChunks(garbage)
@@ -271,7 +271,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
glog.V(0).Infof("MaybeManifestize: %v", err)
}
- err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil)
+ err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil, false)
return &filer_pb.AppendToEntryResponse{}, err
}
@@ -393,7 +393,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
t := &filer_pb.GetFilerConfigurationResponse{
- Masters: pb.ToAddressStrings(fs.option.Masters),
+ Masters: pb.ToAddressStringsFromMap(fs.option.Masters),
Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB),
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index 773f7aebe..7d6650b53 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -163,13 +163,17 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.Seawee
// add to new directory
newEntry := &filer.Entry{
- FullPath: newPath,
- Attr: entry.Attr,
- Chunks: entry.Chunks,
- Extended: entry.Extended,
- Content: entry.Content,
- }
- if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures); createErr != nil {
+ FullPath: newPath,
+ Attr: entry.Attr,
+ Chunks: entry.Chunks,
+ Extended: entry.Extended,
+ Content: entry.Content,
+ HardLinkCounter: entry.HardLinkCounter,
+ HardLinkId: entry.HardLinkId,
+ Remote: entry.Remote,
+ Quota: entry.Quota,
+ }
+ if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures, false); createErr != nil {
return createErr
}
if stream != nil {
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index e51299c6d..7edd5870f 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -48,7 +48,7 @@ import (
)
type FilerOption struct {
- Masters []pb.ServerAddress
+ Masters map[string]pb.ServerAddress
Collection string
DefaultReplication string
DisableDirListing bool
@@ -130,8 +130,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
go fs.filer.KeepMasterClientConnected()
if !util.LoadConfiguration("filer", false) {
- v.Set("leveldb2.enabled", true)
- v.Set("leveldb2.dir", option.DefaultLevelDbDir)
+ v.SetDefault("leveldb2.enabled", true)
+ v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir)
_, err := os.Stat(option.DefaultLevelDbDir)
if os.IsNotExist(err) {
os.MkdirAll(option.DefaultLevelDbDir, 0755)
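
The switch from v.Set to v.SetDefault matters because an explicit Set sits at the top of Viper's precedence order and would silently override values read from filer.toml, while SetDefault only fills in when no other source supplies the key. A hedged sketch of the difference (keys reused from the hunk, values illustrative):

```go
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	v := viper.New()

	// Simulate a value that came from a config file or flag: explicit Set wins.
	v.Set("leveldb2.dir", "/data/from-config")

	// Defaults only apply when nothing else provided the key.
	v.SetDefault("leveldb2.dir", "/tmp/default-dir")
	v.SetDefault("leveldb2.enabled", true)

	fmt.Println(v.GetString("leveldb2.dir"))   // /data/from-config (default did not override it)
	fmt.Println(v.GetBool("leveldb2.enabled")) // true (filled from the default)
}
```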
diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go
index b8b28790b..301d609ec 100644
--- a/weed/server/filer_server_handlers_proxy.go
+++ b/weed/server/filer_server_handlers_proxy.go
@@ -3,6 +3,7 @@ package weed_server
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
"io"
"math/rand"
"net/http"
@@ -62,6 +63,9 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
w.Header()[k] = v
}
w.WriteHeader(proxyResponse.StatusCode)
- io.Copy(w, proxyResponse.Body)
+
+ buf := mem.Allocate(128 * 1024)
+ defer mem.Free(buf)
+ io.CopyBuffer(w, proxyResponse.Body, buf)
}
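
Plain io.Copy allocates a fresh 32 KB buffer on every call; the proxy now uses io.CopyBuffer with a 128 KiB buffer borrowed from the project's util/mem pool. A sketch of the same idea using the standard library's sync.Pool instead of the mem package (assumed equivalent behavior; the URL is illustrative):

```go
package main

import (
	"io"
	"log"
	"net/http"
	"os"
	"sync"
)

// bufPool hands out reusable 128 KiB copy buffers so each proxied response
// body does not trigger a fresh allocation the way plain io.Copy would.
var bufPool = sync.Pool{
	New: func() interface{} { return make([]byte, 128*1024) },
}

func copyWithPooledBuffer(dst io.Writer, src io.Reader) (int64, error) {
	buf := bufPool.Get().([]byte)
	defer bufPool.Put(buf)
	return io.CopyBuffer(dst, src, buf) // reuses buf instead of allocating internally
}

func main() {
	resp, err := http.Get("http://localhost:8888/some/file")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	if _, err := copyWithPooledBuffer(os.Stdout, resp.Body); err != nil {
		log.Fatal(err)
	}
}
```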
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 8037b1d94..2bac585e9 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -4,7 +4,9 @@ import (
"bytes"
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
"io"
+ "math"
"mime"
"net/http"
"path/filepath"
@@ -21,7 +23,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-
// Validates the preconditions. Returns true if GET/HEAD operation should not proceed.
// Preconditions supported are:
// If-Modified-Since
@@ -119,6 +120,20 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
+ query := r.URL.Query()
+ if query.Get("metadata") == "true" {
+ if query.Get("resolveManifest") == "true" {
+ if entry.Chunks, _, err = filer.ResolveChunkManifest(
+ fs.filer.MasterClient.GetLookupFileIdFunction(),
+ entry.Chunks, 0, math.MaxInt64); err != nil {
+ err = fmt.Errorf("failed to resolve chunk manifest, err: %s", err.Error())
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ }
+ }
+ writeJsonQuiet(w, r, http.StatusOK, entry)
+ return
+ }
+
etag := filer.ETagEntry(entry)
if checkPreconditions(w, r, entry) {
return
@@ -185,7 +200,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
width, height, mode, shouldResize := shouldResizeImages(ext, r)
if shouldResize {
- data, err := filer.ReadAll(fs.filer.MasterClient, entry.Chunks)
+ data := mem.Allocate(int(totalSize))
+ defer mem.Free(data)
+ err := filer.ReadAll(data, fs.filer.MasterClient, entry.Chunks)
if err != nil {
glog.Errorf("failed to read %s: %v", path, err)
w.WriteHeader(http.StatusNotModified)
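
The read handler gains a metadata=true query switch that returns the stored entry as JSON instead of the file body, with resolveManifest=true additionally expanding chunk manifests into plain chunks; image resizing also moves to a pooled read buffer. A hedged client sketch of fetching entry metadata through the filer HTTP API (host, path, and the loose decode schema are assumptions):

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Ask the filer for the entry record (chunks, attributes) rather than the file content.
	resp, err := http.Get("http://localhost:8888/path/to/file?metadata=true&resolveManifest=true")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	var entry map[string]interface{} // decode loosely; the concrete shape is the filer entry JSON
	if err := json.NewDecoder(resp.Body).Decode(&entry); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("entry metadata: %v\n", entry)
}
```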
diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go
index 70b5327d6..ae2093947 100644
--- a/weed/server/filer_server_handlers_tagging.go
+++ b/weed/server/filer_server_handlers_tagging.go
@@ -43,7 +43,7 @@ func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request)
}
}
- if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, false); dbErr != nil {
glog.V(0).Infof("failing to update %s tagging : %v", path, dbErr)
writeJsonError(w, r, http.StatusInternalServerError, err)
return
@@ -82,7 +82,9 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque
toDelete := strings.Split(r.URL.Query().Get("tagging"), ",")
deletions := make(map[string]struct{})
for _, deletion := range toDelete {
- deletions[deletion] = struct{}{}
+ if deletion != "" {
+ deletions[deletion] = struct{}{}
+ }
}
// delete all tags or specific tags
@@ -107,7 +109,7 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque
return
}
- if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, false); dbErr != nil {
glog.V(0).Infof("failing to delete %s tagging : %v", path, dbErr)
writeJsonError(w, r, http.StatusInternalServerError, err)
return
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 1ebe66d43..3bbae8197 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"errors"
+ "fmt"
"net/http"
"os"
"strings"
@@ -78,11 +79,78 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
return
}
- fs.autoChunk(ctx, w, r, contentLength, so)
+ if query.Has("mv.from") {
+ fs.move(ctx, w, r, so)
+ } else {
+ fs.autoChunk(ctx, w, r, contentLength, so)
+ }
+
util.CloseRequest(r)
}
+func (fs *FilerServer) move(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
+ src := r.URL.Query().Get("mv.from")
+ dst := r.URL.Path
+
+ glog.V(2).Infof("FilerServer.move %v to %v", src, dst)
+
+ var err error
+ if src, err = clearName(src); err != nil {
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+ if dst, err = clearName(dst); err != nil {
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+ src = strings.TrimRight(src, "/")
+ if src == "" {
+ err = fmt.Errorf("invalid source '/'")
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+
+ srcPath := util.FullPath(src)
+ dstPath := util.FullPath(dst)
+ srcEntry, err := fs.filer.FindEntry(ctx, srcPath)
+ if err != nil {
+ err = fmt.Errorf("failed to get src entry '%s', err: %s", src, err)
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+
+ oldDir, oldName := srcPath.DirAndName()
+ newDir, newName := dstPath.DirAndName()
+ newName = util.Nvl(newName, oldName)
+
+ dstEntry, err := fs.filer.FindEntry(ctx, util.FullPath(strings.TrimRight(dst, "/")))
+ if err != nil && err != filer_pb.ErrNotFound {
+ err = fmt.Errorf("failed to get dst entry '%s', err: %s", dst, err)
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+ if err == nil && !dstEntry.IsDirectory() && srcEntry.IsDirectory() {
+ err = fmt.Errorf("move: cannot overwrite non-directory '%s' with directory '%s'", dst, src)
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+
+ _, err = fs.AtomicRenameEntry(ctx, &filer_pb.AtomicRenameEntryRequest{
+ OldDirectory: oldDir,
+ OldName: oldName,
+ NewDirectory: newDir,
+ NewName: newName,
+ })
+ if err != nil {
+ err = fmt.Errorf("failed to move entry from '%s' to '%s', err: %s", src, dst, err)
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+}
+
// curl -X DELETE http://localhost:8888/path/to
// curl -X DELETE http://localhost:8888/path/to?recursive=true
// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
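
The new move path triggers when a POST to the destination carries a mv.from query parameter: both entries are resolved, the request is delegated to AtomicRenameEntry, and success answers 204 No Content. A hedged client-side sketch (host and paths are illustrative):

```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"net/url"
)

func main() {
	// Rename /buckets/b1/old.txt to /buckets/b1/new.txt on the filer.
	dst := "http://localhost:8888/buckets/b1/new.txt?mv.from=" + url.QueryEscape("/buckets/b1/old.txt")

	resp, err := http.Post(dst, "", nil) // empty body: the server only needs the two paths
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	fmt.Println(resp.Status) // expect "204 No Content" when the rename succeeds
}
```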
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 61d30372b..854b35f82 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -130,6 +130,10 @@ func isAppend(r *http.Request) bool {
return r.URL.Query().Get("op") == "append"
}
+func skipCheckParentDirEntry(r *http.Request) bool {
+ return r.URL.Query().Get("skipCheckParentDir") == "true"
+}
+
func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) {
// detect file mode
@@ -161,8 +165,11 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
var entry *filer.Entry
var mergedChunks []*filer_pb.FileChunk
+
+ isAppend := isAppend(r)
+ isOffsetWrite := len(fileChunks) > 0 && fileChunks[0].Offset > 0
// when it is an append
- if isAppend(r) {
+ if isAppend || isOffsetWrite {
existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path))
if findErr != nil && findErr != filer_pb.ErrNotFound {
glog.V(0).Infof("failing to find %s: %v", path, findErr)
@@ -173,11 +180,13 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
entry.Mtime = time.Now()
entry.Md5 = nil
// adjust chunk offsets
- for _, chunk := range fileChunks {
- chunk.Offset += int64(entry.FileSize)
+ if isAppend {
+ for _, chunk := range fileChunks {
+ chunk.Offset += int64(entry.FileSize)
+ }
+ entry.FileSize += uint64(chunkOffset)
}
mergedChunks = append(entry.Chunks, fileChunks...)
- entry.FileSize += uint64(chunkOffset)
// TODO
if len(entry.Content) > 0 {
@@ -215,6 +224,10 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
return
}
entry.Chunks = mergedChunks
+ if isOffsetWrite {
+ entry.Md5 = nil
+ entry.FileSize = entry.Size()
+ }
filerResult = &FilerPostResult{
Name: fileName,
@@ -234,7 +247,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
}
- if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, skipCheckParentDirEntry(r)); dbErr != nil {
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
@@ -311,7 +324,7 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http
Name: util.FullPath(path).Name(),
}
- if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false); dbErr != nil {
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr)
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 14fa10e2c..a5b085764 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -93,7 +93,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
Size: int64(pu.OriginalDataSize),
}
- if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false); dbErr != nil {
fs.filer.DeleteChunks(entry.Chunks)
err = dbErr
filerResult.Error = dbErr.Error()
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index a7716ef02..6ee378819 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -3,10 +3,12 @@ package weed_server
import (
"bytes"
"crypto/md5"
+ "fmt"
"hash"
"io"
"net/http"
"sort"
+ "strconv"
"strings"
"sync"
"sync/atomic"
@@ -28,6 +30,22 @@ var bufPool = sync.Pool{
}
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
+ query := r.URL.Query()
+
+ isAppend := isAppend(r)
+ if query.Has("offset") {
+ offset := query.Get("offset")
+ offsetInt, err := strconv.ParseInt(offset, 10, 64)
+ if err != nil || offsetInt < 0 {
+ err = fmt.Errorf("invalid 'offset': '%s'", offset)
+ return nil, nil, 0, err, nil
+ }
+ if isAppend && offsetInt > 0 {
+ err = fmt.Errorf("cannot set offset when op=append")
+ return nil, nil, 0, err, nil
+ }
+ chunkOffset = offsetInt
+ }
md5Hash = md5.New()
var partReader = io.NopCloser(io.TeeReader(reader, md5Hash))
@@ -61,9 +79,10 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
bufPool.Put(bytesBuffer)
atomic.AddInt64(&bytesBufferCounter, -1)
bytesBufferLimitCond.Signal()
+ uploadErr = err
break
}
- if chunkOffset == 0 && !isAppend(r) {
+ if chunkOffset == 0 && !isAppend {
if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) {
chunkOffset += dataSize
smallContent = make([]byte, dataSize)
@@ -108,6 +127,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
wg.Wait()
if uploadErr != nil {
+ fs.filer.DeleteChunks(fileChunks)
return nil, md5Hash, 0, uploadErr, nil
}
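
The upload path now accepts an explicit offset query parameter so a client can write a chunk into the middle of an existing file (negative or non-numeric values are rejected, and combining an offset with op=append is an error), and chunks from a failed upload are now deleted. A hedged sketch of an offset write (host, path, offset value, and use of PUT are assumptions):

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Write these bytes starting at byte offset 1048576 of the target file.
	data := bytes.Repeat([]byte{'x'}, 4096)
	target := "http://localhost:8888/buckets/b1/sparse.bin?offset=1048576"

	req, err := http.NewRequest(http.MethodPut, target, bytes.NewReader(data))
	if err != nil {
		log.Fatal(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // expect a 2xx status; an invalid offset is rejected with an error
}
```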
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 671432d5c..b63e3a418 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -75,7 +75,7 @@ type MasterServer struct {
Cluster *cluster.Cluster
}
-func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer {
v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 568bfc7b5..d559cb691 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -5,8 +5,6 @@ import (
"math/rand"
"os"
"path"
- "sort"
- "strings"
"time"
"google.golang.org/grpc"
@@ -19,8 +17,19 @@ import (
"github.com/chrislusf/seaweedfs/weed/topology"
)
+type RaftServerOption struct {
+ GrpcDialOption grpc.DialOption
+ Peers map[string]pb.ServerAddress
+ ServerAddr pb.ServerAddress
+ DataDir string
+ Topo *topology.Topology
+ RaftResumeState bool
+ HeartbeatInterval time.Duration
+ ElectionTimeout time.Duration
+}
+
type RaftServer struct {
- peers []pb.ServerAddress // initial peers to join with
+ peers map[string]pb.ServerAddress // initial peers to join with
raftServer raft.Server
dataDir string
serverAddr pb.ServerAddress
@@ -52,12 +61,12 @@ func (s StateMachine) Recovery(data []byte) error {
return nil
}
-func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, serverAddr pb.ServerAddress, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) {
+func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
- peers: peers,
- serverAddr: serverAddr,
- dataDir: dataDir,
- topo: topo,
+ peers: option.Peers,
+ serverAddr: option.ServerAddr,
+ dataDir: option.DataDir,
+ topo: option.Topo,
}
if glog.V(4) {
@@ -67,27 +76,29 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser
raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
var err error
- transporter := raft.NewGrpcTransporter(grpcDialOption)
- glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
+ transporter := raft.NewGrpcTransporter(option.GrpcDialOption)
+ glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr)
- if !raftResumeState {
+ // always clear previous log to avoid server is promotable
+ os.RemoveAll(path.Join(s.dataDir, "log"))
+ if !option.RaftResumeState {
// always clear previous metadata
os.RemoveAll(path.Join(s.dataDir, "conf"))
- os.RemoveAll(path.Join(s.dataDir, "log"))
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
}
- if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil {
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0700); err != nil {
return nil, err
}
- stateMachine := StateMachine{topo: topo}
- s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, topo, "")
+ stateMachine := StateMachine{topo: option.Topo}
+ s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")
if err != nil {
glog.V(0).Infoln(err)
return nil, err
}
- s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond)
- s.raftServer.SetElectionTimeout(10 * time.Second)
+ heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
+ s.raftServer.SetHeartbeatInterval(heartbeatInterval)
+ s.raftServer.SetElectionTimeout(option.ElectionTimeout)
if err := s.raftServer.LoadSnapshot(); err != nil {
return nil, err
}
@@ -95,39 +106,26 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser
return nil, err
}
- for _, peer := range s.peers {
- if err := s.raftServer.AddPeer(string(peer), peer.ToGrpcAddress()); err != nil {
+ for name, peer := range s.peers {
+ if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
return nil, err
}
}
// Remove deleted peers
for existsPeerName := range s.raftServer.Peers() {
- exists := false
- var existingPeer pb.ServerAddress
- for _, peer := range s.peers {
- if peer.ToGrpcAddress() == existsPeerName {
- exists, existingPeer = true, peer
- break
- }
- }
- if exists {
+ if existingPeer, found := s.peers[existsPeerName]; !found {
if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
glog.V(0).Infoln(err)
return nil, err
} else {
- glog.V(0).Infof("removing old peer %s", existingPeer)
+ glog.V(0).Infof("removing old peer: %s", existingPeer)
}
}
}
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
- if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) {
- // Initialize the server by joining itself.
- // s.DoJoinCommand()
- }
-
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
return s, nil
@@ -143,16 +141,6 @@ func (s *RaftServer) Peers() (members []string) {
return
}
-func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool {
- sort.Slice(peers, func(i, j int) bool {
- return strings.Compare(string(peers[i]), string(peers[j])) < 0
- })
- if len(peers) <= 0 {
- return true
- }
- return self == peers[0]
-}
-
func (s *RaftServer) DoJoinCommand() {
glog.V(0).Infoln("Initializing new cluster")
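
The hard-coded 300–450 ms heartbeat and 10 s election timeout become configurable through RaftServerOption, with up to 25% random jitter applied to the heartbeat so peers do not fire in lockstep. A small isolated sketch of the jitter formula used in the hunk (the base value is illustrative):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitteredHeartbeat scales the configured interval by a random factor in [1.0, 1.25),
// mirroring the expression in NewRaftServer.
func jitteredHeartbeat(base time.Duration) time.Duration {
	return time.Duration(float64(base) * (rand.Float64()*0.25 + 1))
}

func main() {
	base := 300 * time.Millisecond
	for i := 0; i < 3; i++ {
		fmt.Println(jitteredHeartbeat(base)) // somewhere between 300ms and 375ms
	}
}
```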
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 2551cc6e6..dcd27673c 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -23,7 +23,6 @@ type VolumeServer struct {
inFlightDownloadDataSize int64
concurrentUploadLimit int64
concurrentDownloadLimit int64
- inFlightUploadDataLimitCond *sync.Cond
inFlightDownloadDataLimitCond *sync.Cond
SeedMasterNodes []pb.ServerAddress
@@ -84,7 +83,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
isHeartbeating: true,
stopChan: make(chan bool),
- inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)),
inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
concurrentUploadLimit: concurrentUploadLimit,
concurrentDownloadLimit: concurrentDownloadLimit,
@@ -98,6 +96,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
handleStaticResources(adminMux)
adminMux.HandleFunc("/status", vs.statusHandler)
+ adminMux.HandleFunc("/healthz", vs.healthzHandler)
if signingKey == "" || enableUiAccess {
// only expose the volume server details for safe environments
adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 510902cf0..49bc297fb 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "fmt"
"net/http"
"strconv"
"strings"
@@ -39,8 +40,14 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
stats.ReadRequest()
vs.inFlightDownloadDataLimitCond.L.Lock()
for vs.concurrentDownloadLimit != 0 && atomic.LoadInt64(&vs.inFlightDownloadDataSize) > vs.concurrentDownloadLimit {
- glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit)
- vs.inFlightDownloadDataLimitCond.Wait()
+ select {
+ case <-r.Context().Done():
+ glog.V(4).Infof("request cancelled from %s: %v", r.RemoteAddr, r.Context().Err())
+ return
+ default:
+ glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit)
+ vs.inFlightDownloadDataLimitCond.Wait()
+ }
}
vs.inFlightDownloadDataLimitCond.L.Unlock()
vs.GetOrHeadHandler(w, r)
@@ -51,16 +58,18 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
// wait until in flight data is less than the limit
contentLength := getContentLength(r)
- vs.inFlightUploadDataLimitCond.L.Lock()
- for vs.concurrentUploadLimit != 0 && atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit {
- glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
- vs.inFlightUploadDataLimitCond.Wait()
+
+ // exclude the replication from the concurrentUploadLimitMB
+ if vs.concurrentUploadLimit != 0 && r.URL.Query().Get("type") != "replicate" &&
+ atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit {
+ err := fmt.Errorf("reject because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
+ glog.V(1).Infof("too many requests: %v", err)
+ writeJsonError(w, r, http.StatusTooManyRequests, err)
+ return
}
- vs.inFlightUploadDataLimitCond.L.Unlock()
atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength)
defer func() {
atomic.AddInt64(&vs.inFlightUploadDataSize, -contentLength)
- vs.inFlightUploadDataLimitCond.Signal()
}()
// processs uploads
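
Uploads that exceed the in-flight limit are no longer queued on a condition variable; they are rejected immediately with 429 Too Many Requests (replication traffic is exempt), so clients are expected to back off and retry. A hedged client-side retry sketch (the volume server URL and file id are illustrative):

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
	"time"
)

// postWithRetry retries an upload a few times when the volume server answers 429.
func postWithRetry(target string, payload []byte) (*http.Response, error) {
	var lastStatus string
	for attempt := 0; attempt < 5; attempt++ {
		resp, err := http.Post(target, "application/octet-stream", bytes.NewReader(payload))
		if err != nil {
			return nil, err
		}
		if resp.StatusCode != http.StatusTooManyRequests {
			return resp, nil
		}
		lastStatus = resp.Status
		resp.Body.Close()
		time.Sleep(time.Duration(attempt+1) * 200 * time.Millisecond) // simple linear backoff
	}
	return nil, fmt.Errorf("gave up after repeated %s responses", lastStatus)
}

func main() {
	resp, err := postWithRetry("http://localhost:8080/3,01637037d6", []byte("payload"))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
```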
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 7e6c06871..37cf109e2 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/topology"
"net/http"
"path/filepath"
@@ -9,6 +10,24 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+func (vs *VolumeServer) healthzHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
+ volumeInfos := vs.store.VolumeInfos()
+ for _, vinfo := range volumeInfos {
+ if len(vinfo.Collection) == 0 {
+ continue
+ }
+ if vinfo.ReplicaPlacement.GetCopyCount() > 1 {
+ _, err := topology.GetWritableRemoteReplications(vs.store, vs.grpcDialOption, vinfo.Id, vs.GetMaster)
+ if err != nil {
+ w.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
+ }
+ }
+ w.WriteHeader(http.StatusOK)
+}
+
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
m := make(map[string]interface{})
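
The new /healthz endpoint answers 503 when any replicated, collection-scoped volume cannot reach a writable remote replica, and 200 otherwise, which makes it usable as a liveness/readiness probe. A hedged probe sketch (the volume server address is illustrative):

```go
package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Poll the volume server health endpoint added in this change.
	resp, err := http.Get("http://localhost:8080/healthz")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusOK {
		fmt.Println("volume server healthy")
	} else {
		fmt.Printf("volume server unhealthy: %s\n", resp.Status) // 503 when replicas are unreachable
	}
}
```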
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 5ce2278bf..203f6c07d 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
"io"
"mime"
"net/http"
@@ -101,7 +102,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
w.WriteHeader(response.StatusCode)
- io.Copy(w, response.Body)
+ buf := mem.Allocate(128 * 1024)
+ defer mem.Free(buf)
+ io.CopyBuffer(w, response.Body, buf)
return
} else {
// redirect
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 018daed8b..267c3e1f0 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -5,7 +5,6 @@ import (
"context"
"fmt"
"io"
- "math"
"os"
"path"
"strings"
@@ -540,11 +539,11 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
return 0, io.EOF
}
if f.entryViewCache == nil {
- f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, math.MaxInt64)
+ f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, fileSize)
f.reader = nil
}
if f.reader == nil {
- chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
+ chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, fileSize)
f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
}