diff options
| author | shibinbin <shibinbin@megvii.com> | 2020-10-28 11:36:42 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-10-28 11:36:42 +0800 |
| commit | 7cc07655d493d11c967cfa978ddc5181d4b6b861 (patch) | |
| tree | 5ae5bcf7ccc3cee3c55372674753d7c1ca48dff9 /weed/command/volume.go | |
| parent | 29a4c3944eeb07434060df52dfb1d3cf4c59dc91 (diff) | |
| parent | 53c3aad87528d57343afc5fdb3fb5107544af0fc (diff) | |
| download | seaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.tar.xz seaweedfs-7cc07655d493d11c967cfa978ddc5181d4b6b861.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/command/volume.go')
| -rw-r--r-- | weed/command/volume.go | 108 |
1 files changed, 65 insertions, 43 deletions
diff --git a/weed/command/volume.go b/weed/command/volume.go index d0fdd2ed1..d73c24ed1 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -25,6 +25,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/server" + stats_collect "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -53,8 +54,10 @@ type VolumeServerOptions struct { memProfile *string compactionMBPerSecond *int fileSizeLimitMB *int - minFreeSpacePercent []float32 + minFreeSpacePercents []float32 pprof *bool + preStopSeconds *int + metricsHttpPort *int // pulseSeconds *int } @@ -66,6 +69,7 @@ func init() { v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") v.bindIp = cmdVolume.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to") v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") + v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") // v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") @@ -76,8 +80,9 @@ func init() { v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") - v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") + v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 1024, "limit file size to avoid out of memory") v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") + v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") } var cmdVolume = &Command{ @@ -90,9 +95,9 @@ var cmdVolume = &Command{ var ( volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") - maxVolumeCounts = cmdVolume.Flag.String("max", "7", "maximum numbers of volumes, count[,count]... If set to zero on non-windows OS, the limit will be auto configured.") + maxVolumeCounts = cmdVolume.Flag.String("max", "8", "maximum numbers of volumes, count[,count]... If set to zero, the limit will be auto configured.") volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") - minFreeSpacePercent = cmdVolume.Flag.String("minFreeSpacePercent ", "0", "minimum free disk space(in percents). If free disk space lower this value - all volumes marks as ReadOnly") + minFreeSpacePercent = cmdVolume.Flag.String("minFreeSpacePercent", "1", "minimum free disk space (default to 1%). Low disk space will mark all volumes as ReadOnly.") ) func runVolume(cmd *Command, args []string) bool { @@ -107,6 +112,8 @@ func runVolume(cmd *Command, args []string) bool { grace.SetupProfiling(*v.cpuProfile, *v.memProfile) } + go stats_collect.StartMetricsServer(*v.metricsHttpPort) + v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, *minFreeSpacePercent) return true @@ -116,6 +123,13 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v // Set multiple folders and each folder's max volume count limit' v.folders = strings.Split(volumeFolders, ",") + for _, folder := range v.folders { + if err := util.TestFolderWritable(util.ResolvePath(folder)); err != nil { + glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err) + } + } + + // set max maxCountStrings := strings.Split(maxVolumeCounts, ",") for _, maxString := range maxCountStrings { if max, e := strconv.Atoi(maxString); e == nil { @@ -124,24 +138,32 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v glog.Fatalf("The max specified in -max not a valid number %s", maxString) } } + if len(v.folderMaxLimits) == 1 && len(v.folders) > 1 { + for i := 0; i < len(v.folders)-1; i++ { + v.folderMaxLimits = append(v.folderMaxLimits, v.folderMaxLimits[0]) + } + } if len(v.folders) != len(v.folderMaxLimits) { glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(v.folders), len(v.folderMaxLimits)) } + + // set minFreeSpacePercent minFreeSpacePercentStrings := strings.Split(minFreeSpacePercent, ",") for _, freeString := range minFreeSpacePercentStrings { - if value, e := strconv.ParseFloat(freeString, 32); e == nil { - v.minFreeSpacePercent = append(v.minFreeSpacePercent, float32(value)) + v.minFreeSpacePercents = append(v.minFreeSpacePercents, float32(value)) } else { glog.Fatalf("The value specified in -minFreeSpacePercent not a valid value %s", freeString) } } - - for _, folder := range v.folders { - if err := util.TestFolderWritable(folder); err != nil { - glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err) + if len(v.minFreeSpacePercents) == 1 && len(v.folders) > 1 { + for i := 0; i < len(v.folders)-1; i++ { + v.minFreeSpacePercents = append(v.minFreeSpacePercents, v.minFreeSpacePercents[0]) } } + if len(v.folders) != len(v.minFreeSpacePercents) { + glog.Fatalf("%d directories by -dir, but only %d minFreeSpacePercent is set by -minFreeSpacePercent", len(v.folders), len(v.minFreeSpacePercents)) + } // security related white list configuration if volumeWhiteListOption != "" { @@ -188,7 +210,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, *v.ip, *v.port, *v.publicUrl, - v.folders, v.folderMaxLimits, v.minFreeSpacePercent, + v.folders, v.folderMaxLimits, v.minFreeSpacePercents, volumeNeedleMapKind, strings.Split(masters, ","), 5, *v.dataCenter, *v.rack, v.whiteList, @@ -196,7 +218,6 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v *v.compactionMBPerSecond, *v.fileSizeLimitMB, ) - // starting grpc server grpcS := v.startGrpcService(volumeServer) @@ -212,47 +233,48 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v // starting the cluster http server clusterHttpServer := v.startClusterHttpService(volumeMux) - stopChain := make(chan struct{}) + stopChan := make(chan bool) grace.OnInterrupt(func() { fmt.Println("volume server has be killed") - var startTime time.Time - - // firstly, stop the public http service to prevent from receiving new user request - if nil != publicHttpDown { - startTime = time.Now() - if err := publicHttpDown.Stop(); err != nil { - glog.Warningf("stop the public http server failed, %v", err) - } - delta := time.Now().Sub(startTime).Nanoseconds() / 1e6 - glog.V(0).Infof("stop public http server, elapsed %dms", delta) - } - startTime = time.Now() - if err := clusterHttpServer.Stop(); err != nil { - glog.Warningf("stop the cluster http server failed, %v", err) + // Stop heartbeats + if !volumeServer.StopHeartbeat() { + glog.V(0).Infof("stop send heartbeat and wait %d seconds until shutdown ...", *v.preStopSeconds) + time.Sleep(time.Duration(*v.preStopSeconds) * time.Second) } - delta := time.Now().Sub(startTime).Nanoseconds() / 1e6 - glog.V(0).Infof("graceful stop cluster http server, elapsed [%d]", delta) - startTime = time.Now() - grpcS.GracefulStop() - delta = time.Now().Sub(startTime).Nanoseconds() / 1e6 - glog.V(0).Infof("graceful stop gRPC, elapsed [%d]", delta) + shutdown(publicHttpDown, clusterHttpServer, grpcS, volumeServer) + stopChan <- true + }) - startTime = time.Now() - volumeServer.Shutdown() - delta = time.Now().Sub(startTime).Nanoseconds() / 1e6 - glog.V(0).Infof("stop volume server, elapsed [%d]", delta) + select { + case <-stopChan: + } - pprof.StopCPUProfile() +} - close(stopChain) // notify exit - }) +func shutdown(publicHttpDown httpdown.Server, clusterHttpServer httpdown.Server, grpcS *grpc.Server, volumeServer *weed_server.VolumeServer) { - select { - case <-stopChain: + // firstly, stop the public http service to prevent from receiving new user request + if nil != publicHttpDown { + glog.V(0).Infof("stop public http server ... ") + if err := publicHttpDown.Stop(); err != nil { + glog.Warningf("stop the public http server failed, %v", err) + } + } + + glog.V(0).Infof("graceful stop cluster http server ... ") + if err := clusterHttpServer.Stop(); err != nil { + glog.Warningf("stop the cluster http server failed, %v", err) } - glog.Warningf("the volume server exit.") + + glog.V(0).Infof("graceful stop gRPC ...") + grpcS.GracefulStop() + + volumeServer.Shutdown() + + pprof.StopCPUProfile() + } // check whether configure the public port |
