aboutsummaryrefslogtreecommitdiff
path: root/go/cmd/master.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2013-02-10 03:49:51 -0800
committerChris Lu <chris.lu@gmail.com>2013-02-10 03:49:51 -0800
commit5071f528f649f3f99336c7d491ceef4859e34744 (patch)
tree0c4bc8a286597cd79e22b1ce02cd9cd3b1c44602 /go/cmd/master.go
parent55f2627fcf965c3765ad9b63878e9a22a59f4b55 (diff)
downloadseaweedfs-5071f528f649f3f99336c7d491ceef4859e34744.tar.xz
seaweedfs-5071f528f649f3f99336c7d491ceef4859e34744.zip
testing compilation with remove package
Diffstat (limited to 'go/cmd/master.go')
-rw-r--r--go/cmd/master.go217
1 files changed, 217 insertions, 0 deletions
diff --git a/go/cmd/master.go b/go/cmd/master.go
new file mode 100644
index 000000000..3d8757c16
--- /dev/null
+++ b/go/cmd/master.go
@@ -0,0 +1,217 @@
+package main
+
+import (
+ "encoding/json"
+ "errors"
+ "log"
+ "net/http"
+ "code.google.com/p/weed-fs/go/replication"
+ "code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/topology"
+ "runtime"
+ "strconv"
+ "strings"
+ "time"
+)
+
+func init() {
+ cmdMaster.Run = runMaster // break init cycle
+ cmdMaster.IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode")
+}
+
+var cmdMaster = &Command{
+ UsageLine: "master -port=9333",
+ Short: "start a master server",
+ Long: `start a master server to provide volume=>location mapping service
+ and sequence number of file ids
+
+ `,
+}
+
+var (
+ mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
+ metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings")
+ volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
+ mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
+ confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
+ defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.")
+ mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
+ mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+ garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
+)
+
+var topo *topology.Topology
+var vg *replication.VolumeGrowth
+
+func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
+ vid := r.FormValue("volumeId")
+ commaSep := strings.Index(vid, ",")
+ if commaSep > 0 {
+ vid = vid[0:commaSep]
+ }
+ volumeId, err := storage.NewVolumeId(vid)
+ if err == nil {
+ machines := topo.Lookup(volumeId)
+ if machines != nil {
+ ret := []map[string]string{}
+ for _, dn := range machines {
+ ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl})
+ }
+ writeJson(w, r, map[string]interface{}{"locations": ret})
+ } else {
+ w.WriteHeader(http.StatusNotFound)
+ writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
+ }
+ } else {
+ w.WriteHeader(http.StatusNotAcceptable)
+ writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid})
+ }
+}
+
+func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
+ c, e := strconv.Atoi(r.FormValue("count"))
+ if e != nil {
+ c = 1
+ }
+ repType := r.FormValue("replication")
+ if repType == "" {
+ repType = *defaultRepType
+ }
+ rt, err := storage.NewReplicationTypeFromString(repType)
+ if err != nil {
+ w.WriteHeader(http.StatusNotAcceptable)
+ writeJson(w, r, map[string]string{"error": err.Error()})
+ return
+ }
+ if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
+ if topo.FreeSpace() <= 0 {
+ w.WriteHeader(http.StatusNotFound)
+ writeJson(w, r, map[string]string{"error": "No free volumes left!"})
+ return
+ } else {
+ vg.GrowByType(rt, topo)
+ }
+ }
+ fid, count, dn, err := topo.PickForWrite(rt, c)
+ if err == nil {
+ writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
+ } else {
+ w.WriteHeader(http.StatusNotAcceptable)
+ writeJson(w, r, map[string]string{"error": err.Error()})
+ }
+}
+
+func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
+ init := r.FormValue("init") == "true"
+ ip := r.FormValue("ip")
+ if ip == "" {
+ ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
+ }
+ port, _ := strconv.Atoi(r.FormValue("port"))
+ maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
+ s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
+ publicUrl := r.FormValue("publicUrl")
+ volumes := new([]storage.VolumeInfo)
+ json.Unmarshal([]byte(r.FormValue("volumes")), volumes)
+ debug(s, "volumes", r.FormValue("volumes"))
+ topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount)
+ m := make(map[string]interface{})
+ m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024
+ writeJson(w, r, m)
+}
+
+func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
+ m := make(map[string]interface{})
+ m["Version"] = VERSION
+ m["Topology"] = topo.ToMap()
+ writeJson(w, r, m)
+}
+
+func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) {
+ gcThreshold := r.FormValue("garbageThreshold")
+ if gcThreshold == "" {
+ gcThreshold = *garbageThreshold
+ }
+ debug("garbageThreshold =", gcThreshold)
+ topo.Vacuum(gcThreshold)
+ dirStatusHandler(w, r)
+}
+
+func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
+ count := 0
+ rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication"))
+ if err == nil {
+ if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
+ if topo.FreeSpace() < count*rt.GetCopyCount() {
+ err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
+ } else {
+ count, err = vg.GrowByCountAndType(count, rt, topo)
+ }
+ } else {
+ err = errors.New("parameter count is not found")
+ }
+ }
+ if err != nil {
+ w.WriteHeader(http.StatusNotAcceptable)
+ writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()})
+ } else {
+ w.WriteHeader(http.StatusNotAcceptable)
+ writeJson(w, r, map[string]interface{}{"count": count})
+ }
+}
+
+func volumeStatusHandler(w http.ResponseWriter, r *http.Request) {
+ m := make(map[string]interface{})
+ m["Version"] = VERSION
+ m["Volumes"] = topo.ToVolumeMap()
+ writeJson(w, r, m)
+}
+
+func redirectHandler(w http.ResponseWriter, r *http.Request) {
+ vid, _, _ := parseURLPath(r.URL.Path)
+ volumeId, err := storage.NewVolumeId(vid)
+ if err != nil {
+ debug("parsing error:", err, r.URL.Path)
+ return
+ }
+ machines := topo.Lookup(volumeId)
+ if machines != nil && len(machines) > 0 {
+ http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)
+ } else {
+ w.WriteHeader(http.StatusNotFound)
+ writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "})
+ }
+}
+
+func runMaster(cmd *Command, args []string) bool {
+ if *mMaxCpu < 1 {
+ *mMaxCpu = runtime.NumCPU()
+ }
+ runtime.GOMAXPROCS(*mMaxCpu)
+ topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)
+ vg = replication.NewDefaultVolumeGrowth()
+ log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB")
+ http.HandleFunc("/dir/assign", dirAssignHandler)
+ http.HandleFunc("/dir/lookup", dirLookupHandler)
+ http.HandleFunc("/dir/join", dirJoinHandler)
+ http.HandleFunc("/dir/status", dirStatusHandler)
+ http.HandleFunc("/vol/grow", volumeGrowHandler)
+ http.HandleFunc("/vol/status", volumeStatusHandler)
+ http.HandleFunc("/vol/vacuum", volumeVacuumHandler)
+
+ http.HandleFunc("/", redirectHandler)
+
+ topo.StartRefreshWritableVolumes(*garbageThreshold)
+
+ log.Println("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport))
+ srv := &http.Server{
+ Addr: ":" + strconv.Itoa(*mport),
+ Handler: http.DefaultServeMux,
+ ReadTimeout: time.Duration(*mReadTimeout) * time.Second,
+ }
+ e := srv.ListenAndServe()
+ if e != nil {
+ log.Fatalf("Fail to start:%s", e.Error())
+ }
+ return true
+}