diff options
Diffstat (limited to 'weed/command/volume.go')
| -rw-r--r-- | weed/command/volume.go | 73 |
1 files changed, 40 insertions, 33 deletions
diff --git a/weed/command/volume.go b/weed/command/volume.go index 4f04a467d..dfc649ba5 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -25,6 +25,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/server" + stats_collect "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -55,6 +56,8 @@ type VolumeServerOptions struct { fileSizeLimitMB *int minFreeSpacePercents []float32 pprof *bool + preStopSeconds *int + metricsHttpPort *int // pulseSeconds *int } @@ -66,6 +69,7 @@ func init() { v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") + v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") // v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") @@ -76,8 +80,9 @@ func init() { v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") - v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") + v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 1024, "limit file size to avoid out of memory") v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") + v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") } var cmdVolume = &Command{ @@ -107,6 +112,8 @@ func runVolume(cmd *Command, args []string) bool { grace.SetupProfiling(*v.cpuProfile, *v.memProfile) } + go stats_collect.StartMetricsServer(*v.metricsHttpPort) + v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, *minFreeSpacePercent) return true @@ -206,7 +213,6 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v *v.compactionMBPerSecond, *v.fileSizeLimitMB, ) - // starting grpc server grpcS := v.startGrpcService(volumeServer) @@ -222,47 +228,48 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v // starting the cluster http server clusterHttpServer := v.startClusterHttpService(volumeMux) - stopChain := make(chan struct{}) + stopChan := make(chan bool) grace.OnInterrupt(func() { fmt.Println("volume server has be killed") - var startTime time.Time - - // firstly, stop the public http service to prevent from receiving new user request - if nil != publicHttpDown { - startTime = time.Now() - if err := publicHttpDown.Stop(); err != nil { - glog.Warningf("stop the public http server failed, %v", err) - } - delta := time.Now().Sub(startTime).Nanoseconds() / 1e6 - glog.V(0).Infof("stop public http server, elapsed %dms", delta) - } - startTime = time.Now() - if err := clusterHttpServer.Stop(); err != nil { - glog.Warningf("stop the cluster http server failed, %v", err) + // Stop heartbeats + if !volumeServer.StopHeartbeat() { + glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds) + time.Sleep(time.Duration(*v.preStopSeconds) * time.Second) } - delta := time.Now().Sub(startTime).Nanoseconds() / 1e6 - glog.V(0).Infof("graceful stop cluster http server, elapsed [%d]", delta) - startTime = time.Now() - grpcS.GracefulStop() - delta = time.Now().Sub(startTime).Nanoseconds() / 1e6 - glog.V(0).Infof("graceful stop gRPC, elapsed [%d]", delta) + shutdown(publicHttpDown, clusterHttpServer, grpcS, volumeServer) + stopChan <- true + }) - startTime = time.Now() - volumeServer.Shutdown() - delta = time.Now().Sub(startTime).Nanoseconds() / 1e6 - glog.V(0).Infof("stop volume server, elapsed [%d]", delta) + select { + case <-stopChan: + } - pprof.StopCPUProfile() +} - close(stopChain) // notify exit - }) +func shutdown(publicHttpDown httpdown.Server, clusterHttpServer httpdown.Server, grpcS *grpc.Server, volumeServer *weed_server.VolumeServer) { - select { - case <-stopChain: + // firstly, stop the public http service to prevent from receiving new user request + if nil != publicHttpDown { + glog.V(0).Infof("stop public http server ... ") + if err := publicHttpDown.Stop(); err != nil { + glog.Warningf("stop the public http server failed, %v", err) + } + } + + glog.V(0).Infof("graceful stop cluster http server ... ") + if err := clusterHttpServer.Stop(); err != nil { + glog.Warningf("stop the cluster http server failed, %v", err) } - glog.Warningf("the volume server exit.") + + glog.V(0).Infof("graceful stop gRPC ...") + grpcS.GracefulStop() + + volumeServer.Shutdown() + + pprof.StopCPUProfile() + } // check whether configure the public port |
