diff options
| author | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
|---|---|---|
| committer | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| commit | 46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch) | |
| tree | 734125b48b6d96f8796a2b89b924312cd169ef0e /weed/command/volume.go | |
| parent | a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff) | |
| parent | dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff) | |
| download | seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip | |
Update tikv client version and add one PC support
Diffstat (limited to 'weed/command/volume.go')
| -rw-r--r-- | weed/command/volume.go | 63 |
1 files changed, 41 insertions, 22 deletions
diff --git a/weed/command/volume.go b/weed/command/volume.go index 235eff11b..158bdf162 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -2,7 +2,6 @@ package command import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/storage/types" "net/http" httppprof "net/http/pprof" "os" @@ -11,6 +10,8 @@ import ( "strings" "time" + "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/spf13/viper" "google.golang.org/grpc" @@ -24,7 +25,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/server" + weed_server "github.com/chrislusf/seaweedfs/weed/server" stats_collect "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/util" @@ -36,6 +37,7 @@ var ( type VolumeServerOptions struct { port *int + portGrpc *int publicPort *int folders []string folderMaxLimits []int @@ -43,7 +45,8 @@ type VolumeServerOptions struct { ip *string publicUrl *string bindIp *string - masters *string + mastersString *string + masters []pb.ServerAddress idleConnectionTimeout *int dataCenter *string rack *string @@ -62,17 +65,19 @@ type VolumeServerOptions struct { preStopSeconds *int metricsHttpPort *int // pulseSeconds *int - enableTcp *bool + enableTcp *bool + inflightUploadDataTimeout *time.Duration } func init() { cmdVolume.Run = runVolume // break init cycle v.port = cmdVolume.Flag.Int("port", 8080, "http listen port") + v.portGrpc = cmdVolume.Flag.Int("port.grpc", 0, "grpc listen port") v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public") v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier") v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address") - v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to") - v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") + v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.") + v.mastersString = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers") v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server") // v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting") v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds") @@ -91,7 +96,8 @@ func init() { v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile") v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port") v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") - v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<exprimental> enable tcp port") + v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<experimental> enable tcp port") + v.inflightUploadDataTimeout = cmdVolume.Flag.Duration("inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers") } var cmdVolume = &Command{ @@ -104,7 +110,7 @@ var cmdVolume = &Command{ var ( volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...") - maxVolumeCounts = cmdVolume.Flag.String("max", "8", "maximum numbers of volumes, count[,count]... If set to zero, the limit will be auto configured.") + maxVolumeCounts = cmdVolume.Flag.String("max", "8", "maximum numbers of volumes, count[,count]... If set to zero, the limit will be auto configured as free disk space divided by volume size.") volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") minFreeSpacePercent = cmdVolume.Flag.String("minFreeSpacePercent", "1", "minimum free disk space (default to 1%). Low disk space will mark all volumes as ReadOnly (deprecated, use minFreeSpace instead).") minFreeSpace = cmdVolume.Flag.String("minFreeSpace", "", "min free disk space (value<=100 as percentage like 1, other as human readable bytes, like 10GiB). Low disk space will mark all volumes as ReadOnly.") @@ -123,6 +129,7 @@ func runVolume(cmd *Command, args []string) bool { go stats_collect.StartMetricsServer(*v.metricsHttpPort) minFreeSpaces := util.MustParseMinFreeSpace(*minFreeSpace, *minFreeSpacePercent) + v.masters = pb.ServerAddresses(*v.mastersString).ToAddresses() v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, minFreeSpaces) return true @@ -189,12 +196,18 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v *v.ip = util.DetectedHostAddress() glog.V(0).Infof("detected volume server ip address: %v", *v.ip) } + if *v.bindIp == "" { + *v.bindIp = *v.ip + } if *v.publicPort == 0 { *v.publicPort = *v.port } + if *v.portGrpc == 0 { + *v.portGrpc = 10000 + *v.port + } if *v.publicUrl == "" { - *v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort) + *v.publicUrl = util.JoinHostPort(*v.ip, *v.publicPort) } volumeMux := http.NewServeMux() @@ -221,20 +234,19 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v volumeNeedleMapKind = storage.NeedleMapLevelDbLarge } - masters := *v.masters - volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux, - *v.ip, *v.port, *v.publicUrl, + *v.ip, *v.port, *v.portGrpc, *v.publicUrl, v.folders, v.folderMaxLimits, minFreeSpaces, diskTypes, *v.idxFolder, volumeNeedleMapKind, - strings.Split(masters, ","), 5, *v.dataCenter, *v.rack, + v.masters, 5, *v.dataCenter, *v.rack, v.whiteList, *v.fixJpgOrientation, *v.readMode, *v.compactionMBPerSecond, *v.fileSizeLimitMB, int64(*v.concurrentUploadLimitMB)*1024*1024, int64(*v.concurrentDownloadLimitMB)*1024*1024, + *v.inflightUploadDataTimeout, ) // starting grpc server grpcS := v.startGrpcService(volumeServer) @@ -258,7 +270,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v stopChan := make(chan bool) grace.OnInterrupt(func() { - fmt.Println("volume server has be killed") + fmt.Println("volume server has been killed") // Stop heartbeats if !volumeServer.StopHeartbeat() { @@ -307,8 +319,8 @@ func (v VolumeServerOptions) isSeparatedPublicPort() bool { } func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerServer) *grpc.Server { - grpcPort := *v.port + 10000 - grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0) + grpcPort := *v.portGrpc + grpcL, err := util.NewListener(util.JoinHostPort(*v.bindIp, grpcPort), 0) if err != nil { glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) } @@ -324,7 +336,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe } func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server { - publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort) + publicListeningAddress := util.JoinHostPort(*v.bindIp, *v.publicPort) glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "public at", publicListeningAddress) publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) if e != nil { @@ -351,7 +363,7 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd keyFile = viper.GetString("https.volume.key") } - listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port) + listeningAddress := util.JoinHostPort(*v.bindIp, *v.port) glog.V(0).Infof("Start Seaweed volume server %s at %s", util.Version(), listeningAddress) listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second) if e != nil { @@ -359,11 +371,18 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd } httpDown := httpdown.HTTP{ - KillTimeout: 5 * time.Minute, - StopTimeout: 5 * time.Minute, + KillTimeout: time.Minute, + StopTimeout: 30 * time.Second, CertFile: certFile, KeyFile: keyFile} - clusterHttpServer := httpDown.Serve(&http.Server{Handler: handler}, listener) + httpS := &http.Server{Handler: handler} + + if viper.GetString("https.volume.ca") != "" { + clientCertFile := viper.GetString("https.volume.ca") + httpS.TLSConfig = security.LoadClientTLSHTTP(clientCertFile) + } + + clusterHttpServer := httpDown.Serve(httpS, listener) go func() { if e := clusterHttpServer.Wait(); e != nil { glog.Fatalf("Volume server fail to serve: %v", e) @@ -373,7 +392,7 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd } func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) { - listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000) + listeningAddress := util.JoinHostPort(*v.bindIp, *v.port+20000) glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress) listener, e := util.NewListener(listeningAddress, 0) if e != nil { |
