aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-06-23 03:08:27 -0700
committerChris Lu <chris.lu@gmail.com>2019-06-23 03:08:27 -0700
commitf16375621f25995837868bd77e64a7d0da2ac68d (patch)
tree823ec9a69dcfae7a60fc00cee48835c0a191fa9b /weed/server
parent6c01fb6d2d9986731c0d44b233b480c1bb47d219 (diff)
downloadseaweedfs-f16375621f25995837868bd77e64a7d0da2ac68d.tar.xz
seaweedfs-f16375621f25995837868bd77e64a7d0da2ac68d.zip
big refactoring
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/master_grpc_server.go6
-rw-r--r--weed/server/master_grpc_server_volume.go8
-rw-r--r--weed/server/master_server.go70
-rw-r--r--weed/server/master_server_handlers_admin.go10
4 files changed, 43 insertions, 51 deletions
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index b766b8d7d..1a17327a0 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -70,7 +70,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
int64(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
- VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
+ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
}); err != nil {
return err
}
@@ -157,8 +157,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if err := stream.Send(&master_pb.HeartbeatResponse{
Leader: newLeader,
- MetricsAddress: ms.metricsAddress,
- MetricsIntervalSeconds: uint32(ms.metricsIntervalSec),
+ MetricsAddress: ms.option.MetricsAddress,
+ MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
}); err != nil {
return err
}
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 2aaf7b891..17903f020 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -50,7 +50,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
}
if req.Replication == "" {
- req.Replication = ms.defaultReplicaPlacement
+ req.Replication = ms.option.DefaultReplicaPlacement
}
replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
if err != nil {
@@ -65,7 +65,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
Collection: req.Collection,
ReplicaPlacement: replicaPlacement,
Ttl: ttl,
- Prealloacte: ms.preallocate,
+ Prealloacte: ms.preallocateSize,
DataCenter: req.DataCenter,
Rack: req.Rack,
DataNode: req.DataNode,
@@ -105,7 +105,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
}
if req.Replication == "" {
- req.Replication = ms.defaultReplicaPlacement
+ req.Replication = ms.option.DefaultReplicaPlacement
}
replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
if err != nil {
@@ -136,7 +136,7 @@ func (ms *MasterServer) VolumeList(ctx context.Context, req *master_pb.VolumeLis
resp := &master_pb.VolumeListResponse{
TopologyInfo: ms.Topo.ToTopologyInfo(),
- VolumeSizeLimitMb: uint64(ms.volumeSizeLimitMB),
+ VolumeSizeLimitMb: uint64(ms.option.VolumeSizeLimitMB),
}
return resp, nil
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 180007df7..3689b5495 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -25,17 +25,25 @@ import (
"github.com/spf13/viper"
)
+type MasterOption struct {
+ Port int
+ MetaFolder string
+ VolumeSizeLimitMB uint
+ VolumePreallocate bool
+ PulseSeconds int
+ DefaultReplicaPlacement string
+ GarbageThreshold float64
+ WhiteList []string
+ DisableHttp bool
+ MetricsAddress string
+ MetricsIntervalSec int
+}
+
type MasterServer struct {
- port int
- metaFolder string
- volumeSizeLimitMB uint
- preallocate int64
- pulseSeconds int
- defaultReplicaPlacement string
- garbageThreshold float64
- guard *security.Guard
- metricsAddress string
- metricsIntervalSec int
+ option *MasterOption
+ guard *security.Guard
+
+ preallocateSize int64
Topo *topology.Topology
vg *topology.VolumeGrowth
@@ -50,17 +58,7 @@ type MasterServer struct {
grpcDialOpiton grpc.DialOption
}
-func NewMasterServer(r *mux.Router, port int, metaFolder string,
- volumeSizeLimitMB uint,
- preallocate bool,
- pulseSeconds int,
- defaultReplicaPlacement string,
- garbageThreshold float64,
- whiteList []string,
- disableHttp bool,
- metricsAddress string,
- metricsIntervalSec int,
-) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
v := viper.GetViper()
signingKey := v.GetString("jwt.signing.key")
@@ -72,30 +70,24 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
var preallocateSize int64
- if preallocate {
- preallocateSize = int64(volumeSizeLimitMB) * (1 << 20)
+ if option.VolumePreallocate {
+ preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
ms := &MasterServer{
- port: port,
- volumeSizeLimitMB: volumeSizeLimitMB,
- preallocate: preallocateSize,
- pulseSeconds: pulseSeconds,
- defaultReplicaPlacement: defaultReplicaPlacement,
- garbageThreshold: garbageThreshold,
- clientChans: make(map[string]chan *master_pb.VolumeLocation),
- grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"),
- metricsAddress: metricsAddress,
- metricsIntervalSec: metricsIntervalSec,
+ option: option,
+ preallocateSize: preallocateSize,
+ clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"),
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
- ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds)
+ ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth()
- glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
+ glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
- ms.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
+ ms.guard = security.NewGuard(ms.option.WhiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
- if !disableHttp {
+ if !ms.option.DisableHttp {
handleStaticResources2(r)
r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
@@ -113,7 +105,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
}
- ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, garbageThreshold, ms.preallocate)
+ ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize)
ms.startAdminScripts()
@@ -185,7 +177,7 @@ func (ms *MasterServer) startAdminScripts() {
scriptLines := strings.Split(adminScripts, "\n")
- masterAddress := "localhost:" + strconv.Itoa(ms.port)
+ masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
var shellOptions shell.ShellOptions
shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master")
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 244098515..343bcb8da 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -47,7 +47,7 @@ func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request)
func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
gcString := r.FormValue("garbageThreshold")
- gcThreshold := ms.garbageThreshold
+ gcThreshold := ms.option.GarbageThreshold
if gcString != "" {
var err error
gcThreshold, err = strconv.ParseFloat(gcString, 32)
@@ -57,7 +57,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
}
}
glog.Infoln("garbageThreshold =", gcThreshold)
- ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocate)
+ ms.Topo.Vacuum(ms.grpcDialOpiton, gcThreshold, ms.preallocateSize)
ms.dirStatusHandler(w, r)
}
@@ -119,7 +119,7 @@ func (ms *MasterServer) selfUrl(r *http.Request) string {
if r.Host != "" {
return r.Host
}
- return "localhost:" + strconv.Itoa(ms.port)
+ return "localhost:" + strconv.Itoa(ms.option.Port)
}
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
@@ -142,7 +142,7 @@ func (ms *MasterServer) HasWritableVolume(option *topology.VolumeGrowOption) boo
func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
replicationString := r.FormValue("replication")
if replicationString == "" {
- replicationString = ms.defaultReplicaPlacement
+ replicationString = ms.option.DefaultReplicaPlacement
}
replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
if err != nil {
@@ -152,7 +152,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if err != nil {
return nil, err
}
- preallocate := ms.preallocate
+ preallocate := ms.preallocateSize
if r.FormValue("preallocate") != "" {
preallocate, err = strconv.ParseInt(r.FormValue("preallocate"), 10, 64)
if err != nil {