diff options
Diffstat (limited to 'weed/server/master_server.go')
| -rw-r--r-- | weed/server/master_server.go | 235 |
1 files changed, 183 insertions, 52 deletions
diff --git a/weed/server/master_server.go b/weed/server/master_server.go index f22925e56..a9ae6b888 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -5,27 +5,51 @@ import ( "net/http" "net/http/httputil" "net/url" + "os" + "regexp" + "strconv" + "strings" "sync" + "time" "github.com/chrislusf/raft" + "github.com/gorilla/mux" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/sequence" + "github.com/chrislusf/seaweedfs/weed/shell" "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/gorilla/mux" + "github.com/chrislusf/seaweedfs/weed/wdclient" ) +const ( + SequencerType = "master.sequencer.type" + SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls" +) + +type MasterOption struct { + Port int + MetaFolder string + VolumeSizeLimitMB uint + VolumePreallocate bool + PulseSeconds int + DefaultReplicaPlacement string + GarbageThreshold float64 + WhiteList []string + DisableHttp bool + MetricsAddress string + MetricsIntervalSec int +} + type MasterServer struct { - port int - metaFolder string - volumeSizeLimitMB uint - preallocate int64 - pulseSeconds int - defaultReplicaPlacement string - garbageThreshold float64 - guard *security.Guard + option *MasterOption + guard *security.Guard + + preallocateSize int64 Topo *topology.Topology vg *topology.VolumeGrowth @@ -36,56 +60,71 @@ type MasterServer struct { // notifying clients clientChansLock sync.RWMutex clientChans map[string]chan *master_pb.VolumeLocation + + grpcDialOption grpc.DialOption + + MasterClient *wdclient.MasterClient } -func NewMasterServer(r *mux.Router, port int, metaFolder string, - volumeSizeLimitMB uint, - preallocate bool, - pulseSeconds int, - defaultReplicaPlacement string, - garbageThreshold float64, - whiteList []string, - secureKey string, -) *MasterServer { +func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer { + + v := util.GetViper() + signingKey := v.GetString("jwt.signing.key") + v.SetDefault("jwt.signing.expires_after_seconds", 10) + expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds") + + readSigningKey := v.GetString("jwt.signing.read.key") + v.SetDefault("jwt.signing.read.expires_after_seconds", 60) + readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds") var preallocateSize int64 - if preallocate { - preallocateSize = int64(volumeSizeLimitMB) * (1 << 20) + if option.VolumePreallocate { + preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20) } + + grpcDialOption := security.LoadClientTLS(v, "grpc.master") ms := &MasterServer{ - port: port, - volumeSizeLimitMB: volumeSizeLimitMB, - preallocate: preallocateSize, - pulseSeconds: pulseSeconds, - defaultReplicaPlacement: defaultReplicaPlacement, - garbageThreshold: garbageThreshold, - clientChans: make(map[string]chan *master_pb.VolumeLocation), + option: option, + preallocateSize: preallocateSize, + clientChans: make(map[string]chan *master_pb.VolumeLocation), + grpcDialOption: grpcDialOption, + MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", 0, peers), } ms.bounedLeaderChan = make(chan int, 16) - seq := sequence.NewMemorySequencer() - ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds) + + seq := ms.createSequencer(option) + if nil == seq { + glog.Fatalf("create sequencer failed.") + } + ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds) ms.vg = topology.NewDefaultVolumeGrowth() - glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") - - ms.guard = security.NewGuard(whiteList, secureKey) - - handleStaticResources2(r) - r.HandleFunc("/", ms.uiStatusHandler) - r.HandleFunc("/ui/index.html", ms.uiStatusHandler) - r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) - r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler))) - r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) - r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) - r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) - r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) - r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) - r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) - r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler)) - r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) - r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) - r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler)) - - ms.Topo.StartRefreshWritableVolumes(garbageThreshold, ms.preallocate) + glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB") + + ms.guard = security.NewGuard(ms.option.WhiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) + + if !ms.option.DisableHttp { + handleStaticResources2(r) + r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler)) + r.HandleFunc("/ui/index.html", ms.uiStatusHandler) + r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler))) + r.HandleFunc("/dir/lookup", ms.guard.WhiteList(ms.dirLookupHandler)) + r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler))) + r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler))) + r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler))) + r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) + r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) + r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) + /* + r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler)) + r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) + r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) + */ + r.HandleFunc("/{fileId}", ms.redirectHandler) + } + + ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOption, ms.option.GarbageThreshold, ms.preallocateSize) + + ms.startAdminScripts() return ms } @@ -98,6 +137,9 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") } }) + ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) { + glog.V(0).Infof("state change: %+v", e) + }) if ms.Topo.IsLeader() { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") } else { @@ -133,8 +175,97 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ proxy.Transport = util.Transport proxy.ServeHTTP(w, r) } else { - //drop it to the floor - //writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader())) + // drop it to the floor + // writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader())) + } + } +} + +func (ms *MasterServer) startAdminScripts() { + var err error + + v := util.GetViper() + adminScripts := v.GetString("master.maintenance.scripts") + glog.V(0).Infof("adminScripts:\n%v", adminScripts) + if adminScripts == "" { + return + } + + v.SetDefault("master.maintenance.sleep_minutes", 17) + sleepMinutes := v.GetInt("master.maintenance.sleep_minutes") + + v.SetDefault("master.filer.default_filer_url", "http://localhost:8888/") + filerURL := v.GetString("master.filer.default_filer_url") + + scriptLines := strings.Split(adminScripts, "\n") + + masterAddress := "localhost:" + strconv.Itoa(ms.option.Port) + + var shellOptions shell.ShellOptions + shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master") + shellOptions.Masters = &masterAddress + + shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, err = util.ParseFilerUrl(filerURL) + if err != nil { + glog.V(0).Infof("failed to parse master.filer.default_filer_urll=%s : %v\n", filerURL, err) + return + } + + commandEnv := shell.NewCommandEnv(shellOptions) + + reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`) + + go commandEnv.MasterClient.KeepConnectedToMaster() + + go func() { + commandEnv.MasterClient.WaitUntilConnected() + + c := time.Tick(time.Duration(sleepMinutes) * time.Minute) + for range c { + if ms.Topo.IsLeader() { + for _, line := range scriptLines { + + cmds := reg.FindAllString(line, -1) + if len(cmds) == 0 { + continue + } + args := make([]string, len(cmds[1:])) + for i := range args { + args[i] = strings.Trim(string(cmds[1+i]), "\"'") + } + cmd := strings.ToLower(cmds[0]) + + for _, c := range shell.Commands { + if c.Name() == cmd { + glog.V(0).Infof("executing: %s %v", cmd, args) + if err := c.Do(args, commandEnv, os.Stdout); err != nil { + glog.V(0).Infof("error: %v", err) + } + } + } + } + } + } + }() +} + +func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer { + var seq sequence.Sequencer + v := util.GetViper() + seqType := strings.ToLower(v.GetString(SequencerType)) + glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType) + switch strings.ToLower(seqType) { + case "etcd": + var err error + urls := v.GetString(SequencerEtcdUrls) + glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls) + seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder) + if err != nil { + glog.Error(err) + seq = nil } + default: + seq = sequence.NewMemorySequencer() } + return seq } |
