aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2018-10-11 00:04:31 -0700
committerChris Lu <chris.lu@gmail.com>2018-10-11 00:04:31 -0700
commitda6154b29c71c5f12d482ffd773e77d492b89371 (patch)
tree865a80e6256fa7893bcdae80e9ea2412a145b609
parent60d2f1557d4b871fb8b19cab8206d7b725c12845 (diff)
downloadseaweedfs-da6154b29c71c5f12d482ffd773e77d492b89371.tar.xz
seaweedfs-da6154b29c71c5f12d482ffd773e77d492b89371.zip
refactor volume server to startVolumeServer()
-rw-r--r--weed/command/server.go112
-rw-r--r--weed/command/volume.go28
2 files changed, 40 insertions, 100 deletions
diff --git a/weed/command/server.go b/weed/command/server.go
index 7cd245b94..875feebad 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -13,8 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/server"
- "github.com/chrislusf/seaweedfs/weed/storage"
- "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
"github.com/soheilhy/cmux"
"google.golang.org/grpc/reflection"
@@ -22,6 +21,7 @@ import (
type ServerOptions struct {
cpuprofile *string
+ v VolumeServerOptions
}
var (
@@ -66,15 +66,9 @@ var (
masterVolumeSizeLimitMB = cmdServer.Flag.Uint("master.volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
masterVolumePreallocate = cmdServer.Flag.Bool("master.volumePreallocate", false, "Preallocate disk space for volumes.")
masterDefaultReplicaPlacement = cmdServer.Flag.String("master.defaultReplicaPlacement", "000", "Default replication type if not specified.")
- volumePort = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
- volumePublicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
volumeMaxDataVolumeCounts = cmdServer.Flag.String("volume.max", "7", "maximum numbers of volumes, count[,count]...")
- volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
- volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.")
- volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.")
- volumeReadRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
- volumeServerPublicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
+ pulseSeconds = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer")
serverWhiteList []string
@@ -91,6 +85,14 @@ func init() {
filerOptions.disableDirListing = cmdServer.Flag.Bool("filer.disableDirListing", false, "turn off directory listing")
filerOptions.maxMB = cmdServer.Flag.Int("filer.maxMB", 32, "split files larger than the limit")
filerOptions.dirListingLimit = cmdServer.Flag.Int("filer.dirListLimit", 1000, "limit sub dir listing size")
+
+ serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port")
+ serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port")
+ serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.")
+ serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.")
+ serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
+ serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
+
}
func runServer(cmd *Command, args []string) bool {
@@ -110,6 +112,14 @@ func runServer(cmd *Command, args []string) bool {
master := *serverIp + ":" + strconv.Itoa(*masterPort)
filerOptions.ip = serverIp
+ serverOptions.v.ip = serverIp
+ serverOptions.v.bindIp = serverBindIp
+ serverOptions.v.masters = &master
+ serverOptions.v.idleConnectionTimeout = serverTimeout
+ serverOptions.v.maxCpu = serverMaxCpu
+ serverOptions.v.dataCenter = serverDataCenter
+ serverOptions.v.rack = serverRack
+ serverOptions.v.pulseSeconds = pulseSeconds
filerOptions.dataCenter = serverDataCenter
@@ -117,33 +127,12 @@ func runServer(cmd *Command, args []string) bool {
*filerOptions.defaultReplicaPlacement = *masterDefaultReplicaPlacement
}
- if *volumePublicPort == 0 {
- *volumePublicPort = *volumePort
- }
-
if *serverMaxCpu < 1 {
*serverMaxCpu = runtime.NumCPU()
}
runtime.GOMAXPROCS(*serverMaxCpu)
folders := strings.Split(*volumeDataFolders, ",")
- maxCountStrings := strings.Split(*volumeMaxDataVolumeCounts, ",")
- var maxCounts []int
- for _, maxString := range maxCountStrings {
- if max, e := strconv.Atoi(maxString); e == nil {
- maxCounts = append(maxCounts, max)
- } else {
- glog.Fatalf("The max specified in -max not a valid number %s", maxString)
- }
- }
- if len(folders) != len(maxCounts) {
- glog.Fatalf("%d directories by -dir, but only %d max is set by -max", len(folders), len(maxCounts))
- }
- for _, folder := range folders {
- if err := util.TestFolderWritable(folder); err != nil {
- glog.Fatalf("Check Data Folder(-dir) Writable %s : %s", folder, err)
- }
- }
if *masterVolumeSizeLimitMB > 30*1000 {
glog.Fatalf("masterVolumeSizeLimitMB should be less than 30000")
@@ -179,7 +168,7 @@ func runServer(cmd *Command, args []string) bool {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, *masterPort, *masterMetaFolder,
*masterVolumeSizeLimitMB, *masterVolumePreallocate,
- *volumePulse, *masterDefaultReplicaPlacement, *serverGarbageThreshold,
+ *pulseSeconds, *masterDefaultReplicaPlacement, *serverGarbageThreshold,
serverWhiteList, *serverSecureKey,
)
@@ -193,7 +182,7 @@ func runServer(cmd *Command, args []string) bool {
raftWaitForMaster.Wait()
time.Sleep(100 * time.Millisecond)
myAddress, peers := checkPeers(*serverIp, *masterPort, *serverPeers)
- raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *volumePulse)
+ raftServer := weed_server.NewRaftServer(r, peers, myAddress, *masterMetaFolder, ms.Topo, *pulseSeconds)
ms.SetRaftServer(raftServer)
volumeWait.Done()
}()
@@ -224,65 +213,8 @@ func runServer(cmd *Command, args []string) bool {
volumeWait.Wait()
time.Sleep(100 * time.Millisecond)
- if *volumePublicPort == 0 {
- *volumePublicPort = *volumePort
- }
- if *volumeServerPublicUrl == "" {
- *volumeServerPublicUrl = *serverIp + ":" + strconv.Itoa(*volumePublicPort)
- }
- isSeperatedPublicPort := *volumePublicPort != *volumePort
- volumeMux := http.NewServeMux()
- publicVolumeMux := volumeMux
- if isSeperatedPublicPort {
- publicVolumeMux = http.NewServeMux()
- }
- volumeNeedleMapKind := storage.NeedleMapInMemory
- switch *volumeIndexType {
- case "leveldb":
- volumeNeedleMapKind = storage.NeedleMapLevelDb
- case "boltdb":
- volumeNeedleMapKind = storage.NeedleMapBoltDb
- case "btree":
- volumeNeedleMapKind = storage.NeedleMapBtree
- }
- volumeServer := weed_server.NewVolumeServer(volumeMux, publicVolumeMux,
- *serverIp, *volumePort, *volumeServerPublicUrl,
- folders, maxCounts,
- volumeNeedleMapKind,
- []string{master}, *volumePulse, *serverDataCenter, *serverRack,
- serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect,
- )
-
- glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", *serverIp+":"+strconv.Itoa(*volumePort))
- volumeListener, eListen := util.NewListener(
- *serverBindIp+":"+strconv.Itoa(*volumePort),
- time.Duration(*serverTimeout)*time.Second,
- )
- if eListen != nil {
- glog.Fatalf("Volume server listener error: %v", eListen)
- }
- if isSeperatedPublicPort {
- publicListeningAddress := *serverIp + ":" + strconv.Itoa(*volumePublicPort)
- glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "public at", publicListeningAddress)
- publicListener, e := util.NewListener(publicListeningAddress, time.Duration(*serverTimeout)*time.Second)
- if e != nil {
- glog.Fatalf("Volume server listener error:%v", e)
- }
- go func() {
- if e := http.Serve(publicListener, publicVolumeMux); e != nil {
- glog.Fatalf("Volume server fail to serve public: %v", e)
- }
- }()
- }
- util.OnInterrupt(func() {
- volumeServer.Shutdown()
- pprof.StopCPUProfile()
- })
-
- if e := http.Serve(volumeListener, volumeMux); e != nil {
- glog.Fatalf("Volume server fail to serve:%v", e)
- }
+ serverOptions.v.startVolumeServer(*volumeDataFolders, *volumeMaxDataVolumeCounts, *serverWhiteListOption)
return true
}
diff --git a/weed/command/volume.go b/weed/command/volume.go
index 407c39eb1..df8d842dc 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -1,17 +1,17 @@
package command
import (
- "net/http"
"os"
"runtime"
"strconv"
"strings"
- "time"
-
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/server"
- "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
+ "net/http"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "time"
+ "runtime/pprof"
)
var (
@@ -81,9 +81,16 @@ func runVolume(cmd *Command, args []string) bool {
runtime.GOMAXPROCS(*v.maxCpu)
util.SetupProfiling(*v.cpuProfile, *v.memProfile)
+ v.startVolumeServer(*volumeFolders, *maxVolumeCounts, *volumeWhiteListOption)
+
+ return true
+}
+
+func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, volumeWhiteListOption string) {
+
//Set multiple folders and each folder's max volume count limit'
- v.folders = strings.Split(*volumeFolders, ",")
- maxCountStrings := strings.Split(*maxVolumeCounts, ",")
+ v.folders = strings.Split(volumeFolders, ",")
+ maxCountStrings := strings.Split(maxVolumeCounts, ",")
for _, maxString := range maxCountStrings {
if max, e := strconv.Atoi(maxString); e == nil {
v.folderMaxLimits = append(v.folderMaxLimits, max)
@@ -101,8 +108,8 @@ func runVolume(cmd *Command, args []string) bool {
}
//security related white list configuration
- if *volumeWhiteListOption != "" {
- v.whiteList = strings.Split(*volumeWhiteListOption, ",")
+ if volumeWhiteListOption != "" {
+ v.whiteList = strings.Split(volumeWhiteListOption, ",")
}
if *v.ip == "" {
@@ -166,10 +173,11 @@ func runVolume(cmd *Command, args []string) bool {
util.OnInterrupt(func() {
volumeServer.Shutdown()
+ pprof.StopCPUProfile()
})
if e := http.Serve(listener, volumeMux); e != nil {
glog.Fatalf("Volume server fail to serve: %v", e)
}
- return true
+
}