aboutsummaryrefslogtreecommitdiff
path: root/go/weed
diff options
context:
space:
mode:
Diffstat (limited to 'go/weed')
-rw-r--r--go/weed/master.go12
-rw-r--r--go/weed/volume.go17
2 files changed, 23 insertions, 6 deletions
diff --git a/go/weed/master.go b/go/weed/master.go
index f6cc88df0..7da7831bf 100644
--- a/go/weed/master.go
+++ b/go/weed/master.go
@@ -77,25 +77,27 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
if repType == "" {
repType = *defaultRepType
}
+ dataCenter := r.FormValue("dataCenter")
rt, err := storage.NewReplicationTypeFromString(repType)
if err != nil {
w.WriteHeader(http.StatusNotAcceptable)
writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
return
}
- if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
+
+ if topo.GetVolumeLayout(rt).GetActiveVolumeCount(dataCenter) <= 0 {
if topo.FreeSpace() <= 0 {
w.WriteHeader(http.StatusNotFound)
writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
return
} else {
- if _, err = vg.GrowByType(rt, topo); err != nil {
+ if _, err = vg.GrowByType(rt, dataCenter, topo); err != nil {
writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
return
}
}
}
- fid, count, dn, err := topo.PickForWrite(rt, c)
+ fid, count, dn, err := topo.PickForWrite(rt, c, dataCenter)
if err == nil {
writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
} else {
@@ -120,7 +122,7 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
return
}
debug(s, "volumes", r.FormValue("volumes"))
- topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount)
+ topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack"))
m := make(map[string]interface{})
m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024
writeJsonQuiet(w, r, m)
@@ -151,7 +153,7 @@ func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
if topo.FreeSpace() < count*rt.GetCopyCount() {
err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
} else {
- count, err = vg.GrowByCountAndType(count, rt, topo)
+ count, err = vg.GrowByCountAndType(count, rt, r.FormValue("dataCneter"), topo)
}
} else {
err = errors.New("parameter count is not found")
diff --git a/go/weed/volume.go b/go/weed/volume.go
index 33121388e..6cbbceaef 100644
--- a/go/weed/volume.go
+++ b/go/weed/volume.go
@@ -2,7 +2,7 @@ package main
import (
"code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/replication"
+ "code.google.com/p/weed-fs/go/replication"
"code.google.com/p/weed-fs/go/storage"
"log"
"math/rand"
@@ -38,6 +38,8 @@ var (
maxVolumeCount = cmdVolume.Flag.Int("max", 5, "maximum number of volumes")
vReadTimeout = cmdVolume.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
vMaxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+ dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name")
+ rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name")
store *storage.Store
)
@@ -86,6 +88,16 @@ func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) {
}
debug("commit compact volume =", r.FormValue("volume"), ", error =", err)
}
+func freezeVolumeHandler(w http.ResponseWriter, r *http.Request) {
+ //TODO: notify master that this volume will be read-only
+ err := store.FreezeVolume(r.FormValue("volume"))
+ if err == nil {
+ writeJsonQuiet(w, r, map[string]interface{}{"error": ""})
+ } else {
+ writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+ }
+ debug("freeze volume =", r.FormValue("volume"), ", error =", err)
+}
func storeHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
@@ -289,10 +301,13 @@ func runVolume(cmd *Command, args []string) bool {
http.HandleFunc("/admin/vacuum_volume_check", vacuumVolumeCheckHandler)
http.HandleFunc("/admin/vacuum_volume_compact", vacuumVolumeCompactHandler)
http.HandleFunc("/admin/vacuum_volume_commit", vacuumVolumeCommitHandler)
+ http.HandleFunc("/admin/freeze_volume", freezeVolumeHandler)
go func() {
connected := true
store.SetMaster(*masterNode)
+ store.SetDataCenter(*dataCenter)
+ store.SetRack(*rack)
for {
err := store.Join()
if err == nil {