aboutsummaryrefslogtreecommitdiff
path: root/weed/command/master.go
blob: 15d1171e09a4048e7de53717a074efad16a0aea0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package command

import (
	"github.com/chrislusf/raft/protobuf"
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"github.com/chrislusf/seaweedfs/weed/security"
	"github.com/chrislusf/seaweedfs/weed/server"
	"github.com/chrislusf/seaweedfs/weed/util"
	"github.com/gorilla/mux"
	"github.com/spf13/viper"
	"google.golang.org/grpc/reflection"
	"net/http"
	"os"
	"runtime"
	"strconv"
	"strings"
)

// init attaches runMaster to cmdMaster at package initialization time.
// The assignment is made here instead of inside the cmdMaster literal because
// runMaster reads the flag variables, which are themselves created from
// cmdMaster.Flag; referencing runMaster in the literal would form an
// initialization cycle.
func init() {
	cmdMaster.Run = runMaster // break init cycle
}

// cmdMaster describes the "master" sub-command: its usage line and its short
// and long help texts. The Run field is not set in this literal; it is
// assigned in init.
var cmdMaster = &Command{
	UsageLine: "master -port=9333",
	Short:     "start a master server",
	Long: `start a master server to provide volume=>location mapping service and sequence number of file ids

	The configuration file "security.toml" is read from ".", "$HOME/.seaweedfs/", or "/etc/seaweedfs/", in that order.

	The example security.toml configuration file can be generated by "weed scaffold -config=security"

  `,
}

// Command-line flags of the master command, registered on cmdMaster.Flag.
// Each flag variable is a pointer to the parsed value and is dereferenced in
// runMaster; the flag name, default and help text are given at registration.
var (
	mport                   = cmdMaster.Flag.Int("port", 9333, "http listen port")
	masterIp                = cmdMaster.Flag.String("ip", "localhost", "master <ip>|<server> address")
	masterBindIp            = cmdMaster.Flag.String("ip.bind", "0.0.0.0", "ip address to bind to")
	metaFolder              = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
	masterPeers             = cmdMaster.Flag.String("peers", "", "all master nodes in comma separated ip:port list, example: 127.0.0.1:9093,127.0.0.1:9094")
	volumeSizeLimitMB       = cmdMaster.Flag.Uint("volumeSizeLimitMB", 30*1000, "Master stops directing writes to oversized volumes.")
	volumePreallocate       = cmdMaster.Flag.Bool("volumePreallocate", false, "Preallocate disk space for volumes.")
	mpulse                  = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
	defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.")
	// mTimeout                = cmdMaster.Flag.Int("idleTimeout", 30, "connection idle seconds")
	mMaxCpu               = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
	garbageThreshold      = cmdMaster.Flag.Float64("garbageThreshold", 0.3, "threshold to vacuum and reclaim spaces")
	masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
	disableHttp           = cmdMaster.Flag.Bool("disableHttp", false, "disable http requests, only gRPC operations are allowed.")
	masterCpuProfile      = cmdMaster.Flag.String("cpuprofile", "", "cpu profile output file")
	masterMemProfile      = cmdMaster.Flag.String("memprofile", "", "memory profile output file")

	// masterWhiteList holds the -whiteList value split on commas. It is filled
	// in by runMaster and stays nil when the flag is empty.
	masterWhiteList []string
)

// runMaster is the entry point of the "master" sub-command.
//
// It loads the "security" configuration, applies the -maxCpu and profiling
// flags, checks that the -mdir folder is writable and that
// -volumeSizeLimitMB does not exceed 30000, and then starts the master:
//
//   - a background goroutine creates the raft server, registers the
//     /cluster/status handler, and serves gRPC (master service, raft service
//     and reflection) on the HTTP port + 10000;
//   - the HTTP server is served on the calling goroutine using the router
//     handed to NewMasterServer.
//
// cmd and args are not used. Every startup or serve failure is reported
// through glog.Fatalf. The function returns true only if the HTTP Serve call
// comes back without an error.
func runMaster(cmd *Command, args []string) bool {

	weed_server.LoadConfiguration("security", false)

	// Any value below 1 (including the default 0) means "use every CPU".
	if *mMaxCpu < 1 {
		*mMaxCpu = runtime.NumCPU()
	}
	runtime.GOMAXPROCS(*mMaxCpu)
	util.SetupProfiling(*masterCpuProfile, *masterMemProfile)

	if err := util.TestFolderWritable(*metaFolder); err != nil {
		glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *metaFolder, err)
	}
	if *masterWhiteListOption != "" {
		masterWhiteList = strings.Split(*masterWhiteListOption, ",")
	}
	if *volumeSizeLimitMB > 30*1000 {
		// 30000 itself (the flag default) is accepted, so the message must
		// not claim the value has to be strictly smaller.
		glog.Fatalf("volumeSizeLimitMB should not exceed 30000")
	}

	r := mux.NewRouter()
	ms := weed_server.NewMasterServer(r, *mport, *metaFolder,
		*volumeSizeLimitMB, *volumePreallocate,
		*mpulse, *defaultReplicaPlacement, *garbageThreshold,
		masterWhiteList,
		*disableHttp,
	)

	listeningAddress := *masterBindIp + ":" + strconv.Itoa(*mport)

	glog.V(0).Infoln("Start Seaweed Master", util.VERSION, "at", listeningAddress)

	masterListener, e := util.NewListener(listeningAddress, 0)
	if e != nil {
		glog.Fatalf("Master startup error: %v", e)
	}

	go func() {
		// start raftServer
		myMasterAddress, peers := checkPeers(*masterIp, *mport, *masterPeers)
		raftServer := weed_server.NewRaftServer(security.LoadClientTLS(viper.Sub("grpc"), "master"),
			peers, myMasterAddress, *metaFolder, ms.Topo, *mpulse)
		ms.SetRaftServer(raftServer)
		// NOTE(review): this route is added from the goroutine while the HTTP
		// server below may already be serving r — confirm the router tolerates
		// registration concurrent with request handling.
		r.HandleFunc("/cluster/status", raftServer.StatusHandler).Methods("GET")

		// starting grpc server; the gRPC port is always the HTTP port + 10000
		grpcPort := *mport + 10000
		grpcL, err := util.NewListener(*masterBindIp+":"+strconv.Itoa(grpcPort), 0)
		if err != nil {
			glog.Fatalf("master failed to listen on grpc port %d: %v", grpcPort, err)
		}
		// Create your protocol servers.
		grpcS := util.NewGrpcServer(security.LoadServerTLS(viper.Sub("grpc"), "master"))
		master_pb.RegisterSeaweedServer(grpcS, ms)
		protobuf.RegisterRaftServer(grpcS, raftServer)
		reflection.Register(grpcS)

		glog.V(0).Infof("Start Seaweed Master %s grpc server at %s:%d", util.VERSION, *masterBindIp, grpcPort)
		// The gRPC server carries both the master and the raft services, so a
		// serve failure is reported the same way as an HTTP serve failure
		// instead of being silently dropped.
		if err := grpcS.Serve(grpcL); err != nil {
			glog.Fatalf("master grpc server failed to serve: %v", err)
		}
	}()

	// start http server
	httpS := &http.Server{Handler: r}
	if err := httpS.Serve(masterListener); err != nil {
		glog.Fatalf("master server failed to serve: %v", err)
	}

	return true
}

// checkPeers derives this master's own address and the cleaned list of peer
// masters from the command-line values.
//
// masterIp and masterPort are joined as "ip:port" to form masterAddress.
// peers is a comma separated "ip:port" list. Whitespace around each entry is
// stripped and empty entries (for example from a trailing comma) are dropped,
// so "a:1, b:2," yields ["a:1", "b:2"]. An empty or blank peers string yields
// a nil cleanedPeers.
//
// The cluster size is the number of cleaned peers, plus one when
// masterAddress is not among them. An even cluster size is reported with
// glog.Fatalf. masterAddress itself is never added to cleanedPeers.
func checkPeers(masterIp string, masterPort int, peers string) (masterAddress string, cleanedPeers []string) {
	masterAddress = masterIp + ":" + strconv.Itoa(masterPort)

	hasSelf := false
	for _, peer := range strings.Split(peers, ",") {
		// Tolerate "a:1, b:2" style lists and stray commas; an untrimmed
		// entry would never compare equal to masterAddress.
		peer = strings.TrimSpace(peer)
		if peer == "" {
			continue
		}
		if peer == masterAddress {
			hasSelf = true
		}
		cleanedPeers = append(cleanedPeers, peer)
	}

	// Count this master too when it was not listed explicitly.
	peerCount := len(cleanedPeers)
	if !hasSelf {
		peerCount++
	}
	if peerCount%2 == 0 {
		glog.Fatalf("Only odd number of masters are supported!")
	}
	return masterAddress, cleanedPeers
}