aboutsummaryrefslogtreecommitdiff
path: root/weed/server/master_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/master_server.go')
-rw-r--r--weed/server/master_server.go254
1 files changed, 201 insertions, 53 deletions
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index f22925e56..9a490bb1f 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -5,27 +5,52 @@ import (
"net/http"
"net/http/httputil"
"net/url"
+ "os"
+ "regexp"
+ "strconv"
+ "strings"
"sync"
+ "time"
"github.com/chrislusf/raft"
+ "github.com/gorilla/mux"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/sequence"
+ "github.com/chrislusf/seaweedfs/weed/shell"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/gorilla/mux"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
)
+const (
+ SequencerType = "master.sequencer.type"
+ SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls"
+)
+
+type MasterOption struct {
+ Host string
+ Port int
+ MetaFolder string
+ VolumeSizeLimitMB uint
+ VolumePreallocate bool
+ // PulseSeconds int
+ DefaultReplicaPlacement string
+ GarbageThreshold float64
+ WhiteList []string
+ DisableHttp bool
+ MetricsAddress string
+ MetricsIntervalSec int
+}
+
type MasterServer struct {
- port int
- metaFolder string
- volumeSizeLimitMB uint
- preallocate int64
- pulseSeconds int
- defaultReplicaPlacement string
- garbageThreshold float64
- guard *security.Guard
+ option *MasterOption
+ guard *security.Guard
+
+ preallocateSize int64
Topo *topology.Topology
vg *topology.VolumeGrowth
@@ -36,56 +61,77 @@ type MasterServer struct {
// notifying clients
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.VolumeLocation
+
+ grpcDialOption grpc.DialOption
+
+ MasterClient *wdclient.MasterClient
+
+ adminLocks *AdminLocks
}
-func NewMasterServer(r *mux.Router, port int, metaFolder string,
- volumeSizeLimitMB uint,
- preallocate bool,
- pulseSeconds int,
- defaultReplicaPlacement string,
- garbageThreshold float64,
- whiteList []string,
- secureKey string,
-) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
+
+ v := util.GetViper()
+ signingKey := v.GetString("jwt.signing.key")
+ v.SetDefault("jwt.signing.expires_after_seconds", 10)
+ expiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")
+
+ readSigningKey := v.GetString("jwt.signing.read.key")
+ v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
+ readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
+
+ v.SetDefault("master.replication.treat_replication_as_minimums", false)
+ replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums")
var preallocateSize int64
- if preallocate {
- preallocateSize = int64(volumeSizeLimitMB) * (1 << 20)
+ if option.VolumePreallocate {
+ preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
+
+ grpcDialOption := security.LoadClientTLS(v, "grpc.master")
ms := &MasterServer{
- port: port,
- volumeSizeLimitMB: volumeSizeLimitMB,
- preallocate: preallocateSize,
- pulseSeconds: pulseSeconds,
- defaultReplicaPlacement: defaultReplicaPlacement,
- garbageThreshold: garbageThreshold,
- clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ option: option,
+ preallocateSize: preallocateSize,
+ clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ grpcDialOption: grpcDialOption,
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, peers),
+ adminLocks: NewAdminLocks(),
}
ms.bounedLeaderChan = make(chan int, 16)
- seq := sequence.NewMemorySequencer()
- ms.Topo = topology.NewTopology("topo", seq, uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds)
+
+ seq := ms.createSequencer(option)
+ if nil == seq {
+ glog.Fatalf("create sequencer failed.")
+ }
+ ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, 5, replicationAsMin)
ms.vg = topology.NewDefaultVolumeGrowth()
- glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
-
- ms.guard = security.NewGuard(whiteList, secureKey)
-
- handleStaticResources2(r)
- r.HandleFunc("/", ms.uiStatusHandler)
- r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
- r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
- r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
- r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
- r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
- r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
- r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
- r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
- r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
- r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
- r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
- r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
- r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
-
- ms.Topo.StartRefreshWritableVolumes(garbageThreshold, ms.preallocate)
+ glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
+
+ ms.guard = security.NewGuard(ms.option.WhiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
+
+ if !ms.option.DisableHttp {
+ handleStaticResources2(r)
+ r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
+ r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
+ r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
+ r.HandleFunc("/dir/lookup", ms.guard.WhiteList(ms.dirLookupHandler))
+ r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
+ r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
+ r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
+ r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
+ r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
+ r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
+ /*
+ r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
+ r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
+ r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
+ */
+ r.HandleFunc("/{fileId}", ms.redirectHandler)
+ }
+
+ ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOption, ms.option.GarbageThreshold, ms.preallocateSize)
+
+ ms.startAdminScripts()
return ms
}
@@ -98,6 +144,9 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
}
})
+ ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) {
+ glog.V(0).Infof("state change: %+v", e)
+ })
if ms.Topo.IsLeader() {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
} else {
@@ -107,7 +156,7 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
}
}
-func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
+func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
f(w, r)
@@ -133,8 +182,107 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r)
} else {
- //drop it to the floor
- //writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
+ // drop it to the floor
+ // writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
+ }
+ }
+}
+
+func (ms *MasterServer) startAdminScripts() {
+ var err error
+
+ v := util.GetViper()
+ adminScripts := v.GetString("master.maintenance.scripts")
+ glog.V(0).Infof("adminScripts:\n%v", adminScripts)
+ if adminScripts == "" {
+ return
+ }
+
+ v.SetDefault("master.maintenance.sleep_minutes", 17)
+ sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
+
+ v.SetDefault("master.filer.default", "localhost:8888")
+ filerHostPort := v.GetString("master.filer.default")
+
+ scriptLines := strings.Split(adminScripts, "\n")
+ if !strings.Contains(adminScripts, "lock") {
+ scriptLines = append(append([]string{}, "lock"), scriptLines...)
+ scriptLines = append(scriptLines, "unlock")
+ }
+
+ masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
+
+ var shellOptions shell.ShellOptions
+ shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
+ shellOptions.Masters = &masterAddress
+
+ shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(filerHostPort)
+ shellOptions.Directory = "/"
+ if err != nil {
+ glog.V(0).Infof("failed to parse master.filer.default = %s : %v\n", filerHostPort, err)
+ return
+ }
+
+ commandEnv := shell.NewCommandEnv(shellOptions)
+
+ reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
+
+ go commandEnv.MasterClient.KeepConnectedToMaster()
+
+ go func() {
+ commandEnv.MasterClient.WaitUntilConnected()
+
+ c := time.Tick(time.Duration(sleepMinutes) * time.Minute)
+ for range c {
+ if ms.Topo.IsLeader() {
+ for _, line := range scriptLines {
+ for _, c := range strings.Split(line, ";") {
+ processEachCmd(reg, c, commandEnv)
+ }
+ }
+ }
+ }
+ }()
+}
+
+func processEachCmd(reg *regexp.Regexp, line string, commandEnv *shell.CommandEnv) {
+ cmds := reg.FindAllString(line, -1)
+ if len(cmds) == 0 {
+ return
+ }
+ args := make([]string, len(cmds[1:]))
+ for i := range args {
+ args[i] = strings.Trim(string(cmds[1+i]), "\"'")
+ }
+ cmd := strings.ToLower(cmds[0])
+
+ for _, c := range shell.Commands {
+ if c.Name() == cmd {
+ glog.V(0).Infof("executing: %s %v", cmd, args)
+ if err := c.Do(args, commandEnv, os.Stdout); err != nil {
+ glog.V(0).Infof("error: %v", err)
+ }
+ }
+ }
+}
+
+func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
+ var seq sequence.Sequencer
+ v := util.GetViper()
+ seqType := strings.ToLower(v.GetString(SequencerType))
+ glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
+ switch strings.ToLower(seqType) {
+ case "etcd":
+ var err error
+ urls := v.GetString(SequencerEtcdUrls)
+ glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls)
+ seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder)
+ if err != nil {
+ glog.Error(err)
+ seq = nil
}
+ default:
+ seq = sequence.NewMemorySequencer()
}
+ return seq
}