diff options
Diffstat (limited to 'weed/master.go')
| -rw-r--r-- | weed/master.go | 217 |
1 files changed, 0 insertions, 217 deletions
diff --git a/weed/master.go b/weed/master.go deleted file mode 100644 index 78db929f2..000000000 --- a/weed/master.go +++ /dev/null @@ -1,217 +0,0 @@ -package main - -import ( - "encoding/json" - "errors" - "log" - "net/http" - "code.google.com/p/weed-fs/weed/replication" - "code.google.com/p/weed-fs/weed/storage" - "code.google.com/p/weed-fs/weed/topology" - "runtime" - "strconv" - "strings" - "time" -) - -func init() { - cmdMaster.Run = runMaster // break init cycle - cmdMaster.IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode") -} - -var cmdMaster = &Command{ - UsageLine: "master -port=9333", - Short: "start a master server", - Long: `start a master server to provide volume=>location mapping service - and sequence number of file ids - - `, -} - -var ( - mport = cmdMaster.Flag.Int("port", 9333, "http listen port") - metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings") - volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") - mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") - confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") - defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.") - mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds") - mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") - garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces") -) - -var topo *topology.Topology -var vg *replication.VolumeGrowth - -func dirLookupHandler(w http.ResponseWriter, r *http.Request) { - vid := r.FormValue("volumeId") - commaSep := strings.Index(vid, ",") - if commaSep > 0 { - vid = vid[0:commaSep] - } - volumeId, err := storage.NewVolumeId(vid) - if err == nil { - machines := topo.Lookup(volumeId) - if machines != nil { - ret := []map[string]string{} - for _, dn := range machines { - ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl}) - } - writeJson(w, r, map[string]interface{}{"locations": ret}) - } else { - w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) - } - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": "unknown volumeId format " + vid}) - } -} - -func dirAssignHandler(w http.ResponseWriter, r *http.Request) { - c, e := strconv.Atoi(r.FormValue("count")) - if e != nil { - c = 1 - } - repType := r.FormValue("replication") - if repType == "" { - repType = *defaultRepType - } - rt, err := storage.NewReplicationTypeFromString(repType) - if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": err.Error()}) - return - } - if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 { - if topo.FreeSpace() <= 0 { - w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "No free volumes left!"}) - return - } else { - vg.GrowByType(rt, topo) - } - } - fid, count, dn, err := topo.PickForWrite(rt, c) - if err == nil { - writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": err.Error()}) - } -} - -func dirJoinHandler(w http.ResponseWriter, r *http.Request) { - init := r.FormValue("init") == "true" - ip := r.FormValue("ip") - if ip == "" { - ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] - } - port, _ := strconv.Atoi(r.FormValue("port")) - maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) - s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") - publicUrl := r.FormValue("publicUrl") - volumes := new([]storage.VolumeInfo) - json.Unmarshal([]byte(r.FormValue("volumes")), volumes) - debug(s, "volumes", r.FormValue("volumes")) - topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount) - m := make(map[string]interface{}) - m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024 - writeJson(w, r, m) -} - -func dirStatusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = VERSION - m["Topology"] = topo.ToMap() - writeJson(w, r, m) -} - -func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { - gcThreshold := r.FormValue("garbageThreshold") - if gcThreshold == "" { - gcThreshold = *garbageThreshold - } - debug("garbageThreshold =", gcThreshold) - topo.Vacuum(gcThreshold) - dirStatusHandler(w, r) -} - -func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { - count := 0 - rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication")) - if err == nil { - if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if topo.FreeSpace() < count*rt.GetCopyCount() { - err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) - } else { - count, err = vg.GrowByCountAndType(count, rt, topo) - } - } else { - err = errors.New("parameter count is not found") - } - } - if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]string{"error": "parameter replication " + err.Error()}) - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJson(w, r, map[string]interface{}{"count": count}) - } -} - -func volumeStatusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = VERSION - m["Volumes"] = topo.ToVolumeMap() - writeJson(w, r, m) -} - -func redirectHandler(w http.ResponseWriter, r *http.Request) { - vid, _, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) - if err != nil { - debug("parsing error:", err, r.URL.Path) - return - } - machines := topo.Lookup(volumeId) - if machines != nil && len(machines) > 0 { - http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) - } else { - w.WriteHeader(http.StatusNotFound) - writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) - } -} - -func runMaster(cmd *Command, args []string) bool { - if *mMaxCpu < 1 { - *mMaxCpu = runtime.NumCPU() - } - runtime.GOMAXPROCS(*mMaxCpu) - topo = topology.NewTopology("topo", *confFile, *metaFolder, "weed", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse) - vg = replication.NewDefaultVolumeGrowth() - log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") - http.HandleFunc("/dir/assign", dirAssignHandler) - http.HandleFunc("/dir/lookup", dirLookupHandler) - http.HandleFunc("/dir/join", dirJoinHandler) - http.HandleFunc("/dir/status", dirStatusHandler) - http.HandleFunc("/vol/grow", volumeGrowHandler) - http.HandleFunc("/vol/status", volumeStatusHandler) - http.HandleFunc("/vol/vacuum", volumeVacuumHandler) - - http.HandleFunc("/", redirectHandler) - - topo.StartRefreshWritableVolumes(*garbageThreshold) - - log.Println("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport)) - srv := &http.Server{ - Addr: ":" + strconv.Itoa(*mport), - Handler: http.DefaultServeMux, - ReadTimeout: time.Duration(*mReadTimeout) * time.Second, - } - e := srv.ListenAndServe() - if e != nil { - log.Fatalf("Fail to start:%s", e.Error()) - } - return true -} |
