Diffstat (limited to 'weed/server')
-rw-r--r-- | weed/server/common.go | 41
-rw-r--r-- | weed/server/filer_grpc_server.go | 9
-rw-r--r-- | weed/server/filer_grpc_server_admin.go | 2
-rw-r--r-- | weed/server/filer_grpc_server_rename.go | 2
-rw-r--r-- | weed/server/filer_grpc_server_traverse_meta.go | 84
-rw-r--r-- | weed/server/filer_grpc_server_traverse_meta_test.go | 31
-rw-r--r-- | weed/server/filer_server.go | 6
-rw-r--r-- | weed/server/filer_server_handlers_proxy.go | 17
-rw-r--r-- | weed/server/filer_server_handlers_read.go | 12
-rw-r--r-- | weed/server/filer_server_handlers_read_dir.go | 8
-rw-r--r-- | weed/server/filer_server_handlers_write.go | 5
-rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 21
-rw-r--r-- | weed/server/filer_server_handlers_write_cipher.go | 8
-rw-r--r-- | weed/server/filer_server_handlers_write_upload.go | 8
-rw-r--r-- | weed/server/filer_ui/breadcrumb.go | 3
-rw-r--r-- | weed/server/filer_ui/breadcrumb_test.go | 86
-rw-r--r-- | weed/server/filer_ui/filer.html | 6
-rw-r--r-- | weed/server/master_grpc_server_assign.go | 15
-rw-r--r-- | weed/server/master_grpc_server_volume.go | 58
-rw-r--r-- | weed/server/master_server.go | 36
-rw-r--r-- | weed/server/master_server_handlers.go | 32
-rw-r--r-- | weed/server/master_server_handlers_admin.go | 22
-rw-r--r-- | weed/server/raft_hashicorp.go | 96
-rw-r--r-- | weed/server/raft_server.go | 3
-rw-r--r-- | weed/server/volume_grpc_remote.go | 13
-rw-r--r-- | weed/server/volume_server_handlers_read.go | 29
-rw-r--r-- | weed/server/volume_server_handlers_write.go | 6
-rw-r--r-- | weed/server/webdav_server.go | 30
28 files changed, 496 insertions, 193 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index a7d67fb2e..e6f6cdb88 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -127,7 +127,7 @@ func debug(params ...interface{}) {
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption) {
m := make(map[string]interface{})
- if r.Method != "POST" {
+ if r.Method != http.MethodPost {
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
return
}
@@ -181,7 +181,12 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
PairMap: pu.PairMap,
Jwt: assignResult.Auth,
}
- uploadResult, err := operation.UploadData(pu.Data, uploadOption)
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+ uploadResult, err := uploader.UploadData(pu.Data, uploadOption)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
@@ -260,7 +265,7 @@ func handleStaticResources2(r *mux.Router) {
r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(http.FS(StaticFS))))
}
-func adjustPassthroughHeaders(w http.ResponseWriter, r *http.Request, filename string) {
+func AdjustPassthroughHeaders(w http.ResponseWriter, r *http.Request, filename string) {
for header, values := range r.Header {
if normalizedHeader, ok := s3_constants.PassThroughHeaders[strings.ToLower(header)]; ok {
w.Header()[normalizedHeader] = values
@@ -284,7 +289,7 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file
}
}
-func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, prepareWriteFn func(offset int64, size int64) (filer.DoStreamContent, error)) error {
+func ProcessRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, prepareWriteFn func(offset int64, size int64) (filer.DoStreamContent, error)) error {
rangeReq := r.Header.Get("Range")
bufferedWriter := writePool.Get().(*bufio.Writer)
bufferedWriter.Reset(w)
@@ -297,14 +302,14 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
writeFn, err := prepareWriteFn(0, totalSize)
if err != nil {
- glog.Errorf("processRangeRequest: %v", err)
+ glog.Errorf("ProcessRangeRequest: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
- return fmt.Errorf("processRangeRequest: %v", err)
+ return fmt.Errorf("ProcessRangeRequest: %v", err)
}
if err = writeFn(bufferedWriter); err != nil {
- glog.Errorf("processRangeRequest: %v", err)
+ glog.Errorf("ProcessRangeRequest: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
- return fmt.Errorf("processRangeRequest: %v", err)
+ return fmt.Errorf("ProcessRangeRequest: %v", err)
}
return nil
}
@@ -313,9 +318,9 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
//mostly copy from src/pkg/net/http/fs.go
ranges, err := parseRange(rangeReq, totalSize)
if err != nil {
- glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err)
+ glog.Errorf("ProcessRangeRequest headers: %+v err: %v", w.Header(), err)
http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
- return fmt.Errorf("processRangeRequest header: %v", err)
+ return fmt.Errorf("ProcessRangeRequest header: %v", err)
}
if sumRangesSize(ranges) > totalSize {
// The total number of bytes in all the ranges
@@ -345,16 +350,16 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
writeFn, err := prepareWriteFn(ra.start, ra.length)
if err != nil {
- glog.Errorf("processRangeRequest range[0]: %+v err: %v", w.Header(), err)
+ glog.Errorf("ProcessRangeRequest range[0]: %+v err: %v", w.Header(), err)
http.Error(w, err.Error(), http.StatusInternalServerError)
- return fmt.Errorf("processRangeRequest: %v", err)
+ return fmt.Errorf("ProcessRangeRequest: %v", err)
}
w.WriteHeader(http.StatusPartialContent)
err = writeFn(bufferedWriter)
if err != nil {
- glog.Errorf("processRangeRequest range[0]: %+v err: %v", w.Header(), err)
+ glog.Errorf("ProcessRangeRequest range[0]: %+v err: %v", w.Header(), err)
http.Error(w, err.Error(), http.StatusInternalServerError)
- return fmt.Errorf("processRangeRequest range[0]: %v", err)
+ return fmt.Errorf("ProcessRangeRequest range[0]: %v", err)
}
return nil
}
@@ -369,9 +374,9 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
}
writeFn, err := prepareWriteFn(ra.start, ra.length)
if err != nil {
- glog.Errorf("processRangeRequest range[%d] err: %v", i, err)
+ glog.Errorf("ProcessRangeRequest range[%d] err: %v", i, err)
http.Error(w, "Internal Error", http.StatusInternalServerError)
- return fmt.Errorf("processRangeRequest range[%d] err: %v", i, err)
+ return fmt.Errorf("ProcessRangeRequest range[%d] err: %v", i, err)
}
writeFnByRange[i] = writeFn
}
@@ -406,9 +411,9 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
}
w.WriteHeader(http.StatusPartialContent)
if _, err := io.CopyN(bufferedWriter, sendContent, sendSize); err != nil {
- glog.Errorf("processRangeRequest err: %v", err)
+ glog.Errorf("ProcessRangeRequest err: %v", err)
http.Error(w, "Internal Error", http.StatusInternalServerError)
- return fmt.Errorf("processRangeRequest err: %v", err)
+ return fmt.Errorf("ProcessRangeRequest err: %v", err)
}
return nil
}
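
Note: the common.go hunks above (and the matching filer/volume handler hunks further down) move from the package-level operation.UploadData / operation.Upload helpers to an explicit uploader object whose constructor can fail. A minimal sketch of the call pattern this diff migrates to, assuming operation.NewUploader, (*Uploader).UploadData and the UploadOption/UploadResult types behave as shown in the hunks:

package example

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/operation"
)

// uploadViaUploader mirrors the new pattern: build an Uploader,
// check the constructor error, then upload the data.
func uploadViaUploader(data []byte, opt *operation.UploadOption) (*operation.UploadResult, error) {
	uploader, err := operation.NewUploader()
	if err != nil {
		// callers in this diff surface this as an HTTP 500 or a gRPC error
		return nil, fmt.Errorf("create uploader: %v", err)
	}
	result, err := uploader.UploadData(data, opt)
	if err != nil {
		return nil, fmt.Errorf("upload data: %v", err)
	}
	return result, nil
}
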
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index b9571710d..b1440c94f 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -291,7 +291,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
glog.V(4).Infof("DeleteEntry %v", req)
- err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures)
+ err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures, req.IfNotModifiedAfter)
resp = &filer_pb.DeleteEntryResponse{}
if err != nil && err != filer_pb.ErrNotFound {
resp.Error = err.Error()
@@ -363,12 +363,7 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet
glog.V(4).Infof("DeleteCollection %v", req)
- err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
- _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
- Name: req.GetCollection(),
- })
- return err
- })
+ err = fs.filer.DoDeleteCollection(req.GetCollection())
return &filer_pb.DeleteCollectionResponse{}, err
}
diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go
index b4caaf4e2..8b4912258 100644
--- a/weed/server/filer_grpc_server_admin.go
+++ b/weed/server/filer_grpc_server_admin.go
@@ -96,6 +96,8 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
MetricsIntervalSec: int32(fs.metricsIntervalSec),
Version: util.Version(),
FilerGroup: fs.option.FilerGroup,
+ MajorVersion: util.MAJOR_VERSION,
+ MinorVersion: util.MINOR_VERSION,
}
glog.V(4).Infof("GetFilerConfiguration: %v", t)
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index 3acea6f14..db00dd496 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -203,7 +203,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.Seawee
// delete old entry
ctx = context.WithValue(ctx, "OP", "MV")
- deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, signatures)
+ deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, signatures, 0)
if deleteErr != nil {
return deleteErr
}
diff --git a/weed/server/filer_grpc_server_traverse_meta.go b/weed/server/filer_grpc_server_traverse_meta.go
new file mode 100644
index 000000000..4a924f065
--- /dev/null
+++ b/weed/server/filer_grpc_server_traverse_meta.go
@@ -0,0 +1,84 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/viant/ptrie"
+)
+
+func (fs *FilerServer) TraverseBfsMetadata(req *filer_pb.TraverseBfsMetadataRequest, stream filer_pb.SeaweedFiler_TraverseBfsMetadataServer) error {
+
+ glog.V(0).Infof("TraverseBfsMetadata %v", req)
+
+ excludedTrie := ptrie.New[bool]()
+ for _, excluded := range req.ExcludedPrefixes {
+ excludedTrie.Put([]byte(excluded), true)
+ }
+
+ ctx := stream.Context()
+
+ queue := util.NewQueue[*filer.Entry]()
+ dirEntry, err := fs.filer.FindEntry(ctx, util.FullPath(req.Directory))
+ if err != nil {
+ return fmt.Errorf("find dir %s: %v", req.Directory, err)
+ }
+ queue.Enqueue(dirEntry)
+
+ for item := queue.Dequeue(); item != nil; item = queue.Dequeue() {
+ if excludedTrie.MatchPrefix([]byte(item.FullPath), func(key []byte, value bool) bool {
+ return true
+ }) {
+ // println("excluded", item.FullPath)
+ continue
+ }
+ parent, _ := item.FullPath.DirAndName()
+ if err := stream.Send(&filer_pb.TraverseBfsMetadataResponse{
+ Directory: parent,
+ Entry: item.ToProtoEntry(),
+ }); err != nil {
+ return fmt.Errorf("send traverse bfs metadata response: %v", err)
+ }
+
+ if !item.IsDirectory() {
+ continue
+ }
+
+ if err := fs.iterateDirectory(ctx, item.FullPath, func(entry *filer.Entry) error {
+ queue.Enqueue(entry)
+ return nil
+ }); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPath, fn func(entry *filer.Entry) error) (err error) {
+ var lastFileName string
+ var listErr error
+ for {
+ var hasEntries bool
+ lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool {
+ hasEntries = true
+ if fnErr := fn(entry); fnErr != nil {
+ err = fnErr
+ return false
+ }
+ return true
+ })
+ if listErr != nil {
+ return listErr
+ }
+ if err != nil {
+ return err
+ }
+ if !hasEntries {
+ return nil
+ }
+ }
+}
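
For context, a hedged sketch of how a client might consume the new TraverseBfsMetadata streaming RPC. The request/response field names come from the server code above; the client method and stream handling follow standard gRPC-Go conventions rather than a specific SeaweedFS helper:

package example

import (
	"context"
	"fmt"
	"io"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// traverseAll walks every entry under dir in BFS order, skipping excluded prefixes.
func traverseAll(ctx context.Context, client filer_pb.SeaweedFilerClient, dir string, excluded []string) error {
	stream, err := client.TraverseBfsMetadata(ctx, &filer_pb.TraverseBfsMetadataRequest{
		Directory:        dir,
		ExcludedPrefixes: excluded,
	})
	if err != nil {
		return fmt.Errorf("start traverse: %v", err)
	}
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return fmt.Errorf("receive: %v", err)
		}
		// each response carries the parent directory and the entry in protobuf form
		fmt.Printf("%s/%s\n", resp.Directory, resp.Entry.Name)
	}
}
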
diff --git a/weed/server/filer_grpc_server_traverse_meta_test.go b/weed/server/filer_grpc_server_traverse_meta_test.go
new file mode 100644
index 000000000..72f8a916e
--- /dev/null
+++ b/weed/server/filer_grpc_server_traverse_meta_test.go
@@ -0,0 +1,31 @@
+package weed_server
+
+import (
+ "github.com/stretchr/testify/assert"
+ "github.com/viant/ptrie"
+ "testing"
+)
+
+func TestPtrie(t *testing.T) {
+ b := []byte("/topics/abc/dev")
+ excludedTrie := ptrie.New[bool]()
+ excludedTrie.Put([]byte("/topics/abc/d"), true)
+ excludedTrie.Put([]byte("/topics/abc"), true)
+
+ assert.True(t, excludedTrie.MatchPrefix(b, func(key []byte, value bool) bool {
+ println("matched1", string(key))
+ return true
+ }))
+
+ assert.True(t, excludedTrie.MatchAll(b, func(key []byte, value bool) bool {
+ println("matched2", string(key))
+ return true
+ }))
+
+ assert.False(t, excludedTrie.MatchAll([]byte("/topics/ab"), func(key []byte, value bool) bool {
+ println("matched3", string(key))
+ return true
+ }))
+
+ assert.False(t, excludedTrie.Has(b))
+}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 0b7254c0d..ee052579c 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -74,7 +74,6 @@ type FilerOption struct {
DiskType string
AllowedOrigins []string
ExposeDirectoryData bool
- JoinExistingFiler bool
}
type FilerServer struct {
@@ -198,12 +197,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
existingNodes := fs.filer.ListExistingPeerUpdates(context.Background())
startFromTime := time.Now().Add(-filer.LogFlushInterval)
- if option.JoinExistingFiler {
- startFromTime = time.Time{}
- }
if isFresh {
glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes)
- if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err != nil {
+ if err := fs.filer.MaybeBootstrapFromOnePeer(option.Host, existingNodes, startFromTime); err != nil {
glog.Fatalf("%s bootstrap from %+v: %v", option.Host, existingNodes, err)
}
}
diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go
index e04994569..c1a26ca11 100644
--- a/weed/server/filer_server_handlers_proxy.go
+++ b/weed/server/filer_server_handlers_proxy.go
@@ -3,24 +3,13 @@ package weed_server
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/security"
- "github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"io"
"math/rand"
"net/http"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
-var (
- client *http.Client
-)
-
-func init() {
- client = &http.Client{Transport: &http.Transport{
- MaxIdleConns: 1024,
- MaxIdleConnsPerHost: 1024,
- }}
-}
-
func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) {
encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite)
@@ -71,14 +60,14 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
}
}
- proxyResponse, postErr := client.Do(proxyReq)
+ proxyResponse, postErr := util_http.GetGlobalHttpClient().Do(proxyReq)
if postErr != nil {
glog.Errorf("post to filer: %v", postErr)
w.WriteHeader(http.StatusInternalServerError)
return
}
- defer util.CloseResponse(proxyResponse)
+ defer util_http.CloseResponse(proxyResponse)
for k, v := range proxyResponse.Header {
w.Header()[k] = v
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 123b7a494..a02e6c2c1 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -71,14 +71,14 @@ func checkPreconditions(w http.ResponseWriter, r *http.Request, entry *filer.Ent
ifModifiedSinceHeader := r.Header.Get("If-Modified-Since")
if ifNoneMatchETagHeader != "" {
if util.CanonicalizeETag(etag) == util.CanonicalizeETag(ifNoneMatchETagHeader) {
- setEtag(w, etag)
+ SetEtag(w, etag)
w.WriteHeader(http.StatusNotModified)
return true
}
} else if ifModifiedSinceHeader != "" {
if t, parseError := time.Parse(http.TimeFormat, ifModifiedSinceHeader); parseError == nil {
if !t.Before(entry.Attr.Mtime) {
- setEtag(w, etag)
+ SetEtag(w, etag)
w.WriteHeader(http.StatusNotModified)
return true
}
@@ -220,13 +220,13 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.Header().Set(s3_constants.AmzTagCount, strconv.Itoa(tagCount))
}
- setEtag(w, etag)
+ SetEtag(w, etag)
filename := entry.Name()
- adjustPassthroughHeaders(w, r, filename)
+ AdjustPassthroughHeaders(w, r, filename)
totalSize := int64(entry.Size())
- if r.Method == "HEAD" {
+ if r.Method == http.MethodHead {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
return
}
@@ -252,7 +252,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
+ ProcessRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
if offset+size <= int64(len(entry.Content)) {
return func(writer io.Writer) error {
_, err := writer.Write(entry.Content[offset : offset+size])
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index 0531527bb..56f0f9cb4 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -31,8 +31,8 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
path = path[:len(path)-1]
}
- limit, limit_err := strconv.Atoi(r.FormValue("limit"))
- if limit_err != nil {
+ limit, limitErr := strconv.Atoi(r.FormValue("limit"))
+ if limitErr != nil {
limit = fs.option.DirListingLimit
}
@@ -62,6 +62,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
if r.Header.Get("Accept") == "application/json" {
writeJsonQuiet(w, r, http.StatusOK, struct {
+ Version string
Path string
Entries interface{}
Limit int
@@ -69,6 +70,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
ShouldDisplayLoadMore bool
EmptyFolder bool
}{
+ util.Version(),
path,
entries,
limit,
@@ -80,6 +82,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
}
err = ui.StatusTpl.Execute(w, struct {
+ Version string
Path string
Breadcrumbs []ui.Breadcrumb
Entries interface{}
@@ -89,6 +92,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
EmptyFolder bool
ShowDirectoryDelete bool
}{
+ util.Version(),
path,
ui.ToBreadcrumb(path),
entries,
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index b186fd34e..f0f756e34 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -18,6 +18,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -120,7 +121,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
fs.autoChunk(ctx, w, r, contentLength, so)
}
- util.CloseRequest(r)
+ util_http.CloseRequest(r)
}
@@ -211,7 +212,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
objectPath = objectPath[0 : len(objectPath)-1]
}
- err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil)
+ err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil, 0)
if err != nil {
if err == filer_pb.ErrNotFound {
writeJsonQuiet(w, r, http.StatusNoContent, nil)
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 2698e2209..1c7ed0c3c 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -39,7 +39,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
var reply *FilerPostResult
var err error
var md5bytes []byte
- if r.Method == "POST" {
+ if r.Method == http.MethodPost {
if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
reply, err = fs.mkdir(ctx, w, r, so)
} else {
@@ -148,6 +148,10 @@ func skipCheckParentDirEntry(r *http.Request) bool {
return r.URL.Query().Get("skipCheckParentDir") == "true"
}
+func isS3Request(r *http.Request) bool {
+ return r.Header.Get(s3_constants.AmzAuthType) != "" || r.Header.Get("X-Amz-Date") != ""
+}
+
func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) {
// detect file mode
@@ -266,7 +270,12 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
}
- if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, skipCheckParentDirEntry(r), so.MaxFileNameLength); dbErr != nil {
+ dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, skipCheckParentDirEntry(r), so.MaxFileNameLength)
+ // In test_bucket_listv2_delimiter_basic, the valid object key is the parent folder
+ if dbErr != nil && strings.HasSuffix(dbErr.Error(), " is a file") && isS3Request(r) {
+ dbErr = fs.filer.CreateEntry(ctx, entry, false, false, nil, true, so.MaxFileNameLength)
+ }
+ if dbErr != nil {
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
@@ -299,8 +308,14 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
PairMap: nil,
Jwt: auth,
}
+
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil {
+ return uploaderErr
+ }
+
var uploadErr error
- uploadResult, uploadErr, _ = operation.Upload(reader, uploadOption)
+ uploadResult, uploadErr, _ = uploader.Upload(reader, uploadOption)
if uploadErr != nil {
return uploadErr
}
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 6cf7d65b1..f8d129bf3 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -53,7 +53,13 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
PairMap: pu.PairMap,
Jwt: auth,
}
- uploadResult, uploadError := operation.UploadData(uncompressedData, uploadOption)
+
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil {
+ return nil, fmt.Errorf("uploader initialization error: %v", uploaderErr)
+ }
+
+ uploadResult, uploadError := uploader.UploadData(uncompressedData, uploadOption)
if uploadError != nil {
return nil, fmt.Errorf("upload to volume server: %v", uploadError)
}
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 8c8eba078..d0d1575cf 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -158,7 +158,13 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil
PairMap: pairMap,
Jwt: auth,
}
- uploadResult, err, data := operation.Upload(limitedReader, uploadOption)
+
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ return nil, err, []byte{}
+ }
+
+ uploadResult, err, data := uploader.Upload(limitedReader, uploadOption)
if uploadResult != nil && uploadResult.RetryCount > 0 {
stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount))
}
diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go
index abb6cce9a..638638196 100644
--- a/weed/server/filer_ui/breadcrumb.go
+++ b/weed/server/filer_ui/breadcrumb.go
@@ -13,6 +13,9 @@ type Breadcrumb struct {
func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) {
parts := strings.Split(fullpath, "/")
+ if fullpath == "/" {
+ parts = []string{""}
+ }
for i := 0; i < len(parts); i++ {
name := parts[i]
diff --git a/weed/server/filer_ui/breadcrumb_test.go b/weed/server/filer_ui/breadcrumb_test.go
new file mode 100644
index 000000000..6e42541cb
--- /dev/null
+++ b/weed/server/filer_ui/breadcrumb_test.go
@@ -0,0 +1,86 @@
+package filer_ui
+
+import (
+ "reflect"
+ "testing"
+)
+
+func TestToBreadcrumb(t *testing.T) {
+ type args struct {
+ fullpath string
+ }
+ tests := []struct {
+ name string
+ args args
+ wantCrumbs []Breadcrumb
+ }{
+ {
+ name: "empty",
+ args: args{
+ fullpath: "",
+ },
+ wantCrumbs: []Breadcrumb{
+ {
+ Name: "/",
+ Link: "/",
+ },
+ },
+ },
+ {
+ name: "test1",
+ args: args{
+ fullpath: "/",
+ },
+ wantCrumbs: []Breadcrumb{
+ {
+ Name: "/",
+ Link: "/",
+ },
+ },
+ },
+ {
+ name: "test2",
+ args: args{
+ fullpath: "/abc",
+ },
+ wantCrumbs: []Breadcrumb{
+ {
+ Name: "/",
+ Link: "/",
+ },
+ {
+ Name: "abc",
+ Link: "/abc/",
+ },
+ },
+ },
+ {
+ name: "test3",
+ args: args{
+ fullpath: "/abc/def",
+ },
+ wantCrumbs: []Breadcrumb{
+ {
+ Name: "/",
+ Link: "/",
+ },
+ {
+ Name: "abc",
+ Link: "/abc/",
+ },
+ {
+ Name: "def",
+ Link: "/abc/def/",
+ },
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if gotCrumbs := ToBreadcrumb(tt.args.fullpath); !reflect.DeepEqual(gotCrumbs, tt.wantCrumbs) {
+ t.Errorf("ToBreadcrumb() = %v, want %v", gotCrumbs, tt.wantCrumbs)
+ }
+ })
+ }
+}
diff --git a/weed/server/filer_ui/filer.html b/weed/server/filer_ui/filer.html
index 28425f180..627f3ba77 100644
--- a/weed/server/filer_ui/filer.html
+++ b/weed/server/filer_ui/filer.html
@@ -1,7 +1,7 @@
<!DOCTYPE html>
<html>
<head>
- <title>SeaweedFS Filer</title>
+ <title>SeaweedFS Filer {{ .Version }}</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
<style>
@@ -82,7 +82,7 @@
<div class="page-header">
<h1>
<a href="https://github.com/seaweedfs/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a>
- SeaweedFS Filer
+ SeaweedFS Filer <small>{{ .Version }}</small>
</h1>
</div>
<div class="row">
@@ -99,7 +99,7 @@
{{ range $entry := .Breadcrumbs }}
<li><a href="{{ printpath $entry.Link }}">
{{ $entry.Name }}
- </li></a>
+ </a></li>
{{ end }}
</ol>
</div>
diff --git a/weed/server/master_grpc_server_assign.go b/weed/server/master_grpc_server_assign.go
index 7455c9ea4..4f95b4ff6 100644
--- a/weed/server/master_grpc_server_assign.go
+++ b/weed/server/master_grpc_server_assign.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/stats"
"time"
"github.com/seaweedfs/raft"
@@ -69,7 +70,12 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
}
+ if !ms.Topo.DataCenterExists(option.DataCenter) {
+ return nil, fmt.Errorf("data center %v not found in topology", option.DataCenter)
+ }
+
vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
+ vl.SetLastGrowCount(req.WritableVolumeCount)
var (
lastErr error
@@ -80,18 +86,17 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
for time.Now().Sub(startTime) < maxTimeout {
fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(req.Count, option, vl)
if shouldGrow && !vl.HasGrowRequest() {
- // if picked volume is almost full, trigger a volume-grow request
- if ms.Topo.AvailableSpaceFor(option) <= 0 {
- return nil, fmt.Errorf("no free volumes left for " + option.String())
+ if err != nil && ms.Topo.AvailableSpaceFor(option) <= 0 {
+ err = fmt.Errorf("%s and no free volumes left for %s", err.Error(), option.String())
}
vl.AddGrowRequest()
ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{
Option: option,
- Count: int(req.WritableVolumeCount),
+ Count: req.WritableVolumeCount,
}
}
if err != nil {
- // glog.Warningf("PickForWrite %+v: %v", req, err)
+ stats.MasterPickForWriteErrorCounter.Inc()
lastErr = err
time.Sleep(200 * time.Millisecond)
continue
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 503da7fd4..3cad627db 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -3,11 +3,14 @@ package weed_server
import (
"context"
"fmt"
+ "math/rand"
"reflect"
"strings"
"sync"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/topology"
+
"github.com/seaweedfs/raft"
"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -18,8 +21,39 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/types"
)
+func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) {
+ glog.V(1).Infoln("starting automatic volume grow")
+ start := time.Now()
+ newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count)
+ glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start))
+ if err != nil {
+ glog.V(1).Infof("automatic volume grow failed: %+v", err)
+ return
+ }
+ for _, newVidLocation := range newVidLocations {
+ ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation})
+ }
+}
+
func (ms *MasterServer) ProcessGrowRequest() {
go func() {
+ for {
+ time.Sleep(14*time.Minute + time.Duration(120*rand.Float32())*time.Second)
+ if !ms.Topo.IsLeader() {
+ continue
+ }
+ for _, vl := range ms.Topo.ListVolumeLyauts() {
+ if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(&topology.VolumeGrowOption{}) {
+ vl.AddGrowRequest()
+ ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{
+ Option: vl.ToGrowOption(),
+ Count: vl.GetLastGrowCount(),
+ }
+ }
+ }
+ }
+ }()
+ go func() {
filter := sync.Map{}
for {
req, ok := <-ms.volumeGrowthRequestChan
@@ -27,9 +61,13 @@ func (ms *MasterServer) ProcessGrowRequest() {
break
}
+ option := req.Option
+ vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
+
if !ms.Topo.IsLeader() {
//discard buffered requests
time.Sleep(time.Second * 1)
+ vl.DoneGrowRequest()
continue
}
@@ -42,28 +80,15 @@ func (ms *MasterServer) ProcessGrowRequest() {
return !found
})
- option := req.Option
- vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
-
// not atomic but it's okay
if !found && vl.ShouldGrowVolumes(option) {
filter.Store(req, nil)
// we have lock called inside vg
- go func() {
- glog.V(1).Infoln("starting automatic volume grow")
- start := time.Now()
- newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count)
- glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start))
- if err == nil {
- for _, newVidLocation := range newVidLocations {
- ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation})
- }
- }
+ go func(req *topology.VolumeGrowRequest, vl *topology.VolumeLayout) {
+ ms.DoAutomaticVolumeGrow(req)
vl.DoneGrowRequest()
-
filter.Delete(req)
- }()
-
+ }(req, vl)
} else {
glog.V(4).Infoln("discard volume grow request")
time.Sleep(time.Millisecond * 211)
@@ -91,6 +116,7 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV
Url: loc.Url,
PublicUrl: loc.PublicUrl,
DataCenter: loc.DataCenter,
+ GrpcPort: uint32(loc.GrpcPort),
})
}
var auth string
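
Note: the ProcessGrowRequest change above splits the grow work into DoAutomaticVolumeGrow and adds a background loop that, on the leader only, periodically re-checks every volume layout and queues a grow request when needed. A minimal sketch of that jittered-poll pattern, assuming simplified callbacks; the ~14-minute base interval and 0-120 s jitter are taken from the diff:

package example

import (
	"math/rand"
	"time"
)

// runPeriodically calls check on a jittered interval so that multiple masters
// do not all wake up at the same moment; isLeader gates the work to the leader.
func runPeriodically(isLeader func() bool, check func()) {
	for {
		time.Sleep(14*time.Minute + time.Duration(120*rand.Float32())*time.Second)
		if !isLeader() {
			continue
		}
		check()
	}
}
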
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 3499a2e13..44a1664c0 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -30,6 +30,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/topology"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
const (
@@ -92,15 +93,15 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
v.SetDefault("master.replication.treat_replication_as_minimums", false)
replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums")
- v.SetDefault("master.volume_growth.copy_1", 7)
- v.SetDefault("master.volume_growth.copy_2", 6)
- v.SetDefault("master.volume_growth.copy_3", 3)
- v.SetDefault("master.volume_growth.copy_other", 1)
- v.SetDefault("master.volume_growth.threshold", 0.9)
- topology.VolumeGrowStrategy.Copy1Count = v.GetInt("master.volume_growth.copy_1")
- topology.VolumeGrowStrategy.Copy2Count = v.GetInt("master.volume_growth.copy_2")
- topology.VolumeGrowStrategy.Copy3Count = v.GetInt("master.volume_growth.copy_3")
- topology.VolumeGrowStrategy.CopyOtherCount = v.GetInt("master.volume_growth.copy_other")
+ v.SetDefault("master.volume_growth.copy_1", topology.VolumeGrowStrategy.Copy1Count)
+ v.SetDefault("master.volume_growth.copy_2", topology.VolumeGrowStrategy.Copy2Count)
+ v.SetDefault("master.volume_growth.copy_3", topology.VolumeGrowStrategy.Copy3Count)
+ v.SetDefault("master.volume_growth.copy_other", topology.VolumeGrowStrategy.CopyOtherCount)
+ v.SetDefault("master.volume_growth.threshold", topology.VolumeGrowStrategy.Threshold)
+ topology.VolumeGrowStrategy.Copy1Count = v.GetUint32("master.volume_growth.copy_1")
+ topology.VolumeGrowStrategy.Copy2Count = v.GetUint32("master.volume_growth.copy_2")
+ topology.VolumeGrowStrategy.Copy3Count = v.GetUint32("master.volume_growth.copy_3")
+ topology.VolumeGrowStrategy.CopyOtherCount = v.GetUint32("master.volume_growth.copy_other")
topology.VolumeGrowStrategy.Threshold = v.GetFloat64("master.volume_growth.threshold")
var preallocateSize int64
@@ -185,22 +186,7 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
raftServerName = fmt.Sprintf("[%s]", ms.Topo.RaftServer.Name())
} else if raftServer.RaftHashicorp != nil {
ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
- leaderCh := raftServer.RaftHashicorp.LeaderCh()
- prevLeader, _ := ms.Topo.HashicorpRaft.LeaderWithID()
raftServerName = ms.Topo.HashicorpRaft.String()
- go func() {
- for {
- select {
- case isLeader := <-leaderCh:
- ms.Topo.RaftServerAccessLock.RLock()
- leader, _ := ms.Topo.HashicorpRaft.LeaderWithID()
- ms.Topo.RaftServerAccessLock.RUnlock()
- glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
- stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
- prevLeader = leader
- }
- }
- }()
}
ms.Topo.RaftServerAccessLock.Unlock()
@@ -256,7 +242,7 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
}
director(req)
}
- proxy.Transport = util.Transport
+ proxy.Transport = util_http.GetGlobalHttpClient().GetClientTransport()
proxy.ServeHTTP(w, r)
}
}
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index e4188420d..5e17bcca8 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -2,12 +2,13 @@ package weed_server
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/glog"
"net/http"
"strconv"
"strings"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/stats"
@@ -74,7 +75,10 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo
machines := ms.Topo.Lookup(collection, volumeId)
for _, loc := range machines {
locations = append(locations, operation.Location{
- Url: loc.Url(), PublicUrl: loc.PublicUrl, DataCenter: loc.GetDataCenterId(),
+ Url: loc.Url(),
+ PublicUrl: loc.PublicUrl,
+ DataCenter: loc.GetDataCenterId(),
+ GrpcPort: loc.GrpcPort,
})
}
}
@@ -82,7 +86,10 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo
machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
for _, loc := range machines {
locations = append(locations, operation.Location{
- Url: loc.Url, PublicUrl: loc.PublicUrl, DataCenter: loc.DataCenter,
+ Url: loc.Url,
+ PublicUrl: loc.PublicUrl,
+ DataCenter: loc.DataCenter,
+ GrpcPort: loc.GrpcPort,
})
}
err = getVidLocationsErr
@@ -107,7 +114,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
requestedCount = 1
}
- writableVolumeCount, e := strconv.Atoi(r.FormValue("writableVolumeCount"))
+ writableVolumeCount, e := strconv.ParseUint(r.FormValue("writableVolumeCount"), 10, 32)
if e != nil {
writableVolumeCount = 0
}
@@ -126,23 +133,28 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
startTime = time.Now()
)
+ if !ms.Topo.DataCenterExists(option.DataCenter) {
+ writeJsonQuiet(w, r, http.StatusBadRequest, operation.AssignResult{
+ Error: fmt.Sprintf("data center %v not found in topology", option.DataCenter),
+ })
+ return
+ }
+
for time.Now().Sub(startTime) < maxTimeout {
fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(requestedCount, option, vl)
if shouldGrow && !vl.HasGrowRequest() {
- // if picked volume is almost full, trigger a volume-grow request
glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr)
- if ms.Topo.AvailableSpaceFor(option) <= 0 {
- writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()})
- return
+ if err != nil && ms.Topo.AvailableSpaceFor(option) <= 0 {
+ err = fmt.Errorf("%s and no free volumes left for %s", err.Error(), option.String())
}
vl.AddGrowRequest()
ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{
Option: option,
- Count: writableVolumeCount,
+ Count: uint32(writableVolumeCount),
}
}
if err != nil {
- // glog.Warningf("PickForWrite %+v: %v", req, err)
+ stats.MasterPickForWriteErrorCounter.Inc()
lastErr = err
time.Sleep(200 * time.Millisecond)
continue
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index f40b819af..7479b5535 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -18,6 +18,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/topology"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
@@ -70,7 +71,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
}
func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
- count := 0
+ count := uint64(0)
option, err := ms.getVolumeGrowOption(r)
if err != nil {
writeJsonError(w, r, http.StatusNotAcceptable, err)
@@ -78,13 +79,16 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
}
glog.V(0).Infof("volumeGrowHandler received %v from %v", option.String(), r.RemoteAddr)
- if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
- if ms.Topo.AvailableSpaceFor(option) < int64(count*option.ReplicaPlacement.GetCopyCount()) {
- err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), count*option.ReplicaPlacement.GetCopyCount())
+ if count, err = strconv.ParseUint(r.FormValue("count"), 10, 32); err == nil {
+ replicaCount := int64(count * uint64(option.ReplicaPlacement.GetCopyCount()))
+ if ms.Topo.AvailableSpaceFor(option) < replicaCount {
+ err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), replicaCount)
+ } else if !ms.Topo.DataCenterExists(option.DataCenter) {
+ err = fmt.Errorf("data center %v not found in topology", option.DataCenter)
} else {
var newVidLocations []*master_pb.VolumeLocation
- newVidLocations, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
- count = len(newVidLocations)
+ newVidLocations, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, uint32(count), option, ms.Topo)
+ count = uint64(len(newVidLocations))
}
} else {
err = fmt.Errorf("can not parse parameter count %s", r.FormValue("count"))
@@ -110,11 +114,11 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
location := ms.findVolumeLocation(collection, vid)
if location.Error == "" {
loc := location.Locations[rand.Intn(len(location.Locations))]
- var url string
+ url, _ := util_http.NormalizeUrl(loc.PublicUrl)
if r.URL.RawQuery != "" {
- url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery
+ url = url + r.URL.Path + "?" + r.URL.RawQuery
} else {
- url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path
+ url = url + r.URL.Path
}
http.Redirect(w, r, url, http.StatusPermanentRedirect)
} else {
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go
index d06066b93..299df323a 100644
--- a/weed/server/raft_hashicorp.go
+++ b/weed/server/raft_hashicorp.go
@@ -5,6 +5,14 @@ package weed_server
import (
"fmt"
+ "math/rand"
+ "os"
+ "path"
+ "path/filepath"
+ "sort"
+ "strings"
+ "time"
+
transport "github.com/Jille/raft-grpc-transport"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
@@ -14,13 +22,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
"google.golang.org/grpc"
- "math/rand"
- "os"
- "path"
- "path/filepath"
- "sort"
- "strings"
- "time"
)
const (
@@ -56,46 +57,61 @@ func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) {
return cfg
}
-func (s *RaftServer) UpdatePeers() {
+func (s *RaftServer) monitorLeaderLoop(updatePeers bool) {
for {
+ prevLeader, _ := s.RaftHashicorp.LeaderWithID()
select {
case isLeader := <-s.RaftHashicorp.LeaderCh():
+ leader, _ := s.RaftHashicorp.LeaderWithID()
if isLeader {
- peerLeader := string(s.serverAddr)
- existsPeerName := make(map[string]bool)
- for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers {
- if string(server.ID) == peerLeader {
- continue
- }
- existsPeerName[string(server.ID)] = true
- }
- for _, peer := range s.peers {
- peerName := string(peer)
- if peerName == peerLeader || existsPeerName[peerName] {
- continue
- }
- glog.V(0).Infof("adding new peer: %s", peerName)
- s.RaftHashicorp.AddVoter(
- raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
- }
- for peer := range existsPeerName {
- if _, found := s.peers[peer]; !found {
- glog.V(0).Infof("removing old peer: %s", peer)
- s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)
- }
- }
- if _, found := s.peers[peerLeader]; !found {
- glog.V(0).Infof("removing old leader peer: %s", peerLeader)
- s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
+
+ if updatePeers {
+ s.updatePeers()
+ updatePeers = false
}
+
+ s.topo.DoBarrier()
+
+ stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
+ } else {
+ s.topo.BarrierReset()
}
- return
- case <-time.After(updatePeersTimeout):
- return
+ glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
+ prevLeader = leader
}
}
}
+func (s *RaftServer) updatePeers() {
+ peerLeader := string(s.serverAddr)
+ existsPeerName := make(map[string]bool)
+ for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers {
+ if string(server.ID) == peerLeader {
+ continue
+ }
+ existsPeerName[string(server.ID)] = true
+ }
+ for _, peer := range s.peers {
+ peerName := string(peer)
+ if peerName == peerLeader || existsPeerName[peerName] {
+ continue
+ }
+ glog.V(0).Infof("adding new peer: %s", peerName)
+ s.RaftHashicorp.AddVoter(
+ raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
+ }
+ for peer := range existsPeerName {
+ if _, found := s.peers[peer]; !found {
+ glog.V(0).Infof("removing old peer: %s", peer)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)
+ }
+ }
+ if _, found := s.peers[peerLeader]; !found {
+ glog.V(0).Infof("removing old leader peer: %s", peerLeader)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
+ }
+}
+
func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
peers: option.Peers,
@@ -157,6 +173,8 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
if err != nil {
return nil, fmt.Errorf("raft.NewRaft: %v", err)
}
+
+ updatePeers := false
if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 {
cfg := s.AddPeersConfiguration()
// Need to get lock, in case all servers do this at the same time.
@@ -169,9 +187,11 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
}
} else {
- go s.UpdatePeers()
+ updatePeers = true
}
+ go s.monitorLeaderLoop(updatePeers)
+
ticker := time.NewTicker(c.HeartbeatTimeout * 10)
if glog.V(4) {
go func() {
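
Note: the raft_hashicorp.go change folds the old one-shot UpdatePeers goroutine into a long-running monitorLeaderLoop that watches LeaderCh() for leadership transitions, updates peers once on the first win, runs the topology barrier, and records a leader-change metric. A hedged sketch of the underlying hashicorp/raft pattern; Raft.LeaderCh and Raft.LeaderWithID are the library API, while onGain/onLose stand in for the SeaweedFS-specific work:

package example

import (
	"log"

	"github.com/hashicorp/raft"
)

// watchLeadership blocks forever, reacting to leadership change notifications.
func watchLeadership(r *raft.Raft, onGain, onLose func()) {
	for {
		prevLeader, _ := r.LeaderWithID()
		isLeader := <-r.LeaderCh() // signals when this node gains or loses leadership
		leader, _ := r.LeaderWithID()
		if isLeader {
			onGain()
		} else {
			onLose()
		}
		log.Printf("is leader %v change event: %v => %v", isLeader, prevLeader, leader)
	}
}
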
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index d718ecac7..4bcd808c2 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,13 +2,14 @@ package weed_server
import (
"encoding/json"
- transport "github.com/Jille/raft-grpc-transport"
"io"
"math/rand"
"os"
"path"
"time"
+ transport "github.com/Jille/raft-grpc-transport"
+
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/pb"
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go
index 64254b3b8..4452e019b 100644
--- a/weed/server/volume_grpc_remote.go
+++ b/weed/server/volume_grpc_remote.go
@@ -70,10 +70,15 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
PairMap: nil,
Jwt: security.EncodedJwt(req.Auth),
}
- if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil {
- if err == nil {
- err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, err)
- }
+
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil && err == nil {
+ err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, uploaderErr)
+ return
+ }
+
+ if _, replicaWriteErr := uploader.UploadData(data, uploadOption); replicaWriteErr != nil && err == nil {
+ err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr)
}
}(replica.Url)
}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index cc364513b..15d639f49 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -27,6 +27,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
@@ -81,10 +82,11 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
if vs.ReadMode == "proxy" {
// proxy client request to target server
- u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].Url))
+ rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].Url)
+ u, _ := url.Parse(rawURL)
r.URL.Host = u.Host
r.URL.Scheme = u.Scheme
- request, err := http.NewRequest("GET", r.URL.String(), nil)
+ request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil)
if err != nil {
glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err)
InternalError(w)
@@ -96,13 +98,13 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- response, err := client.Do(request)
+ response, err := util_http.GetGlobalHttpClient().Do(request)
if err != nil {
glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err)
InternalError(w)
return
}
- defer util.CloseResponse(response)
+ defer util_http.CloseResponse(response)
// proxy target response to client
for k, vv := range response.Header {
for _, v := range vv {
@@ -116,7 +118,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
} else {
// redirect
- u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
+ rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].PublicUrl)
+ u, _ := url.Parse(rawURL)
u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
arg := url.Values{}
if c := r.FormValue("collection"); c != "" {
@@ -186,7 +189,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusNotModified)
return
}
- setEtag(w, n.Etag())
+ SetEtag(w, n.Etag())
if n.HasPairs() {
pairMap := make(map[string]string)
@@ -253,7 +256,7 @@ func shouldAttemptStreamWrite(hasLocalVolume bool, ext string, r *http.Request)
if len(ext) > 0 {
ext = strings.ToLower(ext)
}
- if r.Method == "HEAD" {
+ if r.Method == http.MethodHead {
return true, true
}
_, _, _, shouldResize := shouldResizeImages(ext, r)
@@ -377,14 +380,14 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
}
w.Header().Set("Accept-Ranges", "bytes")
- adjustPassthroughHeaders(w, r, filename)
+ AdjustPassthroughHeaders(w, r, filename)
- if r.Method == "HEAD" {
+ if r.Method == http.MethodHead {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
return nil
}
- return processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
+ return ProcessRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
return func(writer io.Writer) error {
if _, e = rs.Seek(offset, 0); e != nil {
return e
@@ -406,14 +409,14 @@ func (vs *VolumeServer) streamWriteResponseContent(filename string, mimeType str
w.Header().Set("Content-Type", mimeType)
}
w.Header().Set("Accept-Ranges", "bytes")
- adjustPassthroughHeaders(w, r, filename)
+ AdjustPassthroughHeaders(w, r, filename)
- if r.Method == "HEAD" {
+ if r.Method == http.MethodHead {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
return
}
- processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
+ ProcessRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) {
return func(writer io.Writer) error {
return vs.store.ReadVolumeNeedleDataInto(volumeId, n, readOption, writer, offset, size)
}, nil
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 6e151bf80..7f0fcc871 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -53,7 +53,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
// http 204 status code does not allow body
if writeError == nil && isUnchanged {
- setEtag(w, reqNeedle.Etag())
+ SetEtag(w, reqNeedle.Etag())
w.WriteHeader(http.StatusNoContent)
return
}
@@ -65,7 +65,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ret.Size = uint32(originalSize)
ret.ETag = reqNeedle.Etag()
ret.Mime = string(reqNeedle.Mime)
- setEtag(w, ret.ETag)
+ SetEtag(w, ret.ETag)
w.Header().Set("Content-MD5", contentMd5)
writeJsonQuiet(w, r, httpStatus, ret)
}
@@ -147,7 +147,7 @@ func writeDeleteResult(err error, count int64, w http.ResponseWriter, r *http.Re
}
}
-func setEtag(w http.ResponseWriter, etag string) {
+func SetEtag(w http.ResponseWriter, etag string) {
if etag != "" {
if strings.HasPrefix(etag, "\"") {
w.Header().Set("ETag", etag)
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 97d51dad7..dbe6dfed5 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -99,6 +99,7 @@ type FileInfo struct {
modifiedTime time.Time
etag string
isDirectory bool
+ err error
}
func (fi *FileInfo) Name() string { return fi.name }
@@ -109,6 +110,9 @@ func (fi *FileInfo) IsDir() bool { return fi.isDirectory }
func (fi *FileInfo) Sys() interface{} { return nil }
func (fi *FileInfo) ETag(ctx context.Context) (string, error) {
+ if fi.err != nil {
+ return "", fi.err
+ }
return fi.etag, nil
}
@@ -269,7 +273,10 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
fi, err := fs.stat(ctx, fullFilePath)
if err != nil {
- return nil, os.ErrNotExist
+ if err == os.ErrNotExist {
+ return nil, err
+ }
+ return &WebDavFile{fs: fs}, nil
}
if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() {
fullFilePath += "/"
@@ -365,12 +372,16 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F
var fi FileInfo
entry, err := filer_pb.GetEntry(fs, fullpath)
+ if err != nil {
+ if err == filer_pb.ErrNotFound {
+ return nil, os.ErrNotExist
+ }
+ fi.err = err
+ return &fi, nil
+ }
if entry == nil {
return nil, os.ErrNotExist
}
- if err != nil {
- return nil, err
- }
fi.size = int64(filer.FileSize(entry))
fi.name = string(fullpath)
fi.mode = os.FileMode(entry.Attributes.FileMode)
@@ -392,8 +403,13 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo,
}
func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) {
+ uploader, uploaderErr := operation.NewUploader()
+ if uploaderErr != nil {
+ glog.V(0).Infof("upload data %v: %v", f.name, uploaderErr)
+ return nil, fmt.Errorf("upload data: %v", uploaderErr)
+ }
- fileId, uploadResult, flushErr, _ := operation.UploadWithRetry(
+ fileId, uploadResult, flushErr, _ := uploader.UploadWithRetry(
f.fs,
&filer_pb.AssignVolumeRequest{
Count: 1,
@@ -509,7 +525,9 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
func (f *WebDavFile) Close() error {
glog.V(2).Infof("WebDavFileSystem.Close %v", f.name)
-
+ if f.bufWriter == nil {
+ return nil
+ }
err := f.bufWriter.Close()
if f.entry != nil {