diff options
| author | chrislu <chris.lu@gmail.com> | 2024-08-10 10:01:57 -0700 |
|---|---|---|
| committer | chrislu <chris.lu@gmail.com> | 2024-08-10 10:01:57 -0700 |
| commit | 7438648d1cfacd5ca570dd029d1bdb5fd271bd70 (patch) | |
| tree | cf12b49473be0373cb03d83470ddc75708454171 /weed/server | |
| parent | 49893267e978cc3fda00dc991e00099742fb5a9d (diff) | |
| parent | 63c707f9c1b4dc469ec39c446563c324ce4ccb6f (diff) | |
| download | seaweedfs-7438648d1cfacd5ca570dd029d1bdb5fd271bd70.tar.xz seaweedfs-7438648d1cfacd5ca570dd029d1bdb5fd271bd70.zip | |
Merge branch 'master' into mq
Diffstat (limited to 'weed/server')
28 files changed, 496 insertions, 193 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index a7d67fb2e..e6f6cdb88 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -127,7 +127,7 @@ func debug(params ...interface{}) { func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn operation.GetMasterFn, grpcDialOption grpc.DialOption) { m := make(map[string]interface{}) - if r.Method != "POST" { + if r.Method != http.MethodPost { writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!")) return } @@ -181,7 +181,12 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope PairMap: pu.PairMap, Jwt: assignResult.Auth, } - uploadResult, err := operation.UploadData(pu.Data, uploadOption) + uploader, err := operation.NewUploader() + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + uploadResult, err := uploader.UploadData(pu.Data, uploadOption) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return @@ -260,7 +265,7 @@ func handleStaticResources2(r *mux.Router) { r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(http.FS(StaticFS)))) } -func adjustPassthroughHeaders(w http.ResponseWriter, r *http.Request, filename string) { +func AdjustPassthroughHeaders(w http.ResponseWriter, r *http.Request, filename string) { for header, values := range r.Header { if normalizedHeader, ok := s3_constants.PassThroughHeaders[strings.ToLower(header)]; ok { w.Header()[normalizedHeader] = values @@ -284,7 +289,7 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file } } -func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, prepareWriteFn func(offset int64, size int64) (filer.DoStreamContent, error)) error { +func ProcessRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, prepareWriteFn func(offset int64, size int64) (filer.DoStreamContent, error)) error { rangeReq := r.Header.Get("Range") bufferedWriter := writePool.Get().(*bufio.Writer) bufferedWriter.Reset(w) @@ -297,14 +302,14 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) writeFn, err := prepareWriteFn(0, totalSize) if err != nil { - glog.Errorf("processRangeRequest: %v", err) + glog.Errorf("ProcessRangeRequest: %v", err) http.Error(w, err.Error(), http.StatusInternalServerError) - return fmt.Errorf("processRangeRequest: %v", err) + return fmt.Errorf("ProcessRangeRequest: %v", err) } if err = writeFn(bufferedWriter); err != nil { - glog.Errorf("processRangeRequest: %v", err) + glog.Errorf("ProcessRangeRequest: %v", err) http.Error(w, err.Error(), http.StatusInternalServerError) - return fmt.Errorf("processRangeRequest: %v", err) + return fmt.Errorf("ProcessRangeRequest: %v", err) } return nil } @@ -313,9 +318,9 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 //mostly copy from src/pkg/net/http/fs.go ranges, err := parseRange(rangeReq, totalSize) if err != nil { - glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err) + glog.Errorf("ProcessRangeRequest headers: %+v err: %v", w.Header(), err) http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) - return fmt.Errorf("processRangeRequest header: %v", err) + return fmt.Errorf("ProcessRangeRequest header: %v", err) } if sumRangesSize(ranges) > totalSize { // The total number of bytes in all the ranges @@ -345,16 +350,16 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 writeFn, err := prepareWriteFn(ra.start, ra.length) if err != nil { - glog.Errorf("processRangeRequest range[0]: %+v err: %v", w.Header(), err) + glog.Errorf("ProcessRangeRequest range[0]: %+v err: %v", w.Header(), err) http.Error(w, err.Error(), http.StatusInternalServerError) - return fmt.Errorf("processRangeRequest: %v", err) + return fmt.Errorf("ProcessRangeRequest: %v", err) } w.WriteHeader(http.StatusPartialContent) err = writeFn(bufferedWriter) if err != nil { - glog.Errorf("processRangeRequest range[0]: %+v err: %v", w.Header(), err) + glog.Errorf("ProcessRangeRequest range[0]: %+v err: %v", w.Header(), err) http.Error(w, err.Error(), http.StatusInternalServerError) - return fmt.Errorf("processRangeRequest range[0]: %v", err) + return fmt.Errorf("ProcessRangeRequest range[0]: %v", err) } return nil } @@ -369,9 +374,9 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 } writeFn, err := prepareWriteFn(ra.start, ra.length) if err != nil { - glog.Errorf("processRangeRequest range[%d] err: %v", i, err) + glog.Errorf("ProcessRangeRequest range[%d] err: %v", i, err) http.Error(w, "Internal Error", http.StatusInternalServerError) - return fmt.Errorf("processRangeRequest range[%d] err: %v", i, err) + return fmt.Errorf("ProcessRangeRequest range[%d] err: %v", i, err) } writeFnByRange[i] = writeFn } @@ -406,9 +411,9 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 } w.WriteHeader(http.StatusPartialContent) if _, err := io.CopyN(bufferedWriter, sendContent, sendSize); err != nil { - glog.Errorf("processRangeRequest err: %v", err) + glog.Errorf("ProcessRangeRequest err: %v", err) http.Error(w, "Internal Error", http.StatusInternalServerError) - return fmt.Errorf("processRangeRequest err: %v", err) + return fmt.Errorf("ProcessRangeRequest err: %v", err) } return nil } diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index b9571710d..b1440c94f 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -291,7 +291,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr glog.V(4).Infof("DeleteEntry %v", req) - err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures) + err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures, req.IfNotModifiedAfter) resp = &filer_pb.DeleteEntryResponse{} if err != nil && err != filer_pb.ErrNotFound { resp.Error = err.Error() @@ -363,12 +363,7 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet glog.V(4).Infof("DeleteCollection %v", req) - err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { - _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ - Name: req.GetCollection(), - }) - return err - }) + err = fs.filer.DoDeleteCollection(req.GetCollection()) return &filer_pb.DeleteCollectionResponse{}, err } diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go index b4caaf4e2..8b4912258 100644 --- a/weed/server/filer_grpc_server_admin.go +++ b/weed/server/filer_grpc_server_admin.go @@ -96,6 +96,8 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb. MetricsIntervalSec: int32(fs.metricsIntervalSec), Version: util.Version(), FilerGroup: fs.option.FilerGroup, + MajorVersion: util.MAJOR_VERSION, + MinorVersion: util.MINOR_VERSION, } glog.V(4).Infof("GetFilerConfiguration: %v", t) diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 3acea6f14..db00dd496 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -203,7 +203,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.Seawee // delete old entry ctx = context.WithValue(ctx, "OP", "MV") - deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, signatures) + deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, signatures, 0) if deleteErr != nil { return deleteErr } diff --git a/weed/server/filer_grpc_server_traverse_meta.go b/weed/server/filer_grpc_server_traverse_meta.go new file mode 100644 index 000000000..4a924f065 --- /dev/null +++ b/weed/server/filer_grpc_server_traverse_meta.go @@ -0,0 +1,84 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/viant/ptrie" +) + +func (fs *FilerServer) TraverseBfsMetadata(req *filer_pb.TraverseBfsMetadataRequest, stream filer_pb.SeaweedFiler_TraverseBfsMetadataServer) error { + + glog.V(0).Infof("TraverseBfsMetadata %v", req) + + excludedTrie := ptrie.New[bool]() + for _, excluded := range req.ExcludedPrefixes { + excludedTrie.Put([]byte(excluded), true) + } + + ctx := stream.Context() + + queue := util.NewQueue[*filer.Entry]() + dirEntry, err := fs.filer.FindEntry(ctx, util.FullPath(req.Directory)) + if err != nil { + return fmt.Errorf("find dir %s: %v", req.Directory, err) + } + queue.Enqueue(dirEntry) + + for item := queue.Dequeue(); item != nil; item = queue.Dequeue() { + if excludedTrie.MatchPrefix([]byte(item.FullPath), func(key []byte, value bool) bool { + return true + }) { + // println("excluded", item.FullPath) + continue + } + parent, _ := item.FullPath.DirAndName() + if err := stream.Send(&filer_pb.TraverseBfsMetadataResponse{ + Directory: parent, + Entry: item.ToProtoEntry(), + }); err != nil { + return fmt.Errorf("send traverse bfs metadata response: %v", err) + } + + if !item.IsDirectory() { + continue + } + + if err := fs.iterateDirectory(ctx, item.FullPath, func(entry *filer.Entry) error { + queue.Enqueue(entry) + return nil + }); err != nil { + return err + } + } + + return nil +} + +func (fs *FilerServer) iterateDirectory(ctx context.Context, dirPath util.FullPath, fn func(entry *filer.Entry) error) (err error) { + var lastFileName string + var listErr error + for { + var hasEntries bool + lastFileName, listErr = fs.filer.StreamListDirectoryEntries(ctx, dirPath, lastFileName, false, 1024, "", "", "", func(entry *filer.Entry) bool { + hasEntries = true + if fnErr := fn(entry); fnErr != nil { + err = fnErr + return false + } + return true + }) + if listErr != nil { + return listErr + } + if err != nil { + return err + } + if !hasEntries { + return nil + } + } +} diff --git a/weed/server/filer_grpc_server_traverse_meta_test.go b/weed/server/filer_grpc_server_traverse_meta_test.go new file mode 100644 index 000000000..72f8a916e --- /dev/null +++ b/weed/server/filer_grpc_server_traverse_meta_test.go @@ -0,0 +1,31 @@ +package weed_server + +import ( + "github.com/stretchr/testify/assert" + "github.com/viant/ptrie" + "testing" +) + +func TestPtrie(t *testing.T) { + b := []byte("/topics/abc/dev") + excludedTrie := ptrie.New[bool]() + excludedTrie.Put([]byte("/topics/abc/d"), true) + excludedTrie.Put([]byte("/topics/abc"), true) + + assert.True(t, excludedTrie.MatchPrefix(b, func(key []byte, value bool) bool { + println("matched1", string(key)) + return true + })) + + assert.True(t, excludedTrie.MatchAll(b, func(key []byte, value bool) bool { + println("matched2", string(key)) + return true + })) + + assert.False(t, excludedTrie.MatchAll([]byte("/topics/ab"), func(key []byte, value bool) bool { + println("matched3", string(key)) + return true + })) + + assert.False(t, excludedTrie.Has(b)) +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 0b7254c0d..ee052579c 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -74,7 +74,6 @@ type FilerOption struct { DiskType string AllowedOrigins []string ExposeDirectoryData bool - JoinExistingFiler bool } type FilerServer struct { @@ -198,12 +197,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) existingNodes := fs.filer.ListExistingPeerUpdates(context.Background()) startFromTime := time.Now().Add(-filer.LogFlushInterval) - if option.JoinExistingFiler { - startFromTime = time.Time{} - } if isFresh { glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes) - if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err != nil { + if err := fs.filer.MaybeBootstrapFromOnePeer(option.Host, existingNodes, startFromTime); err != nil { glog.Fatalf("%s bootstrap from %+v: %v", option.Host, existingNodes, err) } } diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go index e04994569..c1a26ca11 100644 --- a/weed/server/filer_server_handlers_proxy.go +++ b/weed/server/filer_server_handlers_proxy.go @@ -3,24 +3,13 @@ package weed_server import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/security" - "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/mem" "io" "math/rand" "net/http" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) -var ( - client *http.Client -) - -func init() { - client = &http.Client{Transport: &http.Transport{ - MaxIdleConns: 1024, - MaxIdleConnsPerHost: 1024, - }} -} - func (fs *FilerServer) maybeAddVolumeJwtAuthorization(r *http.Request, fileId string, isWrite bool) { encodedJwt := fs.maybeGetVolumeJwtAuthorizationToken(fileId, isWrite) @@ -71,14 +60,14 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques } } - proxyResponse, postErr := client.Do(proxyReq) + proxyResponse, postErr := util_http.GetGlobalHttpClient().Do(proxyReq) if postErr != nil { glog.Errorf("post to filer: %v", postErr) w.WriteHeader(http.StatusInternalServerError) return } - defer util.CloseResponse(proxyResponse) + defer util_http.CloseResponse(proxyResponse) for k, v := range proxyResponse.Header { w.Header()[k] = v diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 123b7a494..a02e6c2c1 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -71,14 +71,14 @@ func checkPreconditions(w http.ResponseWriter, r *http.Request, entry *filer.Ent ifModifiedSinceHeader := r.Header.Get("If-Modified-Since") if ifNoneMatchETagHeader != "" { if util.CanonicalizeETag(etag) == util.CanonicalizeETag(ifNoneMatchETagHeader) { - setEtag(w, etag) + SetEtag(w, etag) w.WriteHeader(http.StatusNotModified) return true } } else if ifModifiedSinceHeader != "" { if t, parseError := time.Parse(http.TimeFormat, ifModifiedSinceHeader); parseError == nil { if !t.Before(entry.Attr.Mtime) { - setEtag(w, etag) + SetEtag(w, etag) w.WriteHeader(http.StatusNotModified) return true } @@ -220,13 +220,13 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) w.Header().Set(s3_constants.AmzTagCount, strconv.Itoa(tagCount)) } - setEtag(w, etag) + SetEtag(w, etag) filename := entry.Name() - adjustPassthroughHeaders(w, r, filename) + AdjustPassthroughHeaders(w, r, filename) totalSize := int64(entry.Size()) - if r.Method == "HEAD" { + if r.Method == http.MethodHead { w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) return } @@ -252,7 +252,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } - processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) { + ProcessRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) { if offset+size <= int64(len(entry.Content)) { return func(writer io.Writer) error { _, err := writer.Write(entry.Content[offset : offset+size]) diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index 0531527bb..56f0f9cb4 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -31,8 +31,8 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque path = path[:len(path)-1] } - limit, limit_err := strconv.Atoi(r.FormValue("limit")) - if limit_err != nil { + limit, limitErr := strconv.Atoi(r.FormValue("limit")) + if limitErr != nil { limit = fs.option.DirListingLimit } @@ -62,6 +62,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque if r.Header.Get("Accept") == "application/json" { writeJsonQuiet(w, r, http.StatusOK, struct { + Version string Path string Entries interface{} Limit int @@ -69,6 +70,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque ShouldDisplayLoadMore bool EmptyFolder bool }{ + util.Version(), path, entries, limit, @@ -80,6 +82,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque } err = ui.StatusTpl.Execute(w, struct { + Version string Path string Breadcrumbs []ui.Breadcrumb Entries interface{} @@ -89,6 +92,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque EmptyFolder bool ShowDirectoryDelete bool }{ + util.Version(), path, ui.ToBreadcrumb(path), entries, diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index b186fd34e..f0f756e34 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -18,6 +18,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/stats" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var ( @@ -120,7 +121,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte fs.autoChunk(ctx, w, r, contentLength, so) } - util.CloseRequest(r) + util_http.CloseRequest(r) } @@ -211,7 +212,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { objectPath = objectPath[0 : len(objectPath)-1] } - err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil) + err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil, 0) if err != nil { if err == filer_pb.ErrNotFound { writeJsonQuiet(w, r, http.StatusNoContent, nil) diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 2698e2209..1c7ed0c3c 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -39,7 +39,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * var reply *FilerPostResult var err error var md5bytes []byte - if r.Method == "POST" { + if r.Method == http.MethodPost { if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") { reply, err = fs.mkdir(ctx, w, r, so) } else { @@ -148,6 +148,10 @@ func skipCheckParentDirEntry(r *http.Request) bool { return r.URL.Query().Get("skipCheckParentDir") == "true" } +func isS3Request(r *http.Request) bool { + return r.Header.Get(s3_constants.AmzAuthType) != "" || r.Header.Get("X-Amz-Date") != "" +} + func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) { // detect file mode @@ -266,7 +270,12 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } } - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, skipCheckParentDirEntry(r), so.MaxFileNameLength); dbErr != nil { + dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, skipCheckParentDirEntry(r), so.MaxFileNameLength) + // In test_bucket_listv2_delimiter_basic, the valid object key is the parent folder + if dbErr != nil && strings.HasSuffix(dbErr.Error(), " is a file") && isS3Request(r) { + dbErr = fs.filer.CreateEntry(ctx, entry, false, false, nil, true, so.MaxFileNameLength) + } + if dbErr != nil { replyerr = dbErr filerResult.Error = dbErr.Error() glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) @@ -299,8 +308,14 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs PairMap: nil, Jwt: auth, } + + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil { + return uploaderErr + } + var uploadErr error - uploadResult, uploadErr, _ = operation.Upload(reader, uploadOption) + uploadResult, uploadErr, _ = uploader.Upload(reader, uploadOption) if uploadErr != nil { return uploadErr } diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 6cf7d65b1..f8d129bf3 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -53,7 +53,13 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht PairMap: pu.PairMap, Jwt: auth, } - uploadResult, uploadError := operation.UploadData(uncompressedData, uploadOption) + + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil { + return nil, fmt.Errorf("uploader initialization error: %v", uploaderErr) + } + + uploadResult, uploadError := uploader.UploadData(uncompressedData, uploadOption) if uploadError != nil { return nil, fmt.Errorf("upload to volume server: %v", uploadError) } diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 8c8eba078..d0d1575cf 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -158,7 +158,13 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil PairMap: pairMap, Jwt: auth, } - uploadResult, err, data := operation.Upload(limitedReader, uploadOption) + + uploader, err := operation.NewUploader() + if err != nil { + return nil, err, []byte{} + } + + uploadResult, err, data := uploader.Upload(limitedReader, uploadOption) if uploadResult != nil && uploadResult.RetryCount > 0 { stats.FilerHandlerCounter.WithLabelValues(stats.ChunkUploadRetry).Add(float64(uploadResult.RetryCount)) } diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go index abb6cce9a..638638196 100644 --- a/weed/server/filer_ui/breadcrumb.go +++ b/weed/server/filer_ui/breadcrumb.go @@ -13,6 +13,9 @@ type Breadcrumb struct { func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) { parts := strings.Split(fullpath, "/") + if fullpath == "/" { + parts = []string{""} + } for i := 0; i < len(parts); i++ { name := parts[i] diff --git a/weed/server/filer_ui/breadcrumb_test.go b/weed/server/filer_ui/breadcrumb_test.go new file mode 100644 index 000000000..6e42541cb --- /dev/null +++ b/weed/server/filer_ui/breadcrumb_test.go @@ -0,0 +1,86 @@ +package filer_ui + +import ( + "reflect" + "testing" +) + +func TestToBreadcrumb(t *testing.T) { + type args struct { + fullpath string + } + tests := []struct { + name string + args args + wantCrumbs []Breadcrumb + }{ + { + name: "empty", + args: args{ + fullpath: "", + }, + wantCrumbs: []Breadcrumb{ + { + Name: "/", + Link: "/", + }, + }, + }, + { + name: "test1", + args: args{ + fullpath: "/", + }, + wantCrumbs: []Breadcrumb{ + { + Name: "/", + Link: "/", + }, + }, + }, + { + name: "test2", + args: args{ + fullpath: "/abc", + }, + wantCrumbs: []Breadcrumb{ + { + Name: "/", + Link: "/", + }, + { + Name: "abc", + Link: "/abc/", + }, + }, + }, + { + name: "test3", + args: args{ + fullpath: "/abc/def", + }, + wantCrumbs: []Breadcrumb{ + { + Name: "/", + Link: "/", + }, + { + Name: "abc", + Link: "/abc/", + }, + { + Name: "def", + Link: "/abc/def/", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if gotCrumbs := ToBreadcrumb(tt.args.fullpath); !reflect.DeepEqual(gotCrumbs, tt.wantCrumbs) { + t.Errorf("ToBreadcrumb() = %v, want %v", gotCrumbs, tt.wantCrumbs) + } + }) + } +} diff --git a/weed/server/filer_ui/filer.html b/weed/server/filer_ui/filer.html index 28425f180..627f3ba77 100644 --- a/weed/server/filer_ui/filer.html +++ b/weed/server/filer_ui/filer.html @@ -1,7 +1,7 @@ <!DOCTYPE html> <html> <head> - <title>SeaweedFS Filer</title> + <title>SeaweedFS Filer {{ .Version }}</title> <meta name="viewport" content="width=device-width, initial-scale=1"> <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css"> <style> @@ -82,7 +82,7 @@ <div class="page-header"> <h1> <a href="https://github.com/seaweedfs/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a> - SeaweedFS Filer + SeaweedFS Filer <small>{{ .Version }}</small> </h1> </div> <div class="row"> @@ -99,7 +99,7 @@ {{ range $entry := .Breadcrumbs }} <li><a href="{{ printpath $entry.Link }}"> {{ $entry.Name }} - </li></a> + </a></li> {{ end }} </ol> </div> diff --git a/weed/server/master_grpc_server_assign.go b/weed/server/master_grpc_server_assign.go index 7455c9ea4..4f95b4ff6 100644 --- a/weed/server/master_grpc_server_assign.go +++ b/weed/server/master_grpc_server_assign.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/stats" "time" "github.com/seaweedfs/raft" @@ -69,7 +70,12 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb, } + if !ms.Topo.DataCenterExists(option.DataCenter) { + return nil, fmt.Errorf("data center %v not found in topology", option.DataCenter) + } + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) + vl.SetLastGrowCount(req.WritableVolumeCount) var ( lastErr error @@ -80,18 +86,17 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest for time.Now().Sub(startTime) < maxTimeout { fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(req.Count, option, vl) if shouldGrow && !vl.HasGrowRequest() { - // if picked volume is almost full, trigger a volume-grow request - if ms.Topo.AvailableSpaceFor(option) <= 0 { - return nil, fmt.Errorf("no free volumes left for " + option.String()) + if err != nil && ms.Topo.AvailableSpaceFor(option) <= 0 { + err = fmt.Errorf("%s and no free volumes left for %s", err.Error(), option.String()) } vl.AddGrowRequest() ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{ Option: option, - Count: int(req.WritableVolumeCount), + Count: req.WritableVolumeCount, } } if err != nil { - // glog.Warningf("PickForWrite %+v: %v", req, err) + stats.MasterPickForWriteErrorCounter.Inc() lastErr = err time.Sleep(200 * time.Millisecond) continue diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 503da7fd4..3cad627db 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -3,11 +3,14 @@ package weed_server import ( "context" "fmt" + "math/rand" "reflect" "strings" "sync" "time" + "github.com/seaweedfs/seaweedfs/weed/topology" + "github.com/seaweedfs/raft" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -18,8 +21,39 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/types" ) +func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) { + glog.V(1).Infoln("starting automatic volume grow") + start := time.Now() + newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count) + glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start)) + if err != nil { + glog.V(1).Infof("automatic volume grow failed: %+v", err) + return + } + for _, newVidLocation := range newVidLocations { + ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation}) + } +} + func (ms *MasterServer) ProcessGrowRequest() { go func() { + for { + time.Sleep(14*time.Minute + time.Duration(120*rand.Float32())*time.Second) + if !ms.Topo.IsLeader() { + continue + } + for _, vl := range ms.Topo.ListVolumeLyauts() { + if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(&topology.VolumeGrowOption{}) { + vl.AddGrowRequest() + ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{ + Option: vl.ToGrowOption(), + Count: vl.GetLastGrowCount(), + } + } + } + } + }() + go func() { filter := sync.Map{} for { req, ok := <-ms.volumeGrowthRequestChan @@ -27,9 +61,13 @@ func (ms *MasterServer) ProcessGrowRequest() { break } + option := req.Option + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) + if !ms.Topo.IsLeader() { //discard buffered requests time.Sleep(time.Second * 1) + vl.DoneGrowRequest() continue } @@ -42,28 +80,15 @@ func (ms *MasterServer) ProcessGrowRequest() { return !found }) - option := req.Option - vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) - // not atomic but it's okay if !found && vl.ShouldGrowVolumes(option) { filter.Store(req, nil) // we have lock called inside vg - go func() { - glog.V(1).Infoln("starting automatic volume grow") - start := time.Now() - newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count) - glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start)) - if err == nil { - for _, newVidLocation := range newVidLocations { - ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation}) - } - } + go func(req *topology.VolumeGrowRequest, vl *topology.VolumeLayout) { + ms.DoAutomaticVolumeGrow(req) vl.DoneGrowRequest() - filter.Delete(req) - }() - + }(req, vl) } else { glog.V(4).Infoln("discard volume grow request") time.Sleep(time.Millisecond * 211) @@ -91,6 +116,7 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV Url: loc.Url, PublicUrl: loc.PublicUrl, DataCenter: loc.DataCenter, + GrpcPort: uint32(loc.GrpcPort), }) } var auth string diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 3499a2e13..44a1664c0 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -30,6 +30,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/topology" "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/wdclient" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) const ( @@ -92,15 +93,15 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se v.SetDefault("master.replication.treat_replication_as_minimums", false) replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums") - v.SetDefault("master.volume_growth.copy_1", 7) - v.SetDefault("master.volume_growth.copy_2", 6) - v.SetDefault("master.volume_growth.copy_3", 3) - v.SetDefault("master.volume_growth.copy_other", 1) - v.SetDefault("master.volume_growth.threshold", 0.9) - topology.VolumeGrowStrategy.Copy1Count = v.GetInt("master.volume_growth.copy_1") - topology.VolumeGrowStrategy.Copy2Count = v.GetInt("master.volume_growth.copy_2") - topology.VolumeGrowStrategy.Copy3Count = v.GetInt("master.volume_growth.copy_3") - topology.VolumeGrowStrategy.CopyOtherCount = v.GetInt("master.volume_growth.copy_other") + v.SetDefault("master.volume_growth.copy_1", topology.VolumeGrowStrategy.Copy1Count) + v.SetDefault("master.volume_growth.copy_2", topology.VolumeGrowStrategy.Copy2Count) + v.SetDefault("master.volume_growth.copy_3", topology.VolumeGrowStrategy.Copy3Count) + v.SetDefault("master.volume_growth.copy_other", topology.VolumeGrowStrategy.CopyOtherCount) + v.SetDefault("master.volume_growth.threshold", topology.VolumeGrowStrategy.Threshold) + topology.VolumeGrowStrategy.Copy1Count = v.GetUint32("master.volume_growth.copy_1") + topology.VolumeGrowStrategy.Copy2Count = v.GetUint32("master.volume_growth.copy_2") + topology.VolumeGrowStrategy.Copy3Count = v.GetUint32("master.volume_growth.copy_3") + topology.VolumeGrowStrategy.CopyOtherCount = v.GetUint32("master.volume_growth.copy_other") topology.VolumeGrowStrategy.Threshold = v.GetFloat64("master.volume_growth.threshold") var preallocateSize int64 @@ -185,22 +186,7 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { raftServerName = fmt.Sprintf("[%s]", ms.Topo.RaftServer.Name()) } else if raftServer.RaftHashicorp != nil { ms.Topo.HashicorpRaft = raftServer.RaftHashicorp - leaderCh := raftServer.RaftHashicorp.LeaderCh() - prevLeader, _ := ms.Topo.HashicorpRaft.LeaderWithID() raftServerName = ms.Topo.HashicorpRaft.String() - go func() { - for { - select { - case isLeader := <-leaderCh: - ms.Topo.RaftServerAccessLock.RLock() - leader, _ := ms.Topo.HashicorpRaft.LeaderWithID() - ms.Topo.RaftServerAccessLock.RUnlock() - glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader) - stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc() - prevLeader = leader - } - } - }() } ms.Topo.RaftServerAccessLock.Unlock() @@ -256,7 +242,7 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { } director(req) } - proxy.Transport = util.Transport + proxy.Transport = util_http.GetGlobalHttpClient().GetClientTransport() proxy.ServeHTTP(w, r) } } diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index e4188420d..5e17bcca8 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -2,12 +2,13 @@ package weed_server import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" "net/http" "strconv" "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/stats" @@ -74,7 +75,10 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo machines := ms.Topo.Lookup(collection, volumeId) for _, loc := range machines { locations = append(locations, operation.Location{ - Url: loc.Url(), PublicUrl: loc.PublicUrl, DataCenter: loc.GetDataCenterId(), + Url: loc.Url(), + PublicUrl: loc.PublicUrl, + DataCenter: loc.GetDataCenterId(), + GrpcPort: loc.GrpcPort, }) } } @@ -82,7 +86,10 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid) for _, loc := range machines { locations = append(locations, operation.Location{ - Url: loc.Url, PublicUrl: loc.PublicUrl, DataCenter: loc.DataCenter, + Url: loc.Url, + PublicUrl: loc.PublicUrl, + DataCenter: loc.DataCenter, + GrpcPort: loc.GrpcPort, }) } err = getVidLocationsErr @@ -107,7 +114,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) requestedCount = 1 } - writableVolumeCount, e := strconv.Atoi(r.FormValue("writableVolumeCount")) + writableVolumeCount, e := strconv.ParseUint(r.FormValue("writableVolumeCount"), 10, 32) if e != nil { writableVolumeCount = 0 } @@ -126,23 +133,28 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) startTime = time.Now() ) + if !ms.Topo.DataCenterExists(option.DataCenter) { + writeJsonQuiet(w, r, http.StatusBadRequest, operation.AssignResult{ + Error: fmt.Sprintf("data center %v not found in topology", option.DataCenter), + }) + return + } + for time.Now().Sub(startTime) < maxTimeout { fid, count, dnList, shouldGrow, err := ms.Topo.PickForWrite(requestedCount, option, vl) if shouldGrow && !vl.HasGrowRequest() { - // if picked volume is almost full, trigger a volume-grow request glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr) - if ms.Topo.AvailableSpaceFor(option) <= 0 { - writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()}) - return + if err != nil && ms.Topo.AvailableSpaceFor(option) <= 0 { + err = fmt.Errorf("%s and no free volumes left for %s", err.Error(), option.String()) } vl.AddGrowRequest() ms.volumeGrowthRequestChan <- &topology.VolumeGrowRequest{ Option: option, - Count: writableVolumeCount, + Count: uint32(writableVolumeCount), } } if err != nil { - // glog.Warningf("PickForWrite %+v: %v", req, err) + stats.MasterPickForWriteErrorCounter.Inc() lastErr = err time.Sleep(200 * time.Millisecond) continue diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index f40b819af..7479b5535 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -18,6 +18,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage/types" "github.com/seaweedfs/seaweedfs/weed/topology" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { @@ -70,7 +71,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque } func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) { - count := 0 + count := uint64(0) option, err := ms.getVolumeGrowOption(r) if err != nil { writeJsonError(w, r, http.StatusNotAcceptable, err) @@ -78,13 +79,16 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request } glog.V(0).Infof("volumeGrowHandler received %v from %v", option.String(), r.RemoteAddr) - if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if ms.Topo.AvailableSpaceFor(option) < int64(count*option.ReplicaPlacement.GetCopyCount()) { - err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), count*option.ReplicaPlacement.GetCopyCount()) + if count, err = strconv.ParseUint(r.FormValue("count"), 10, 32); err == nil { + replicaCount := int64(count * uint64(option.ReplicaPlacement.GetCopyCount())) + if ms.Topo.AvailableSpaceFor(option) < replicaCount { + err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), replicaCount) + } else if !ms.Topo.DataCenterExists(option.DataCenter) { + err = fmt.Errorf("data center %v not found in topology", option.DataCenter) } else { var newVidLocations []*master_pb.VolumeLocation - newVidLocations, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo) - count = len(newVidLocations) + newVidLocations, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, uint32(count), option, ms.Topo) + count = uint64(len(newVidLocations)) } } else { err = fmt.Errorf("can not parse parameter count %s", r.FormValue("count")) @@ -110,11 +114,11 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) location := ms.findVolumeLocation(collection, vid) if location.Error == "" { loc := location.Locations[rand.Intn(len(location.Locations))] - var url string + url, _ := util_http.NormalizeUrl(loc.PublicUrl) if r.URL.RawQuery != "" { - url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + "?" + r.URL.RawQuery + url = url + r.URL.Path + "?" + r.URL.RawQuery } else { - url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path + url = url + r.URL.Path } http.Redirect(w, r, url, http.StatusPermanentRedirect) } else { diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go index d06066b93..299df323a 100644 --- a/weed/server/raft_hashicorp.go +++ b/weed/server/raft_hashicorp.go @@ -5,6 +5,14 @@ package weed_server import ( "fmt" + "math/rand" + "os" + "path" + "path/filepath" + "sort" + "strings" + "time" + transport "github.com/Jille/raft-grpc-transport" "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" @@ -14,13 +22,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/stats" "google.golang.org/grpc" - "math/rand" - "os" - "path" - "path/filepath" - "sort" - "strings" - "time" ) const ( @@ -56,46 +57,61 @@ func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) { return cfg } -func (s *RaftServer) UpdatePeers() { +func (s *RaftServer) monitorLeaderLoop(updatePeers bool) { for { + prevLeader, _ := s.RaftHashicorp.LeaderWithID() select { case isLeader := <-s.RaftHashicorp.LeaderCh(): + leader, _ := s.RaftHashicorp.LeaderWithID() if isLeader { - peerLeader := string(s.serverAddr) - existsPeerName := make(map[string]bool) - for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers { - if string(server.ID) == peerLeader { - continue - } - existsPeerName[string(server.ID)] = true - } - for _, peer := range s.peers { - peerName := string(peer) - if peerName == peerLeader || existsPeerName[peerName] { - continue - } - glog.V(0).Infof("adding new peer: %s", peerName) - s.RaftHashicorp.AddVoter( - raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0) - } - for peer := range existsPeerName { - if _, found := s.peers[peer]; !found { - glog.V(0).Infof("removing old peer: %s", peer) - s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0) - } - } - if _, found := s.peers[peerLeader]; !found { - glog.V(0).Infof("removing old leader peer: %s", peerLeader) - s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0) + + if updatePeers { + s.updatePeers() + updatePeers = false } + + s.topo.DoBarrier() + + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc() + } else { + s.topo.BarrierReset() } - return - case <-time.After(updatePeersTimeout): - return + glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader) + prevLeader = leader } } } +func (s *RaftServer) updatePeers() { + peerLeader := string(s.serverAddr) + existsPeerName := make(map[string]bool) + for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerLeader { + continue + } + existsPeerName[string(server.ID)] = true + } + for _, peer := range s.peers { + peerName := string(peer) + if peerName == peerLeader || existsPeerName[peerName] { + continue + } + glog.V(0).Infof("adding new peer: %s", peerName) + s.RaftHashicorp.AddVoter( + raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0) + } + for peer := range existsPeerName { + if _, found := s.peers[peer]; !found { + glog.V(0).Infof("removing old peer: %s", peer) + s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0) + } + } + if _, found := s.peers[peerLeader]; !found { + glog.V(0).Infof("removing old leader peer: %s", peerLeader) + s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0) + } +} + func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ peers: option.Peers, @@ -157,6 +173,8 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { if err != nil { return nil, fmt.Errorf("raft.NewRaft: %v", err) } + + updatePeers := false if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 { cfg := s.AddPeersConfiguration() // Need to get lock, in case all servers do this at the same time. @@ -169,9 +187,11 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) } } else { - go s.UpdatePeers() + updatePeers = true } + go s.monitorLeaderLoop(updatePeers) + ticker := time.NewTicker(c.HeartbeatTimeout * 10) if glog.V(4) { go func() { diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index d718ecac7..4bcd808c2 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,13 +2,14 @@ package weed_server import ( "encoding/json" - transport "github.com/Jille/raft-grpc-transport" "io" "math/rand" "os" "path" "time" + transport "github.com/Jille/raft-grpc-transport" + "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/pb" diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go index 64254b3b8..4452e019b 100644 --- a/weed/server/volume_grpc_remote.go +++ b/weed/server/volume_grpc_remote.go @@ -70,10 +70,15 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser PairMap: nil, Jwt: security.EncodedJwt(req.Auth), } - if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil { - if err == nil { - err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, err) - } + + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil && err == nil { + err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, uploaderErr) + return + } + + if _, replicaWriteErr := uploader.UploadData(data, uploadOption); replicaWriteErr != nil && err == nil { + err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr) } }(replica.Url) } diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index cc364513b..15d639f49 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -27,6 +27,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/storage" "github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/util" + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" ) var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`) @@ -81,10 +82,11 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } if vs.ReadMode == "proxy" { // proxy client request to target server - u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].Url)) + rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].Url) + u, _ := url.Parse(rawURL) r.URL.Host = u.Host r.URL.Scheme = u.Scheme - request, err := http.NewRequest("GET", r.URL.String(), nil) + request, err := http.NewRequest(http.MethodGet, r.URL.String(), nil) if err != nil { glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err) InternalError(w) @@ -96,13 +98,13 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } - response, err := client.Do(request) + response, err := util_http.GetGlobalHttpClient().Do(request) if err != nil { glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err) InternalError(w) return } - defer util.CloseResponse(response) + defer util_http.CloseResponse(response) // proxy target response to client for k, vv := range response.Header { for _, v := range vv { @@ -116,7 +118,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } else { // redirect - u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)) + rawURL, _ := util_http.NormalizeUrl(lookupResult.Locations[0].PublicUrl) + u, _ := url.Parse(rawURL) u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid) arg := url.Values{} if c := r.FormValue("collection"); c != "" { @@ -186,7 +189,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusNotModified) return } - setEtag(w, n.Etag()) + SetEtag(w, n.Etag()) if n.HasPairs() { pairMap := make(map[string]string) @@ -253,7 +256,7 @@ func shouldAttemptStreamWrite(hasLocalVolume bool, ext string, r *http.Request) if len(ext) > 0 { ext = strings.ToLower(ext) } - if r.Method == "HEAD" { + if r.Method == http.MethodHead { return true, true } _, _, _, shouldResize := shouldResizeImages(ext, r) @@ -377,14 +380,14 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re } w.Header().Set("Accept-Ranges", "bytes") - adjustPassthroughHeaders(w, r, filename) + AdjustPassthroughHeaders(w, r, filename) - if r.Method == "HEAD" { + if r.Method == http.MethodHead { w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) return nil } - return processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) { + return ProcessRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) { return func(writer io.Writer) error { if _, e = rs.Seek(offset, 0); e != nil { return e @@ -406,14 +409,14 @@ func (vs *VolumeServer) streamWriteResponseContent(filename string, mimeType str w.Header().Set("Content-Type", mimeType) } w.Header().Set("Accept-Ranges", "bytes") - adjustPassthroughHeaders(w, r, filename) + AdjustPassthroughHeaders(w, r, filename) - if r.Method == "HEAD" { + if r.Method == http.MethodHead { w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) return } - processRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) { + ProcessRangeRequest(r, w, totalSize, mimeType, func(offset int64, size int64) (filer.DoStreamContent, error) { return func(writer io.Writer) error { return vs.store.ReadVolumeNeedleDataInto(volumeId, n, readOption, writer, offset, size) }, nil diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 6e151bf80..7f0fcc871 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -53,7 +53,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { // http 204 status code does not allow body if writeError == nil && isUnchanged { - setEtag(w, reqNeedle.Etag()) + SetEtag(w, reqNeedle.Etag()) w.WriteHeader(http.StatusNoContent) return } @@ -65,7 +65,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { ret.Size = uint32(originalSize) ret.ETag = reqNeedle.Etag() ret.Mime = string(reqNeedle.Mime) - setEtag(w, ret.ETag) + SetEtag(w, ret.ETag) w.Header().Set("Content-MD5", contentMd5) writeJsonQuiet(w, r, httpStatus, ret) } @@ -147,7 +147,7 @@ func writeDeleteResult(err error, count int64, w http.ResponseWriter, r *http.Re } } -func setEtag(w http.ResponseWriter, etag string) { +func SetEtag(w http.ResponseWriter, etag string) { if etag != "" { if strings.HasPrefix(etag, "\"") { w.Header().Set("ETag", etag) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 97d51dad7..dbe6dfed5 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -99,6 +99,7 @@ type FileInfo struct { modifiedTime time.Time etag string isDirectory bool + err error } func (fi *FileInfo) Name() string { return fi.name } @@ -109,6 +110,9 @@ func (fi *FileInfo) IsDir() bool { return fi.isDirectory } func (fi *FileInfo) Sys() interface{} { return nil } func (fi *FileInfo) ETag(ctx context.Context) (string, error) { + if fi.err != nil { + return "", fi.err + } return fi.etag, nil } @@ -269,7 +273,10 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f fi, err := fs.stat(ctx, fullFilePath) if err != nil { - return nil, os.ErrNotExist + if err == os.ErrNotExist { + return nil, err + } + return &WebDavFile{fs: fs}, nil } if !strings.HasSuffix(fullFilePath, "/") && fi.IsDir() { fullFilePath += "/" @@ -365,12 +372,16 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F var fi FileInfo entry, err := filer_pb.GetEntry(fs, fullpath) + if err != nil { + if err == filer_pb.ErrNotFound { + return nil, os.ErrNotExist + } + fi.err = err + return &fi, nil + } if entry == nil { return nil, os.ErrNotExist } - if err != nil { - return nil, err - } fi.size = int64(filer.FileSize(entry)) fi.name = string(fullpath) fi.mode = os.FileMode(entry.Attributes.FileMode) @@ -392,8 +403,13 @@ func (fs *WebDavFileSystem) Stat(ctx context.Context, name string) (os.FileInfo, } func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64, tsNs int64) (chunk *filer_pb.FileChunk, err error) { + uploader, uploaderErr := operation.NewUploader() + if uploaderErr != nil { + glog.V(0).Infof("upload data %v: %v", f.name, uploaderErr) + return nil, fmt.Errorf("upload data: %v", uploaderErr) + } - fileId, uploadResult, flushErr, _ := operation.UploadWithRetry( + fileId, uploadResult, flushErr, _ := uploader.UploadWithRetry( f.fs, &filer_pb.AssignVolumeRequest{ Count: 1, @@ -509,7 +525,9 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { func (f *WebDavFile) Close() error { glog.V(2).Infof("WebDavFileSystem.Close %v", f.name) - + if f.bufWriter == nil { + return nil + } err := f.bufWriter.Close() if f.entry != nil { |
