diff options
Diffstat (limited to 'weed/server/master_server.go')
| -rw-r--r-- | weed/server/master_server.go | 181 |
1 files changed, 133 insertions, 48 deletions
diff --git a/weed/server/master_server.go b/weed/server/master_server.go index f22925e56..3689b5495 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -2,10 +2,17 @@ package weed_server import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/shell" + "google.golang.org/grpc" "net/http" "net/http/httputil" "net/url" + "os" + "regexp" + "strconv" + "strings" "sync" + "time" "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" @@ -15,17 +22,28 @@ import ( "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" "github.com/gorilla/mux" + "github.com/spf13/viper" ) +type MasterOption struct { + Port int + MetaFolder string + VolumeSizeLimitMB uint + VolumePreallocate bool + PulseSeconds int + DefaultReplicaPlacement string + GarbageThreshold float64 + WhiteList []string + DisableHttp bool + MetricsAddress string + MetricsIntervalSec int +} + type MasterServer struct { - port int - metaFolder string - volumeSizeLimitMB uint - preallocate int64 - pulseSeconds int - defaultReplicaPlacement string - garbageThreshold float64 - guard *security.Guard + option *MasterOption + guard *security.Guard + + preallocateSize int64 Topo *topology.Topology vg *topology.VolumeGrowth @@ -36,56 +54,60 @@ type MasterServer struct { // notifying clients clientChansLock sync.RWMutex clientChans map[string]chan *master_pb.VolumeLocation + + grpcDialOpiton grpc.DialOption } -func NewMasterServer(r *mux.Router, port int, metaFolder string, - volumeSizeLimitMB uint, - preallocate bool, - pulseSeconds int, - defaultReplicaPlacement string, - garbageThreshold float64, - whiteList []string, - secureKey string, -) *MasterServer { +func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer { + + v := viper.GetViper() + signingKey := v.GetString("jwt.signing.key") + v.SetDefault("jwt.signing.expires_after_seconds", 10) + expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds") + + readSigningKey := v.GetString("jwt.signing.read.key") + v.SetDefault("jwt.signing.read.expires_after_seconds", 60) + readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds") var preallocateSize int64 - if preallocate { - preallocateSize = int64(volumeSizeLimitMB) * (1 << 20) + if option.VolumePreallocate { + preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20) } ms := &MasterServer{ - port: port, - volumeSizeLimitMB: volumeSizeLimitMB, - preallocate: preallocateSize, - pulseSeconds: pulseSeconds, - defaultReplicaPlacement: defaultReplicaPlacement, - garbageThreshold: garbageThreshold, - clientChans: make(map[string]chan *master_pb.VolumeLocation), + option: option, + preallocateSize: preallocateSize, + clientChans: make(map[string]chan *master_pb.VolumeLocation), + grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"), } ms.bounedLeaderChan = make(chan int, 16) seq := sequence.NewMemorySequencer() - ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds) + ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds) ms.vg = topology.NewDefaultVolumeGrowth() - glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") - - ms.guard = security.NewGuard(whiteList, secureKey) - - handleStaticResources2(r) - r.HandleFunc("/", ms.uiStatusHandler) - r.HandleFunc("/ui/index.html", ms.uiStatusHandler) - r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) - r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) - r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) - r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) - r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) - r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) - r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) - r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) - r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler)) - r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) - r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) - r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) - - ms.Topo.StartRefreshWritableVolumes(garbageThreshold, ms.preallocate) + glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB") + + ms.guard = security.NewGuard(ms.option.WhiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) + + if !ms.option.DisableHttp { + handleStaticResources2(r) + r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler)) + r.HandleFunc("/ui/index.html", ms.uiStatusHandler) + r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) + r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) + r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) + r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) + r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) + r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) + r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) + r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) + r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler)) + r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) + r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) + r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) + } + + ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize) + + ms.startAdminScripts() return ms } @@ -98,6 +120,9 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") } }) + ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) { + glog.V(0).Infof("state change: %+v", e) + }) if ms.Topo.IsLeader() { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") } else { @@ -138,3 +163,63 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ } } } + +func (ms *MasterServer) startAdminScripts() { + v := viper.GetViper() + adminScripts := v.GetString("master.maintenance.scripts") + v.SetDefault("master.maintenance.sleep_minutes", 17) + sleepMinutes := v.GetInt("master.maintenance.sleep_minutes") + + glog.V(0).Infof("adminScripts:\n%v", adminScripts) + if adminScripts == "" { + return + } + + scriptLines := strings.Split(adminScripts, "\n") + + masterAddress := "localhost:" + strconv.Itoa(ms.option.Port) + + var shellOptions shell.ShellOptions + shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master") + shellOptions.Masters = &masterAddress + shellOptions.FilerHost = "localhost" + shellOptions.FilerPort = 8888 + shellOptions.Directory = "/" + + commandEnv := shell.NewCommandEnv(shellOptions) + + reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`) + + go commandEnv.MasterClient.KeepConnectedToMaster() + + go func() { + commandEnv.MasterClient.WaitUntilConnected() + + c := time.Tick(time.Duration(sleepMinutes) * time.Minute) + for _ = range c { + if ms.Topo.IsLeader() { + for _, line := range scriptLines { + + cmds := reg.FindAllString(line, -1) + if len(cmds) == 0 { + continue + } + args := make([]string, len(cmds[1:])) + for i := range args { + args[i] = strings.Trim(string(cmds[1+i]), "\"'") + } + cmd := strings.ToLower(cmds[0]) + + for _, c := range shell.Commands { + if c.Name() == cmd { + glog.V(0).Infof("executing: %s %v", cmd, args) + if err := c.Do(args, commandEnv, os.Stdout); err != nil { + glog.V(0).Infof("error: %v", err) + } + } + } + } + } + } + }() +} |
