diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/common.go | 9 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server.go | 6 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_rename.go | 18 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 6 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_proxy.go | 6 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_read.go | 21 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_tagging.go | 8 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 70 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 25 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_cipher.go | 2 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_upload.go | 22 | ||||
| -rw-r--r-- | weed/server/master_server.go | 2 | ||||
| -rw-r--r-- | weed/server/raft_server.go | 76 | ||||
| -rw-r--r-- | weed/server/volume_server.go | 3 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers.go | 25 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_admin.go | 19 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_read.go | 5 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 5 |
18 files changed, 238 insertions, 90 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index ba4d13456..0d458c9c3 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -1,6 +1,7 @@ package weed_server import ( + "bufio" "bytes" "encoding/json" "errors" @@ -277,10 +278,12 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) { rangeReq := r.Header.Get("Range") + bufferedWriter := bufio.NewWriterSize(w, 128*1024) + defer bufferedWriter.Flush() if rangeReq == "" { w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) - if err := writeFn(w, 0, totalSize); err != nil { + if err := writeFn(bufferedWriter, 0, totalSize); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -321,7 +324,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 w.Header().Set("Content-Range", ra.contentRange(totalSize)) w.WriteHeader(http.StatusPartialContent) - err = writeFn(w, ra.start, ra.length) + err = writeFn(bufferedWriter, ra.start, ra.length) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -361,7 +364,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10)) } w.WriteHeader(http.StatusPartialContent) - if _, err := io.CopyN(w, sendContent, sendSize); err != nil { + if _, err := io.CopyN(bufferedWriter, sendContent, sendSize); err != nil { http.Error(w, "Internal Error", http.StatusInternalServerError) return } diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 8e6cd8451..5a5714156 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -148,7 +148,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr newEntry := filer.FromPbEntry(req.Directory, req.Entry) newEntry.Chunks = chunks - createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures) + createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory) if createErr == nil { fs.filer.DeleteChunks(garbage) @@ -271,7 +271,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo glog.V(0).Infof("MaybeManifestize: %v", err) } - err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil) + err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil, false) return &filer_pb.AppendToEntryResponse{}, err } @@ -393,7 +393,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb. clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId")) t := &filer_pb.GetFilerConfigurationResponse{ - Masters: pb.ToAddressStrings(fs.option.Masters), + Masters: pb.ToAddressStringsFromMap(fs.option.Masters), Collection: fs.option.Collection, Replication: fs.option.DefaultReplication, MaxMb: uint32(fs.option.MaxMB), diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 773f7aebe..7d6650b53 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -163,13 +163,17 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.Seawee // add to new directory newEntry := &filer.Entry{ - FullPath: newPath, - Attr: entry.Attr, - Chunks: entry.Chunks, - Extended: entry.Extended, - Content: entry.Content, - } - if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures); createErr != nil { + FullPath: newPath, + Attr: entry.Attr, + Chunks: entry.Chunks, + Extended: entry.Extended, + Content: entry.Content, + HardLinkCounter: entry.HardLinkCounter, + HardLinkId: entry.HardLinkId, + Remote: entry.Remote, + Quota: entry.Quota, + } + if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures, false); createErr != nil { return createErr } if stream != nil { diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index e51299c6d..7edd5870f 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -48,7 +48,7 @@ import ( ) type FilerOption struct { - Masters []pb.ServerAddress + Masters map[string]pb.ServerAddress Collection string DefaultReplication string DisableDirListing bool @@ -130,8 +130,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) go fs.filer.KeepMasterClientConnected() if !util.LoadConfiguration("filer", false) { - v.Set("leveldb2.enabled", true) - v.Set("leveldb2.dir", option.DefaultLevelDbDir) + v.SetDefault("leveldb2.enabled", true) + v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir) _, err := os.Stat(option.DefaultLevelDbDir) if os.IsNotExist(err) { os.MkdirAll(option.DefaultLevelDbDir, 0755) diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go index b8b28790b..301d609ec 100644 --- a/weed/server/filer_server_handlers_proxy.go +++ b/weed/server/filer_server_handlers_proxy.go @@ -3,6 +3,7 @@ package weed_server import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/mem" "io" "math/rand" "net/http" @@ -62,6 +63,9 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques w.Header()[k] = v } w.WriteHeader(proxyResponse.StatusCode) - io.Copy(w, proxyResponse.Body) + + buf := mem.Allocate(128 * 1024) + defer mem.Free(buf) + io.CopyBuffer(w, proxyResponse.Body, buf) } diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 8037b1d94..2bac585e9 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -4,7 +4,9 @@ import ( "bytes" "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/util/mem" "io" + "math" "mime" "net/http" "path/filepath" @@ -21,7 +23,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) - // Validates the preconditions. Returns true if GET/HEAD operation should not proceed. // Preconditions supported are: // If-Modified-Since @@ -119,6 +120,20 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } + query := r.URL.Query() + if query.Get("metadata") == "true" { + if query.Get("resolveManifest") == "true" { + if entry.Chunks, _, err = filer.ResolveChunkManifest( + fs.filer.MasterClient.GetLookupFileIdFunction(), + entry.Chunks, 0, math.MaxInt64); err != nil { + err = fmt.Errorf("failed to resolve chunk manifest, err: %s", err.Error()) + writeJsonError(w, r, http.StatusInternalServerError, err) + } + } + writeJsonQuiet(w, r, http.StatusOK, entry) + return + } + etag := filer.ETagEntry(entry) if checkPreconditions(w, r, entry) { return @@ -185,7 +200,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } width, height, mode, shouldResize := shouldResizeImages(ext, r) if shouldResize { - data, err := filer.ReadAll(fs.filer.MasterClient, entry.Chunks) + data := mem.Allocate(int(totalSize)) + defer mem.Free(data) + err := filer.ReadAll(data, fs.filer.MasterClient, entry.Chunks) if err != nil { glog.Errorf("failed to read %s: %v", path, err) w.WriteHeader(http.StatusNotModified) diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go index 70b5327d6..ae2093947 100644 --- a/weed/server/filer_server_handlers_tagging.go +++ b/weed/server/filer_server_handlers_tagging.go @@ -43,7 +43,7 @@ func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request) } } - if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, false); dbErr != nil { glog.V(0).Infof("failing to update %s tagging : %v", path, dbErr) writeJsonError(w, r, http.StatusInternalServerError, err) return @@ -82,7 +82,9 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque toDelete := strings.Split(r.URL.Query().Get("tagging"), ",") deletions := make(map[string]struct{}) for _, deletion := range toDelete { - deletions[deletion] = struct{}{} + if deletion != "" { + deletions[deletion] = struct{}{} + } } // delete all tags or specific tags @@ -107,7 +109,7 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque return } - if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, false); dbErr != nil { glog.V(0).Infof("failing to delete %s tagging : %v", path, dbErr) writeJsonError(w, r, http.StatusInternalServerError, err) return diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 1ebe66d43..3bbae8197 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "errors" + "fmt" "net/http" "os" "strings" @@ -78,11 +79,78 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte return } - fs.autoChunk(ctx, w, r, contentLength, so) + if query.Has("mv.from") { + fs.move(ctx, w, r, so) + } else { + fs.autoChunk(ctx, w, r, contentLength, so) + } + util.CloseRequest(r) } +func (fs *FilerServer) move(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) { + src := r.URL.Query().Get("mv.from") + dst := r.URL.Path + + glog.V(2).Infof("FilerServer.move %v to %v", src, dst) + + var err error + if src, err = clearName(src); err != nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + if dst, err = clearName(dst); err != nil { + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + src = strings.TrimRight(src, "/") + if src == "" { + err = fmt.Errorf("invalid source '/'") + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + + srcPath := util.FullPath(src) + dstPath := util.FullPath(dst) + srcEntry, err := fs.filer.FindEntry(ctx, srcPath) + if err != nil { + err = fmt.Errorf("failed to get src entry '%s', err: %s", src, err) + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + + oldDir, oldName := srcPath.DirAndName() + newDir, newName := dstPath.DirAndName() + newName = util.Nvl(newName, oldName) + + dstEntry, err := fs.filer.FindEntry(ctx, util.FullPath(strings.TrimRight(dst, "/"))) + if err != nil && err != filer_pb.ErrNotFound { + err = fmt.Errorf("failed to get dst entry '%s', err: %s", dst, err) + writeJsonError(w, r, http.StatusInternalServerError, err) + return + } + if err == nil && !dstEntry.IsDirectory() && srcEntry.IsDirectory() { + err = fmt.Errorf("move: cannot overwrite non-directory '%s' with directory '%s'", dst, src) + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + + _, err = fs.AtomicRenameEntry(ctx, &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: oldDir, + OldName: oldName, + NewDirectory: newDir, + NewName: newName, + }) + if err != nil { + err = fmt.Errorf("failed to move entry from '%s' to '%s', err: %s", src, dst, err) + writeJsonError(w, r, http.StatusBadRequest, err) + return + } + + w.WriteHeader(http.StatusNoContent) +} + // curl -X DELETE http://localhost:8888/path/to // curl -X DELETE http://localhost:8888/path/to?recursive=true // curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 61d30372b..854b35f82 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -130,6 +130,10 @@ func isAppend(r *http.Request) bool { return r.URL.Query().Get("op") == "append" } +func skipCheckParentDirEntry(r *http.Request) bool { + return r.URL.Query().Get("skipCheckParentDir") == "true" +} + func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) { // detect file mode @@ -161,8 +165,11 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa var entry *filer.Entry var mergedChunks []*filer_pb.FileChunk + + isAppend := isAppend(r) + isOffsetWrite := len(fileChunks) > 0 && fileChunks[0].Offset > 0 // when it is an append - if isAppend(r) { + if isAppend || isOffsetWrite { existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path)) if findErr != nil && findErr != filer_pb.ErrNotFound { glog.V(0).Infof("failing to find %s: %v", path, findErr) @@ -173,11 +180,13 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa entry.Mtime = time.Now() entry.Md5 = nil // adjust chunk offsets - for _, chunk := range fileChunks { - chunk.Offset += int64(entry.FileSize) + if isAppend { + for _, chunk := range fileChunks { + chunk.Offset += int64(entry.FileSize) + } + entry.FileSize += uint64(chunkOffset) } mergedChunks = append(entry.Chunks, fileChunks...) - entry.FileSize += uint64(chunkOffset) // TODO if len(entry.Content) > 0 { @@ -215,6 +224,10 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa return } entry.Chunks = mergedChunks + if isOffsetWrite { + entry.Md5 = nil + entry.FileSize = entry.Size() + } filerResult = &FilerPostResult{ Name: fileName, @@ -234,7 +247,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } } - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, skipCheckParentDirEntry(r)); dbErr != nil { replyerr = dbErr filerResult.Error = dbErr.Error() glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) @@ -311,7 +324,7 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http Name: util.FullPath(path).Name(), } - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false); dbErr != nil { replyerr = dbErr filerResult.Error = dbErr.Error() glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr) diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 14fa10e2c..a5b085764 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -93,7 +93,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht Size: int64(pu.OriginalDataSize), } - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false); dbErr != nil { fs.filer.DeleteChunks(entry.Chunks) err = dbErr filerResult.Error = dbErr.Error() diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index a7716ef02..6ee378819 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -3,10 +3,12 @@ package weed_server import ( "bytes" "crypto/md5" + "fmt" "hash" "io" "net/http" "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -28,6 +30,22 @@ var bufPool = sync.Pool{ } func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) { + query := r.URL.Query() + + isAppend := isAppend(r) + if query.Has("offset") { + offset := query.Get("offset") + offsetInt, err := strconv.ParseInt(offset, 10, 64) + if err != nil || offsetInt < 0 { + err = fmt.Errorf("invalid 'offset': '%s'", offset) + return nil, nil, 0, err, nil + } + if isAppend && offsetInt > 0 { + err = fmt.Errorf("cannot set offset when op=append") + return nil, nil, 0, err, nil + } + chunkOffset = offsetInt + } md5Hash = md5.New() var partReader = io.NopCloser(io.TeeReader(reader, md5Hash)) @@ -61,9 +79,10 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque bufPool.Put(bytesBuffer) atomic.AddInt64(&bytesBufferCounter, -1) bytesBufferLimitCond.Signal() + uploadErr = err break } - if chunkOffset == 0 && !isAppend(r) { + if chunkOffset == 0 && !isAppend { if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) { chunkOffset += dataSize smallContent = make([]byte, dataSize) @@ -108,6 +127,7 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque wg.Wait() if uploadErr != nil { + fs.filer.DeleteChunks(fileChunks) return nil, md5Hash, 0, uploadErr, nil } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 671432d5c..b63e3a418 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -75,7 +75,7 @@ type MasterServer struct { Cluster *cluster.Cluster } -func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddress) *MasterServer { +func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer { v := util.GetViper() signingKey := v.GetString("jwt.signing.key") diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 568bfc7b5..d559cb691 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -5,8 +5,6 @@ import ( "math/rand" "os" "path" - "sort" - "strings" "time" "google.golang.org/grpc" @@ -19,8 +17,19 @@ import ( "github.com/chrislusf/seaweedfs/weed/topology" ) +type RaftServerOption struct { + GrpcDialOption grpc.DialOption + Peers map[string]pb.ServerAddress + ServerAddr pb.ServerAddress + DataDir string + Topo *topology.Topology + RaftResumeState bool + HeartbeatInterval time.Duration + ElectionTimeout time.Duration +} + type RaftServer struct { - peers []pb.ServerAddress // initial peers to join with + peers map[string]pb.ServerAddress // initial peers to join with raftServer raft.Server dataDir string serverAddr pb.ServerAddress @@ -52,12 +61,12 @@ func (s StateMachine) Recovery(data []byte) error { return nil } -func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, serverAddr pb.ServerAddress, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) { +func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ - peers: peers, - serverAddr: serverAddr, - dataDir: dataDir, - topo: topo, + peers: option.Peers, + serverAddr: option.ServerAddr, + dataDir: option.DataDir, + topo: option.Topo, } if glog.V(4) { @@ -67,27 +76,29 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser raft.RegisterCommand(&topology.MaxVolumeIdCommand{}) var err error - transporter := raft.NewGrpcTransporter(grpcDialOption) - glog.V(0).Infof("Starting RaftServer with %v", serverAddr) + transporter := raft.NewGrpcTransporter(option.GrpcDialOption) + glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr) - if !raftResumeState { + // always clear previous log to avoid server is promotable + os.RemoveAll(path.Join(s.dataDir, "log")) + if !option.RaftResumeState { // always clear previous metadata os.RemoveAll(path.Join(s.dataDir, "conf")) - os.RemoveAll(path.Join(s.dataDir, "log")) os.RemoveAll(path.Join(s.dataDir, "snapshot")) } - if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil { + if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0700); err != nil { return nil, err } - stateMachine := StateMachine{topo: topo} - s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, topo, "") + stateMachine := StateMachine{topo: option.Topo} + s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "") if err != nil { glog.V(0).Infoln(err) return nil, err } - s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond) - s.raftServer.SetElectionTimeout(10 * time.Second) + heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) + s.raftServer.SetHeartbeatInterval(heartbeatInterval) + s.raftServer.SetElectionTimeout(option.ElectionTimeout) if err := s.raftServer.LoadSnapshot(); err != nil { return nil, err } @@ -95,39 +106,26 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []pb.ServerAddress, ser return nil, err } - for _, peer := range s.peers { - if err := s.raftServer.AddPeer(string(peer), peer.ToGrpcAddress()); err != nil { + for name, peer := range s.peers { + if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil { return nil, err } } // Remove deleted peers for existsPeerName := range s.raftServer.Peers() { - exists := false - var existingPeer pb.ServerAddress - for _, peer := range s.peers { - if peer.ToGrpcAddress() == existsPeerName { - exists, existingPeer = true, peer - break - } - } - if exists { + if existingPeer, found := s.peers[existsPeerName]; !found { if err := s.raftServer.RemovePeer(existsPeerName); err != nil { glog.V(0).Infoln(err) return nil, err } else { - glog.V(0).Infof("removing old peer %s", existingPeer) + glog.V(0).Infof("removing old peer: %s", existingPeer) } } } s.GrpcServer = raft.NewGrpcServer(s.raftServer) - if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) { - // Initialize the server by joining itself. - // s.DoJoinCommand() - } - glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader()) return s, nil @@ -143,16 +141,6 @@ func (s *RaftServer) Peers() (members []string) { return } -func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { - sort.Slice(peers, func(i, j int) bool { - return strings.Compare(string(peers[i]), string(peers[j])) < 0 - }) - if len(peers) <= 0 { - return true - } - return self == peers[0] -} - func (s *RaftServer) DoJoinCommand() { glog.V(0).Infoln("Initializing new cluster") diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 2551cc6e6..dcd27673c 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -23,7 +23,6 @@ type VolumeServer struct { inFlightDownloadDataSize int64 concurrentUploadLimit int64 concurrentDownloadLimit int64 - inFlightUploadDataLimitCond *sync.Cond inFlightDownloadDataLimitCond *sync.Cond SeedMasterNodes []pb.ServerAddress @@ -84,7 +83,6 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, isHeartbeating: true, stopChan: make(chan bool), - inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)), inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)), concurrentUploadLimit: concurrentUploadLimit, concurrentDownloadLimit: concurrentDownloadLimit, @@ -98,6 +96,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, handleStaticResources(adminMux) adminMux.HandleFunc("/status", vs.statusHandler) + adminMux.HandleFunc("/healthz", vs.healthzHandler) if signingKey == "" || enableUiAccess { // only expose the volume server details for safe environments adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 510902cf0..49bc297fb 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -1,6 +1,7 @@ package weed_server import ( + "fmt" "net/http" "strconv" "strings" @@ -39,8 +40,14 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque stats.ReadRequest() vs.inFlightDownloadDataLimitCond.L.Lock() for vs.concurrentDownloadLimit != 0 && atomic.LoadInt64(&vs.inFlightDownloadDataSize) > vs.concurrentDownloadLimit { - glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit) - vs.inFlightDownloadDataLimitCond.Wait() + select { + case <-r.Context().Done(): + glog.V(4).Infof("request cancelled from %s: %v", r.RemoteAddr, r.Context().Err()) + return + default: + glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit) + vs.inFlightDownloadDataLimitCond.Wait() + } } vs.inFlightDownloadDataLimitCond.L.Unlock() vs.GetOrHeadHandler(w, r) @@ -51,16 +58,18 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque // wait until in flight data is less than the limit contentLength := getContentLength(r) - vs.inFlightUploadDataLimitCond.L.Lock() - for vs.concurrentUploadLimit != 0 && atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit { - glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit) - vs.inFlightUploadDataLimitCond.Wait() + + // exclude the replication from the concurrentUploadLimitMB + if vs.concurrentUploadLimit != 0 && r.URL.Query().Get("type") != "replicate" && + atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit { + err := fmt.Errorf("reject because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit) + glog.V(1).Infof("too many requests: %v", err) + writeJsonError(w, r, http.StatusTooManyRequests, err) + return } - vs.inFlightUploadDataLimitCond.L.Unlock() atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength) defer func() { atomic.AddInt64(&vs.inFlightUploadDataSize, -contentLength) - vs.inFlightUploadDataLimitCond.Signal() }() // processs uploads diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go index 7e6c06871..37cf109e2 100644 --- a/weed/server/volume_server_handlers_admin.go +++ b/weed/server/volume_server_handlers_admin.go @@ -1,6 +1,7 @@ package weed_server import ( + "github.com/chrislusf/seaweedfs/weed/topology" "net/http" "path/filepath" @@ -9,6 +10,24 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +func (vs *VolumeServer) healthzHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) + volumeInfos := vs.store.VolumeInfos() + for _, vinfo := range volumeInfos { + if len(vinfo.Collection) == 0 { + continue + } + if vinfo.ReplicaPlacement.GetCopyCount() > 1 { + _, err := topology.GetWritableRemoteReplications(vs.store, vs.grpcDialOption, vinfo.Id, vs.GetMaster) + if err != nil { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + } + } + w.WriteHeader(http.StatusOK) +} + func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) m := make(map[string]interface{}) diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 5ce2278bf..203f6c07d 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util/mem" "io" "mime" "net/http" @@ -101,7 +102,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } w.WriteHeader(response.StatusCode) - io.Copy(w, response.Body) + buf := mem.Allocate(128 * 1024) + defer mem.Free(buf) + io.CopyBuffer(w, response.Body, buf) return } else { // redirect diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 018daed8b..267c3e1f0 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "io" - "math" "os" "path" "strings" @@ -540,11 +539,11 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { return 0, io.EOF } if f.entryViewCache == nil { - f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, math.MaxInt64) + f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, fileSize) f.reader = nil } if f.reader == nil { - chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64) + chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, fileSize) f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize) } |
