aboutsummaryrefslogtreecommitdiff
path: root/weed/server/master_server.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server/master_server.go')
-rw-r--r--weed/server/master_server.go79
1 files changed, 62 insertions, 17 deletions
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 3689b5495..33a5129da 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,9 +1,8 @@
package weed_server
import (
+ "context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/shell"
- "google.golang.org/grpc"
"net/http"
"net/http/httputil"
"net/url"
@@ -19,10 +18,18 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/sequence"
+ "github.com/chrislusf/seaweedfs/weed/shell"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/wdclient"
"github.com/gorilla/mux"
"github.com/spf13/viper"
+ "google.golang.org/grpc"
+)
+
+const (
+ SequencerType = "master.sequencer.type"
+ SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls"
)
type MasterOption struct {
@@ -55,10 +62,12 @@ type MasterServer struct {
clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.VolumeLocation
- grpcDialOpiton grpc.DialOption
+ grpcDialOption grpc.DialOption
+
+ MasterClient *wdclient.MasterClient
}
-func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
v := viper.GetViper()
signingKey := v.GetString("jwt.signing.key")
@@ -73,14 +82,21 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
if option.VolumePreallocate {
preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20)
}
+
+ grpcDialOption := security.LoadClientTLS(v.Sub("grpc"), "master")
ms := &MasterServer{
option: option,
preallocateSize: preallocateSize,
clientChans: make(map[string]chan *master_pb.VolumeLocation),
- grpcDialOpiton: security.LoadClientTLS(v.Sub("grpc"), "master"),
+ grpcDialOption: grpcDialOption,
+ MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers),
}
ms.bounedLeaderChan = make(chan int, 16)
- seq := sequence.NewMemorySequencer()
+
+ seq := ms.createSequencer(option)
+ if nil == seq {
+ glog.Fatalf("create sequencer failed.")
+ }
ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds)
ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB")
@@ -92,7 +108,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
r.HandleFunc("/", ms.proxyToLeader(ms.uiStatusHandler))
r.HandleFunc("/ui/index.html", ms.uiStatusHandler)
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
- r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
+ r.HandleFunc("/dir/lookup", ms.guard.WhiteList(ms.dirLookupHandler))
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
@@ -102,10 +118,10 @@ func NewMasterServer(r *mux.Router, option *MasterOption) *MasterServer {
r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler))
r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
- r.HandleFunc("/{fileId}", ms.proxyToLeader(ms.redirectHandler))
+ r.HandleFunc("/{fileId}", ms.redirectHandler)
}
- ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOpiton, ms.option.GarbageThreshold, ms.preallocateSize)
+ ms.Topo.StartRefreshWritableVolumes(ms.grpcDialOption, ms.option.GarbageThreshold, ms.preallocateSize)
ms.startAdminScripts()
@@ -158,23 +174,28 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r)
} else {
- //drop it to the floor
- //writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
+ // drop it to the floor
+ // writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+" does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
}
}
}
func (ms *MasterServer) startAdminScripts() {
+ var err error
+
v := viper.GetViper()
adminScripts := v.GetString("master.maintenance.scripts")
- v.SetDefault("master.maintenance.sleep_minutes", 17)
- sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
-
glog.V(0).Infof("adminScripts:\n%v", adminScripts)
if adminScripts == "" {
return
}
+ v.SetDefault("master.maintenance.sleep_minutes", 17)
+ sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
+
+ v.SetDefault("master.filer.default_filer_url", "http://localhost:8888/")
+ filerURL := v.GetString("master.filer.default_filer_url")
+
scriptLines := strings.Split(adminScripts, "\n")
masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
@@ -182,9 +203,12 @@ func (ms *MasterServer) startAdminScripts() {
var shellOptions shell.ShellOptions
shellOptions.GrpcDialOption = security.LoadClientTLS(viper.Sub("grpc"), "master")
shellOptions.Masters = &masterAddress
- shellOptions.FilerHost = "localhost"
- shellOptions.FilerPort = 8888
- shellOptions.Directory = "/"
+
+ shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, err = util.ParseFilerUrl(filerURL)
+ if err != nil {
+ glog.V(0).Infof("failed to parse master.filer.default_filer_urll=%s : %v\n", filerURL, err)
+ return
+ }
commandEnv := shell.NewCommandEnv(shellOptions)
@@ -223,3 +247,24 @@ func (ms *MasterServer) startAdminScripts() {
}
}()
}
+
+func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer {
+ var seq sequence.Sequencer
+ v := viper.GetViper()
+ seqType := strings.ToLower(v.GetString(SequencerType))
+ glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
+ switch strings.ToLower(seqType) {
+ case "etcd":
+ var err error
+ urls := v.GetString(SequencerEtcdUrls)
+ glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls)
+ seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder)
+ if err != nil {
+ glog.Error(err)
+ seq = nil
+ }
+ default:
+ seq = sequence.NewMemorySequencer()
+ }
+ return seq
+}