aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2019-11-29 00:11:57 -0800
committerGitHub <noreply@github.com>2019-11-29 00:11:57 -0800
commita9dbd52e063263cf418bb30805f90e6a9f69fd1a (patch)
tree380e095ff888223572f2a6dae508b4249a0dae08
parentea9a7e61d0d9ddc0608a2c5a6eb237e1bf456237 (diff)
parentea9d1ebd2f05c5fc55186c5e624585f1eddf5b68 (diff)
downloadseaweedfs-a9dbd52e063263cf418bb30805f90e6a9f69fd1a.tar.xz
seaweedfs-a9dbd52e063263cf418bb30805f90e6a9f69fd1a.zip
Merge pull request #1145 from stlpmo-jn/volume_graceful_stop
let volume server graceful stop
-rw-r--r--weed/command/volume.go146
-rw-r--r--weed/util/httpdown/http_down.go395
2 files changed, 509 insertions, 32 deletions
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 3c1aa2b50..3e8341ef8 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -1,6 +1,7 @@
package command
import (
+ "fmt"
"net/http"
"os"
"runtime"
@@ -10,7 +11,9 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util/httpdown"
"github.com/spf13/viper"
+ "google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@@ -94,7 +97,7 @@ func runVolume(cmd *Command, args []string) bool {
func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) {
- //Set multiple folders and each folder's max volume count limit'
+ // Set multiple folders and each folder's max volume count limit'
v.folders = strings.Split(volumeFolders, ",")
maxCountStrings := strings.Split(maxVolumeCounts, ",")
for _, maxString := range maxCountStrings {
@@ -113,7 +116,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
}
}
- //security related white list configuration
+ // security related white list configuration
if volumeWhiteListOption != "" {
v.whiteList = strings.Split(volumeWhiteListOption, ",")
}
@@ -128,11 +131,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
if *v.publicUrl == "" {
*v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort)
}
- isSeperatedPublicPort := *v.publicPort != *v.port
volumeMux := http.NewServeMux()
publicVolumeMux := volumeMux
- if isSeperatedPublicPort {
+ if v.isSeparatedPublicPort() {
publicVolumeMux = http.NewServeMux()
}
@@ -158,51 +160,131 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.compactionMBPerSecond,
)
- listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
- glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress)
- listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
- if e != nil {
- glog.Fatalf("Volume server listener error:%v", e)
- }
- if isSeperatedPublicPort {
- publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
- glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
- publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
- if e != nil {
- glog.Fatalf("Volume server listener error:%v", e)
+ // starting grpc server
+ grpcS := v.startGrpcService(volumeServer)
+
+ // starting public http server
+ var publicHttpDown httpdown.Server
+ if v.isSeparatedPublicPort() {
+ publicHttpDown = v.startPublicHttpService(publicVolumeMux)
+ if nil == publicHttpDown {
+ glog.Fatalf("start public http service failed")
}
- go func() {
- if e := http.Serve(publicListener, publicVolumeMux); e != nil {
- glog.Fatalf("Volume server fail to serve public: %v", e)
- }
- }()
}
+ // starting the cluster http server
+ clusterHttpServer := v.startClusterHttpService(volumeMux)
+
+ stopChain := make(chan struct{})
util.OnInterrupt(func() {
+ fmt.Println("volume server has be killed")
+ var startTime time.Time
+
+ // firstly, stop the public http service to prevent from receiving new user request
+ if nil != publicHttpDown {
+ startTime = time.Now()
+ if err := publicHttpDown.Stop(); err != nil {
+ glog.Warningf("stop the public http server failed, %v", err)
+ }
+ delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("stop public http server, elapsed %dms", delta)
+ }
+
+ startTime = time.Now()
+ if err := clusterHttpServer.Stop(); err != nil {
+ glog.Warningf("stop the cluster http server failed, %v", err)
+ }
+ delta := time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("graceful stop cluster http server, elapsed [%d]", delta)
+
+ startTime = time.Now()
+ grpcS.GracefulStop()
+ delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("graceful stop gRPC, elapsed [%d]", delta)
+
+ startTime = time.Now()
volumeServer.Shutdown()
+ delta = time.Now().Sub(startTime).Nanoseconds() / 1e6
+ glog.V(0).Infof("stop volume server, elapsed [%d]", delta)
+
pprof.StopCPUProfile()
+
+ close(stopChain) // notify exit
})
- // starting grpc server
+ select {
+ case <-stopChain:
+ }
+ glog.Warningf("the volume server exit.")
+}
+
+// check whether configure the public port
+func (v VolumeServerOptions) isSeparatedPublicPort() bool {
+ return *v.publicPort != *v.port
+}
+
+func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerServer) *grpc.Server {
grpcPort := *v.port + 10000
grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "volume"))
- volume_server_pb.RegisterVolumeServerServer(grpcS, volumeServer)
+ volume_server_pb.RegisterVolumeServerServer(grpcS, vs)
reflection.Register(grpcS)
- go grpcS.Serve(grpcL)
-
- if viper.GetString("https.volume.key") != "" {
- if e := http.ServeTLS(listener, volumeMux,
- viper.GetString("https.volume.cert"), viper.GetString("https.volume.key")); e != nil {
- glog.Fatalf("Volume server fail to serve: %v", e)
+ go func() {
+ if err := grpcS.Serve(grpcL); err != nil {
+ glog.Fatalf("start gRPC service failed, %s", err)
}
- } else {
- if e := http.Serve(listener, volumeMux); e != nil {
- glog.Fatalf("Volume server fail to serve: %v", e)
+ }()
+ return grpcS
+}
+
+func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server {
+ publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
+ glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
+ publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
+ if e != nil {
+ glog.Fatalf("Volume server listener error:%v", e)
+ }
+
+ pubHttp := httpdown.HTTP{StopTimeout: 5 * time.Minute, KillTimeout: 5 * time.Minute}
+ publicHttpDown := pubHttp.Serve(&http.Server{Handler: handler}, publicListener)
+ go func() {
+ if err := publicHttpDown.Wait(); err != nil {
+ glog.Errorf("public http down wait failed, %v", err)
}
+ }()
+
+ return publicHttpDown
+}
+
+func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpdown.Server {
+ var (
+ certFile, keyFile string
+ )
+ if viper.GetString("https.volume.key") != "" {
+ certFile = viper.GetString("https.volume.cert")
+ keyFile = viper.GetString("https.volume.key")
+ }
+
+ listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
+ glog.V(0).Infof("Start Seaweed volume server %s at %s", util.VERSION, listeningAddress)
+ listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
+ if e != nil {
+ glog.Fatalf("Volume server listener error:%v", e)
}
+ httpDown := httpdown.HTTP{
+ KillTimeout: 5 * time.Minute,
+ StopTimeout: 5 * time.Minute,
+ CertFile: certFile,
+ KeyFile: keyFile}
+ clusterHttpServer := httpDown.Serve(&http.Server{Handler: handler}, listener)
+ go func() {
+ if e := clusterHttpServer.Wait(); e != nil {
+ glog.Fatalf("Volume server fail to serve: %v", e)
+ }
+ }()
+ return clusterHttpServer
}
diff --git a/weed/util/httpdown/http_down.go b/weed/util/httpdown/http_down.go
new file mode 100644
index 000000000..5cbd9611c
--- /dev/null
+++ b/weed/util/httpdown/http_down.go
@@ -0,0 +1,395 @@
+// Package httpdown provides http.ConnState enabled graceful termination of
+// http.Server.
+// based on github.com/facebookarchive/httpdown, who's licence is MIT-licence,
+// we add a feature of supporting for http TLS
+package httpdown
+
+import (
+ "crypto/tls"
+ "fmt"
+ "net"
+ "net/http"
+ "os"
+ "os/signal"
+ "sync"
+ "syscall"
+ "time"
+
+ "github.com/facebookgo/clock"
+ "github.com/facebookgo/stats"
+)
+
+const (
+ defaultStopTimeout = time.Minute
+ defaultKillTimeout = time.Minute
+)
+
+// A Server allows encapsulates the process of accepting new connections and
+// serving them, and gracefully shutting down the listener without dropping
+// active connections.
+type Server interface {
+ // Wait waits for the serving loop to finish. This will happen when Stop is
+ // called, at which point it returns no error, or if there is an error in the
+ // serving loop. You must call Wait after calling Serve or ListenAndServe.
+ Wait() error
+
+ // Stop stops the listener. It will block until all connections have been
+ // closed.
+ Stop() error
+}
+
+// HTTP defines the configuration for serving a http.Server. Multiple calls to
+// Serve or ListenAndServe can be made on the same HTTP instance. The default
+// timeouts of 1 minute each result in a maximum of 2 minutes before a Stop()
+// returns.
+type HTTP struct {
+ // StopTimeout is the duration before we begin force closing connections.
+ // Defaults to 1 minute.
+ StopTimeout time.Duration
+
+ // KillTimeout is the duration before which we completely give up and abort
+ // even though we still have connected clients. This is useful when a large
+ // number of client connections exist and closing them can take a long time.
+ // Note, this is in addition to the StopTimeout. Defaults to 1 minute.
+ KillTimeout time.Duration
+
+ // Stats is optional. If provided, it will be used to record various metrics.
+ Stats stats.Client
+
+ // Clock allows for testing timing related functionality. Do not specify this
+ // in production code.
+ Clock clock.Clock
+
+ // when set CertFile and KeyFile, the httpDown will start a http with TLS.
+ // Files containing a certificate and matching private key for the
+ // server must be provided if neither the Server's
+ // TLSConfig.Certificates nor TLSConfig.GetCertificate are populated.
+ // If the certificate is signed by a certificate authority, the
+ // certFile should be the concatenation of the server's certificate,
+ // any intermediates, and the CA's certificate.
+ CertFile, KeyFile string
+}
+
+// Serve provides the low-level API which is useful if you're creating your own
+// net.Listener.
+func (h HTTP) Serve(s *http.Server, l net.Listener) Server {
+ stopTimeout := h.StopTimeout
+ if stopTimeout == 0 {
+ stopTimeout = defaultStopTimeout
+ }
+ killTimeout := h.KillTimeout
+ if killTimeout == 0 {
+ killTimeout = defaultKillTimeout
+ }
+ klock := h.Clock
+ if klock == nil {
+ klock = clock.New()
+ }
+
+ ss := &server{
+ stopTimeout: stopTimeout,
+ killTimeout: killTimeout,
+ stats: h.Stats,
+ clock: klock,
+ oldConnState: s.ConnState,
+ listener: l,
+ server: s,
+ serveDone: make(chan struct{}),
+ serveErr: make(chan error, 1),
+ new: make(chan net.Conn),
+ active: make(chan net.Conn),
+ idle: make(chan net.Conn),
+ closed: make(chan net.Conn),
+ stop: make(chan chan struct{}),
+ kill: make(chan chan struct{}),
+ certFile: h.CertFile,
+ keyFile: h.KeyFile,
+ }
+ s.ConnState = ss.connState
+ go ss.manage()
+ go ss.serve()
+ return ss
+}
+
+// ListenAndServe returns a Server for the given http.Server. It is equivalent
+// to ListenAndServe from the standard library, but returns immediately.
+// Requests will be accepted in a background goroutine. If the http.Server has
+// a non-nil TLSConfig, a TLS enabled listener will be setup.
+func (h HTTP) ListenAndServe(s *http.Server) (Server, error) {
+ addr := s.Addr
+ if addr == "" {
+ if s.TLSConfig == nil {
+ addr = ":http"
+ } else {
+ addr = ":https"
+ }
+ }
+ l, err := net.Listen("tcp", addr)
+ if err != nil {
+ stats.BumpSum(h.Stats, "listen.error", 1)
+ return nil, err
+ }
+ if s.TLSConfig != nil {
+ l = tls.NewListener(l, s.TLSConfig)
+ }
+ return h.Serve(s, l), nil
+}
+
+// server manages the serving process and allows for gracefully stopping it.
+type server struct {
+ stopTimeout time.Duration
+ killTimeout time.Duration
+ stats stats.Client
+ clock clock.Clock
+
+ oldConnState func(net.Conn, http.ConnState)
+ server *http.Server
+ serveDone chan struct{}
+ serveErr chan error
+ listener net.Listener
+
+ new chan net.Conn
+ active chan net.Conn
+ idle chan net.Conn
+ closed chan net.Conn
+ stop chan chan struct{}
+ kill chan chan struct{}
+
+ stopOnce sync.Once
+ stopErr error
+
+ certFile, keyFile string
+}
+
+func (s *server) connState(c net.Conn, cs http.ConnState) {
+ if s.oldConnState != nil {
+ s.oldConnState(c, cs)
+ }
+
+ switch cs {
+ case http.StateNew:
+ s.new <- c
+ case http.StateActive:
+ s.active <- c
+ case http.StateIdle:
+ s.idle <- c
+ case http.StateHijacked, http.StateClosed:
+ s.closed <- c
+ }
+}
+
+func (s *server) manage() {
+ defer func() {
+ close(s.new)
+ close(s.active)
+ close(s.idle)
+ close(s.closed)
+ close(s.stop)
+ close(s.kill)
+ }()
+
+ var stopDone chan struct{}
+
+ conns := map[net.Conn]http.ConnState{}
+ var countNew, countActive, countIdle float64
+
+ // decConn decrements the count associated with the current state of the
+ // given connection.
+ decConn := func(c net.Conn) {
+ switch conns[c] {
+ default:
+ panic(fmt.Errorf("unknown existing connection: %s", c))
+ case http.StateNew:
+ countNew--
+ case http.StateActive:
+ countActive--
+ case http.StateIdle:
+ countIdle--
+ }
+ }
+
+ // setup a ticker to report various values every minute. if we don't have a
+ // Stats implementation provided, we Stop it so it never ticks.
+ statsTicker := s.clock.Ticker(time.Minute)
+ if s.stats == nil {
+ statsTicker.Stop()
+ }
+
+ for {
+ select {
+ case <-statsTicker.C:
+ // we'll only get here when s.stats is not nil
+ s.stats.BumpAvg("http-state.new", countNew)
+ s.stats.BumpAvg("http-state.active", countActive)
+ s.stats.BumpAvg("http-state.idle", countIdle)
+ s.stats.BumpAvg("http-state.total", countNew+countActive+countIdle)
+ case c := <-s.new:
+ conns[c] = http.StateNew
+ countNew++
+ case c := <-s.active:
+ decConn(c)
+ countActive++
+
+ conns[c] = http.StateActive
+ case c := <-s.idle:
+ decConn(c)
+ countIdle++
+
+ conns[c] = http.StateIdle
+
+ // if we're already stopping, close it
+ if stopDone != nil {
+ c.Close()
+ }
+ case c := <-s.closed:
+ stats.BumpSum(s.stats, "conn.closed", 1)
+ decConn(c)
+ delete(conns, c)
+
+ // if we're waiting to stop and are all empty, we just closed the last
+ // connection and we're done.
+ if stopDone != nil && len(conns) == 0 {
+ close(stopDone)
+ return
+ }
+ case stopDone = <-s.stop:
+ // if we're already all empty, we're already done
+ if len(conns) == 0 {
+ close(stopDone)
+ return
+ }
+
+ // close current idle connections right away
+ for c, cs := range conns {
+ if cs == http.StateIdle {
+ c.Close()
+ }
+ }
+
+ // continue the loop and wait for all the ConnState updates which will
+ // eventually close(stopDone) and return from this goroutine.
+
+ case killDone := <-s.kill:
+ // force close all connections
+ stats.BumpSum(s.stats, "kill.conn.count", float64(len(conns)))
+ for c := range conns {
+ c.Close()
+ }
+
+ // don't block the kill.
+ close(killDone)
+
+ // continue the loop and we wait for all the ConnState updates and will
+ // return from this goroutine when we're all done. otherwise we'll try to
+ // send those ConnState updates on closed channels.
+
+ }
+ }
+}
+
+func (s *server) serve() {
+ stats.BumpSum(s.stats, "serve", 1)
+ if s.certFile == "" && s.keyFile == "" {
+ s.serveErr <- s.server.Serve(s.listener)
+ } else {
+ s.serveErr <- s.server.ServeTLS(s.listener, s.certFile, s.keyFile)
+ }
+ close(s.serveDone)
+ close(s.serveErr)
+}
+
+func (s *server) Wait() error {
+ if err := <-s.serveErr; !isUseOfClosedError(err) {
+ return err
+ }
+ return nil
+}
+
+func (s *server) Stop() error {
+ s.stopOnce.Do(func() {
+ defer stats.BumpTime(s.stats, "stop.time").End()
+ stats.BumpSum(s.stats, "stop", 1)
+
+ // first disable keep-alive for new connections
+ s.server.SetKeepAlivesEnabled(false)
+
+ // then close the listener so new connections can't connect come thru
+ closeErr := s.listener.Close()
+ <-s.serveDone
+
+ // then trigger the background goroutine to stop and wait for it
+ stopDone := make(chan struct{})
+ s.stop <- stopDone
+
+ // wait for stop
+ select {
+ case <-stopDone:
+ case <-s.clock.After(s.stopTimeout):
+ defer stats.BumpTime(s.stats, "kill.time").End()
+ stats.BumpSum(s.stats, "kill", 1)
+
+ // stop timed out, wait for kill
+ killDone := make(chan struct{})
+ s.kill <- killDone
+ select {
+ case <-killDone:
+ case <-s.clock.After(s.killTimeout):
+ // kill timed out, give up
+ stats.BumpSum(s.stats, "kill.timeout", 1)
+ }
+ }
+
+ if closeErr != nil && !isUseOfClosedError(closeErr) {
+ stats.BumpSum(s.stats, "listener.close.error", 1)
+ s.stopErr = closeErr
+ }
+ })
+ return s.stopErr
+}
+
+func isUseOfClosedError(err error) bool {
+ if err == nil {
+ return false
+ }
+ if opErr, ok := err.(*net.OpError); ok {
+ err = opErr.Err
+ }
+ return err.Error() == "use of closed network connection"
+}
+
+// ListenAndServe is a convenience function to serve and wait for a SIGTERM
+// or SIGINT before shutting down.
+func ListenAndServe(s *http.Server, hd *HTTP) error {
+ if hd == nil {
+ hd = &HTTP{}
+ }
+ hs, err := hd.ListenAndServe(s)
+ if err != nil {
+ return err
+ }
+
+ waiterr := make(chan error, 1)
+ go func() {
+ defer close(waiterr)
+ waiterr <- hs.Wait()
+ }()
+
+ signals := make(chan os.Signal, 10)
+ signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
+
+ select {
+ case err := <-waiterr:
+ if err != nil {
+ return err
+ }
+ case <-signals:
+ signal.Stop(signals)
+ if err := hs.Stop(); err != nil {
+ return err
+ }
+ if err := <-waiterr; err != nil {
+ return err
+ }
+ }
+ return nil
+}