aboutsummaryrefslogtreecommitdiff
path: root/weed/command/volume.go
diff options
context:
space:
mode:
authoryulai.li <blacktear23@gmail.com>2022-06-26 22:43:37 +0800
committeryulai.li <blacktear23@gmail.com>2022-06-26 22:43:37 +0800
commit46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch)
tree734125b48b6d96f8796a2b89b924312cd169ef0e /weed/command/volume.go
parenta5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff)
parentdc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff)
downloadseaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz
seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip
Update tikv client version and add one PC support
Diffstat (limited to 'weed/command/volume.go')
-rw-r--r--weed/command/volume.go63
1 files changed, 41 insertions, 22 deletions
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 235eff11b..158bdf162 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -2,7 +2,6 @@ package command
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
"net/http"
httppprof "net/http/pprof"
"os"
@@ -11,6 +10,8 @@ import (
"strings"
"time"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+
"github.com/spf13/viper"
"google.golang.org/grpc"
@@ -24,7 +25,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/server"
+ weed_server "github.com/chrislusf/seaweedfs/weed/server"
stats_collect "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -36,6 +37,7 @@ var (
type VolumeServerOptions struct {
port *int
+ portGrpc *int
publicPort *int
folders []string
folderMaxLimits []int
@@ -43,7 +45,8 @@ type VolumeServerOptions struct {
ip *string
publicUrl *string
bindIp *string
- masters *string
+ mastersString *string
+ masters []pb.ServerAddress
idleConnectionTimeout *int
dataCenter *string
rack *string
@@ -62,17 +65,19 @@ type VolumeServerOptions struct {
preStopSeconds *int
metricsHttpPort *int
// pulseSeconds *int
- enableTcp *bool
+ enableTcp *bool
+ inflightUploadDataTimeout *time.Duration
}
func init() {
cmdVolume.Run = runVolume // break init cycle
v.port = cmdVolume.Flag.Int("port", 8080, "http listen port")
+ v.portGrpc = cmdVolume.Flag.Int("port.grpc", 0, "grpc listen port")
v.publicPort = cmdVolume.Flag.Int("port.public", 0, "port opened to public")
v.ip = cmdVolume.Flag.String("ip", util.DetectedHostAddress(), "ip or server name, also used as identifier")
v.publicUrl = cmdVolume.Flag.String("publicUrl", "", "Publicly accessible address")
- v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to")
- v.masters = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
+ v.bindIp = cmdVolume.Flag.String("ip.bind", "", "ip address to bind to. If empty, default to same as -ip option.")
+ v.mastersString = cmdVolume.Flag.String("mserver", "localhost:9333", "comma-separated master servers")
v.preStopSeconds = cmdVolume.Flag.Int("preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
// v.pulseSeconds = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than or equal to the master's setting")
v.idleConnectionTimeout = cmdVolume.Flag.Int("idleTimeout", 30, "connection idle seconds")
@@ -91,7 +96,8 @@ func init() {
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
- v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<exprimental> enable tcp port")
+ v.enableTcp = cmdVolume.Flag.Bool("tcp", false, "<experimental> enable tcp port")
+ v.inflightUploadDataTimeout = cmdVolume.Flag.Duration("inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers")
}
var cmdVolume = &Command{
@@ -104,7 +110,7 @@ var cmdVolume = &Command{
var (
volumeFolders = cmdVolume.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
- maxVolumeCounts = cmdVolume.Flag.String("max", "8", "maximum numbers of volumes, count[,count]... If set to zero, the limit will be auto configured.")
+ maxVolumeCounts = cmdVolume.Flag.String("max", "8", "maximum numbers of volumes, count[,count]... If set to zero, the limit will be auto configured as free disk space divided by volume size.")
volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
minFreeSpacePercent = cmdVolume.Flag.String("minFreeSpacePercent", "1", "minimum free disk space (default to 1%). Low disk space will mark all volumes as ReadOnly (deprecated, use minFreeSpace instead).")
minFreeSpace = cmdVolume.Flag.String("minFreeSpace", "", "min free disk space (value<=100 as percentage like 1, other as human readable bytes, like 10GiB). Low disk space will mark all volumes as ReadOnly.")
@@ -123,6 +129,7 @@ func runVolume(cmd *Command, args []string) bool {
go stats_collect.StartMetricsServer(*v.metricsHttpPort)
minFreeSpaces := util.MustParseMinFreeSpace(*minFreeSpace, *minFreeSpacePercent)
+ v.masters = pb.ServerAddresses(*v.mastersString).ToAddresses()
v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption, minFreeSpaces)
return true
@@ -189,12 +196,18 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
*v.ip = util.DetectedHostAddress()
glog.V(0).Infof("detected volume server ip address: %v", *v.ip)
}
+ if *v.bindIp == "" {
+ *v.bindIp = *v.ip
+ }
if *v.publicPort == 0 {
*v.publicPort = *v.port
}
+ if *v.portGrpc == 0 {
+ *v.portGrpc = 10000 + *v.port
+ }
if *v.publicUrl == "" {
- *v.publicUrl = *v.ip + ":" + strconv.Itoa(*v.publicPort)
+ *v.publicUrl = util.JoinHostPort(*v.ip, *v.publicPort)
}
volumeMux := http.NewServeMux()
@@ -221,20 +234,19 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
volumeNeedleMapKind = storage.NeedleMapLevelDbLarge
}
- masters := *v.masters
-
volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
- *v.ip, *v.port, *v.publicUrl,
+ *v.ip, *v.port, *v.portGrpc, *v.publicUrl,
v.folders, v.folderMaxLimits, minFreeSpaces, diskTypes,
*v.idxFolder,
volumeNeedleMapKind,
- strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
+ v.masters, 5, *v.dataCenter, *v.rack,
v.whiteList,
*v.fixJpgOrientation, *v.readMode,
*v.compactionMBPerSecond,
*v.fileSizeLimitMB,
int64(*v.concurrentUploadLimitMB)*1024*1024,
int64(*v.concurrentDownloadLimitMB)*1024*1024,
+ *v.inflightUploadDataTimeout,
)
// starting grpc server
grpcS := v.startGrpcService(volumeServer)
@@ -258,7 +270,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
stopChan := make(chan bool)
grace.OnInterrupt(func() {
- fmt.Println("volume server has be killed")
+ fmt.Println("volume server has been killed")
// Stop heartbeats
if !volumeServer.StopHeartbeat() {
@@ -307,8 +319,8 @@ func (v VolumeServerOptions) isSeparatedPublicPort() bool {
}
func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerServer) *grpc.Server {
- grpcPort := *v.port + 10000
- grpcL, err := util.NewListener(*v.bindIp+":"+strconv.Itoa(grpcPort), 0)
+ grpcPort := *v.portGrpc
+ grpcL, err := util.NewListener(util.JoinHostPort(*v.bindIp, grpcPort), 0)
if err != nil {
glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err)
}
@@ -324,7 +336,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe
}
func (v VolumeServerOptions) startPublicHttpService(handler http.Handler) httpdown.Server {
- publicListeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.publicPort)
+ publicListeningAddress := util.JoinHostPort(*v.bindIp, *v.publicPort)
glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "public at", publicListeningAddress)
publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
if e != nil {
@@ -351,7 +363,7 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
keyFile = viper.GetString("https.volume.key")
}
- listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port)
+ listeningAddress := util.JoinHostPort(*v.bindIp, *v.port)
glog.V(0).Infof("Start Seaweed volume server %s at %s", util.Version(), listeningAddress)
listener, e := util.NewListener(listeningAddress, time.Duration(*v.idleConnectionTimeout)*time.Second)
if e != nil {
@@ -359,11 +371,18 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
}
httpDown := httpdown.HTTP{
- KillTimeout: 5 * time.Minute,
- StopTimeout: 5 * time.Minute,
+ KillTimeout: time.Minute,
+ StopTimeout: 30 * time.Second,
CertFile: certFile,
KeyFile: keyFile}
- clusterHttpServer := httpDown.Serve(&http.Server{Handler: handler}, listener)
+ httpS := &http.Server{Handler: handler}
+
+ if viper.GetString("https.volume.ca") != "" {
+ clientCertFile := viper.GetString("https.volume.ca")
+ httpS.TLSConfig = security.LoadClientTLSHTTP(clientCertFile)
+ }
+
+ clusterHttpServer := httpDown.Serve(httpS, listener)
go func() {
if e := clusterHttpServer.Wait(); e != nil {
glog.Fatalf("Volume server fail to serve: %v", e)
@@ -373,7 +392,7 @@ func (v VolumeServerOptions) startClusterHttpService(handler http.Handler) httpd
}
func (v VolumeServerOptions) startTcpService(volumeServer *weed_server.VolumeServer) {
- listeningAddress := *v.bindIp + ":" + strconv.Itoa(*v.port+20000)
+ listeningAddress := util.JoinHostPort(*v.bindIp, *v.port+20000)
glog.V(0).Infoln("Start Seaweed volume server", util.Version(), "tcp at", listeningAddress)
listener, e := util.NewListener(listeningAddress, 0)
if e != nil {