diff options
Diffstat (limited to 'weed/master.go')
| -rw-r--r-- | weed/master.go | 217 |
1 files changed, 217 insertions, 0 deletions
diff --git a/weed/master.go b/weed/master.go new file mode 100644 index 000000000..a06c8a99a --- /dev/null +++ b/weed/master.go @@ -0,0 +1,217 @@ +package main + +import ( + "encoding/json" + "errors" + "log" + "net/http" + "weed/replication" + "weed/storage" + "weed/topology" + "runtime" + "strconv" + "strings" + "time" +) + +func init() { + cmdMaster.Run = runMaster // break init cycle + cmdMaster.IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode") +} + +var cmdMaster = &Command{ + UsageLine: "master -port=9333", + Short: "start a master server", + Long: `start a master server to provide volume=>location mapping service + and sequence number of file ids + + `, +} + +var ( + mport = cmdMaster.Flag.Int("port", 9333, "http listen port") + metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings") + volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") + mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") + confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") + defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.") + mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds") + mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") + garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") +) + +var topo *topology.Topology +var vg *replication.VolumeGrowth + +func dirLookupHandler(w http.ResponseWriter, r *http.Request) { + vid := r.FormValue("volumeId") + commaSep := strings.Index(vid, ",") + if commaSep > 0 { + vid = vid[0:commaSep] + } + volumeId, err := storage.NewVolumeId(vid) + if err == nil { + machines := topo.Lookup(volumeId) + if machines != nil { + ret := []map[string]string{} + for _, dn := range machines { + ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl}) + } + writeJson(w, r, map[string]interface{}{"locations": ret}) + } else { + w.WriteHeader(http.StatusNotFound) + writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + } + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid}) + } +} + +func dirAssignHandler(w http.ResponseWriter, r *http.Request) { + c, e := strconv.Atoi(r.FormValue("count")) + if e != nil { + c = 1 + } + repType := r.FormValue("replication") + if repType == "" { + repType = *defaultRepType + } + rt, err := storage.NewReplicationTypeFromString(repType) + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]string{"error": err.Error()}) + return + } + if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 { + if topo.FreeSpace() <= 0 { + w.WriteHeader(http.StatusNotFound) + writeJson(w, r, map[string]string{"error": "No free volumes left!"}) + return + } else { + vg.GrowByType(rt, topo) + } + } + fid, count, dn, err := topo.PickForWrite(rt, c) + if err == nil { + writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]string{"error": err.Error()}) + } +} + +func dirJoinHandler(w http.ResponseWriter, r *http.Request) { + init := r.FormValue("init") == "true" + ip := r.FormValue("ip") + if ip == "" { + ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] + } + port, _ := strconv.Atoi(r.FormValue("port")) + maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) + s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") + publicUrl := r.FormValue("publicUrl") + volumes := new([]storage.VolumeInfo) + json.Unmarshal([]byte(r.FormValue("volumes")), volumes) + debug(s, "volumes", r.FormValue("volumes")) + topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount) + m := make(map[string]interface{}) + m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024 + writeJson(w, r, m) +} + +func dirStatusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = VERSION + m["Topology"] = topo.ToMap() + writeJson(w, r, m) +} + +func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { + gcThreshold := r.FormValue("garbageThreshold") + if gcThreshold == "" { + gcThreshold = *garbageThreshold + } + debug("garbageThreshold =", gcThreshold) + topo.Vacuum(gcThreshold) + dirStatusHandler(w, r) +} + +func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { + count := 0 + rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication")) + if err == nil { + if count, err = strconv.Atoi(r.FormValue("count")); err == nil { + if topo.FreeSpace() < count*rt.GetCopyCount() { + err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) + } else { + count, err = vg.GrowByCountAndType(count, rt, topo) + } + } else { + err = errors.New("parameter count is not found") + } + } + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()}) + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJson(w, r, map[string]interface{}{"count": count}) + } +} + +func volumeStatusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = VERSION + m["Volumes"] = topo.ToVolumeMap() + writeJson(w, r, m) +} + +func redirectHandler(w http.ResponseWriter, r *http.Request) { + vid, _, _ := parseURLPath(r.URL.Path) + volumeId, err := storage.NewVolumeId(vid) + if err != nil { + debug("parsing error:", err, r.URL.Path) + return + } + machines := topo.Lookup(volumeId) + if machines != nil && len(machines) > 0 { + http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) + } else { + w.WriteHeader(http.StatusNotFound) + writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + } +} + +func runMaster(cmd *Command, args []string) bool { + if *mMaxCpu < 1 { + *mMaxCpu = runtime.NumCPU() + } + runtime.GOMAXPROCS(*mMaxCpu) + topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) + vg = replication.NewDefaultVolumeGrowth() + log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") + http.HandleFunc("/dir/assign", dirAssignHandler) + http.HandleFunc("/dir/lookup", dirLookupHandler) + http.HandleFunc("/dir/join", dirJoinHandler) + http.HandleFunc("/dir/status", dirStatusHandler) + http.HandleFunc("/vol/grow", volumeGrowHandler) + http.HandleFunc("/vol/status", volumeStatusHandler) + http.HandleFunc("/vol/vacuum", volumeVacuumHandler) + + http.HandleFunc("/", redirectHandler) + + topo.StartRefreshWritableVolumes(*garbageThreshold) + + log.Println("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport)) + srv := &http.Server{ + Addr: ":" + strconv.Itoa(*mport), + Handler: http.DefaultServeMux, + ReadTimeout: time.Duration(*mReadTimeout) * time.Second, + } + e := srv.ListenAndServe() + if e != nil { + log.Fatalf("Fail to start:%s", e.Error()) + } + return true +} |
