aboutsummaryrefslogtreecommitdiff
path: root/weed/server/master_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/master_server.go')
-rw-r--r--weed/server/master_server.go181
1 files changed, 133 insertions, 48 deletions
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index f22925e56..3689b5495 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -2,10 +2,17 @@ package weed_server
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/shell"
+ "google.golang.org/grpc"
"net/http"
"net/http/httputil"
"net/url"
+ "os"
+ "regexp"
+ "strconv"
+ "strings"
"sync"
+ "time"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -15,17 +22,28 @@ import (
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
+ "github.com/spf13/viper"
)
+type MasterOption struct {
+ Port int
+ MetaFolder string
+ VolumeSizeLimitMB uint
+ VolumePreallocate bool
+ PulseSeconds int
+ DefaultReplicaPlacement string
+ GarbageThreshold float64
+ WhiteList []string
+ DisableHttp bool
+ MetricsAddress string
+ MetricsIntervalSec int
+}
+
type MasterServer struct {
- port int
- metaFolder string
- volumeSizeLimitMB uint
- preallocate int64
- pulseSeconds int
- defaultReplicaPlacement string
- garbageThreshold float64
- guard *security.Guard
+ option *MasterOption
+ guard *security.Guard
+
+ preallocateSize int64
Topo *topology.Topology
vg *topology.VolumeGrowth
@@ -36,56 +54,60 @@ type MasterServer struct {
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.VolumeLocation
+
+ grpcDialOpiton grpc.DialOption
}
-func NewMasterServer(r *mux.Router, port int, metaFolder string,
- volumeSizeLimitMB uint,
- preallocate bool,
- pulseSeconds int,
- defaultReplicaPlacement string,
- garbageThreshold float64,
- whiteList []string,
- secureKey string,
-) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
+
+ v := viper.GetViper()
+ signingKey := v.GetString("jwt.signing.key")
+ v.SetDefault("jwt.signing.expires_after_seconds", 10)
+ expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
+
+ readSigningKey := v.GetString("jwt.signing.read.key")
+ v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
+ readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
var preallocateSize int64
- if preallocate {
- preallocateSize = int64(volumeSizeLimitMB) * (1 << 20)
+ if option.VolumePreallocate {
+ preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
ms := &MasterServer{
- port: port,
- volumeSizeLimitMB: volumeSizeLimitMB,
- preallocate: preallocateSize,
- pulseSeconds: pulseSeconds,
- defaultReplicaPlacement: defaultReplicaPlacement,
- garbageThreshold: garbageThreshold,
- clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ option: option,
+ preallocateSize: preallocateSize,
+ clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"),
}
ms.bounedLeaderChan = make(chan int, 16)
seq := sequence.NewMemorySequencer()
- ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds)
+ ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth()
- glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
-
- ms.guard = security.NewGuard(whiteList, secureKey)
-
- handleStaticResources2(r)
- r.HandleFunc("/", ms.uiStatusHandler)
- r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
- r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
- r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
- r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
- r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
- r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
- r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
- r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
- r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
- r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
- r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
- r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
- r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
-
- ms.Topo.StartRefreshWritableVolumes(garbageThreshold, ms.preallocate)
+ glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
+
+ ms.guard = security.NewGuard(ms.option.WhiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
+
+ if !ms.option.DisableHttp {
+ handleStaticResources2(r)
+ r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
+ r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
+ r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
+ r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
+ r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
+ r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
+ r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
+ r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
+ r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
+ r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
+ r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
+ r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
+ r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
+ r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
+ }
+
+ ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize)
+
+ ms.startAdminScripts()
return ms
}
@@ -98,6 +120,9 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
}
})
+ ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) {
+ glog.V(0).Infof("state change: %+v", e)
+ })
if ms.Topo.IsLeader() {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
} else {
@@ -138,3 +163,63 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
}
}
}
+
+func (ms *MasterServer) startAdminScripts() {
+ v := viper.GetViper()
+ adminScripts := v.GetString("master.maintenance.scripts")
+ v.SetDefault("master.maintenance.sleep_minutes", 17)
+ sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
+
+ glog.V(0).Infof("adminScripts:\n%v", adminScripts)
+ if adminScripts == "" {
+ return
+ }
+
+ scriptLines := strings.Split(adminScripts, "\n")
+
+ masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
+
+ var shellOptions shell.ShellOptions
+ shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master")
+ shellOptions.Masters = &masterAddress
+ shellOptions.FilerHost = "localhost"
+ shellOptions.FilerPort = 8888
+ shellOptions.Directory = "/"
+
+ commandEnv := shell.NewCommandEnv(shellOptions)
+
+ reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
+
+ go commandEnv.MasterClient.KeepConnectedToMaster()
+
+ go func() {
+ commandEnv.MasterClient.WaitUntilConnected()
+
+ c := time.Tick(time.Duration(sleepMinutes) * time.Minute)
+ for _ = range c {
+ if ms.Topo.IsLeader() {
+ for _, line := range scriptLines {
+
+ cmds := reg.FindAllString(line, -1)
+ if len(cmds) == 0 {
+ continue
+ }
+ args := make([]string, len(cmds[1:]))
+ for i := range args {
+ args[i] = strings.Trim(string(cmds[1+i]), "\"'")
+ }
+ cmd := strings.ToLower(cmds[0])
+
+ for _, c := range shell.Commands {
+ if c.Name() == cmd {
+ glog.V(0).Infof("executing: %s %v", cmd, args)
+ if err := c.Do(args, commandEnv, os.Stdout); err != nil {
+ glog.V(0).Infof("error: %v", err)
+ }
+ }
+ }
+ }
+ }
+ }
+ }()
+}