aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed-fs/src/cmd/weed/master.go33
-rw-r--r--weed-fs/src/pkg/directory/volume_mapping.go2
-rw-r--r--weed-fs/src/pkg/replication/volume_growth.go2
-rw-r--r--weed-fs/src/pkg/topology/data_node.go2
-rw-r--r--weed-fs/src/pkg/topology/node.go2
-rw-r--r--weed-fs/src/pkg/topology/rack.go5
-rw-r--r--weed-fs/src/pkg/topology/topology.go17
-rw-r--r--weed-fs/src/pkg/topology/volume_location.go4
8 files changed, 44 insertions, 23 deletions
diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go
index a61c65f39..d2e58eb5b 100644
--- a/weed-fs/src/cmd/weed/master.go
+++ b/weed-fs/src/cmd/weed/master.go
@@ -2,6 +2,7 @@ package main
import (
"encoding/json"
+ "errors"
"log"
"net/http"
"pkg/directory"
@@ -70,9 +71,9 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
func dirAssign2Handler(w http.ResponseWriter, r *http.Request) {
c, _ := strconv.Atoi(r.FormValue("count"))
rt, err := storage.NewReplicationType(r.FormValue("replication"))
- if err!=nil {
- writeJson(w, r, map[string]string{"error": err.Error()})
- return
+ if err != nil {
+ writeJson(w, r, map[string]string{"error": err.Error()})
+ return
}
if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
if topo.FreeSpace() <= 0 {
@@ -111,22 +112,21 @@ func dirNewStatusHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, topo.ToMap())
}
func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
- rt, err := storage.NewReplicationType(r.FormValue("replication"))
- if err!=nil {
- writeJson(w, r, map[string]string{"error": err.Error()})
- return
- }
- count, err := strconv.Atoi(r.FormValue("count"))
- if topo.FreeSpace() < count * rt.GetCopyCount() {
- writeJson(w, r, map[string]string{"error": "Only "+strconv.Itoa(topo.FreeSpace())+" volumes left! Not enough for "+strconv.Itoa(count*rt.GetCopyCount())})
- return
+ count := 0
+ rt, err := storage.NewReplicationType(r.FormValue("replication"))
+ if err == nil {
+ if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
+ if topo.FreeSpace() < count*rt.GetCopyCount() {
+ err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
+ } else {
+ count, err = vg.GrowByCountAndType(count, rt, topo)
+ }
+ }
}
if err != nil {
- count, err := vg.GrowByType(rt, topo)
- writeJson(w, r, map[string]interface{}{"count": count, "error": err})
+ writeJson(w, r, map[string]string{"error": err.Error()})
} else {
- count, err := vg.GrowByCountAndType(count, rt, topo)
- writeJson(w, r, map[string]interface{}{"count": count, "error": err})
+ writeJson(w, r, map[string]interface{}{"count": count})
}
}
@@ -144,6 +144,7 @@ func runMaster(cmd *Command, args []string) bool {
http.HandleFunc("/vol/grow", volumeGrowHandler)
mapper.StartRefreshWritableVolumes()
+ topo.StartRefreshWritableVolumes()
log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*mport))
e := http.ListenAndServe(":"+strconv.Itoa(*mport), nil)
diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go
index 495c57930..da41b3510 100644
--- a/weed-fs/src/pkg/directory/volume_mapping.go
+++ b/weed-fs/src/pkg/directory/volume_mapping.go
@@ -120,7 +120,7 @@ func (m *Mapper) StartRefreshWritableVolumes() {
}
func (m *Mapper) refreshWritableVolumes() {
- freshThreshHold := time.Now().Unix() - 5*m.pulse //5 times of sleep interval
+ freshThreshHold := time.Now().Unix() - 3*m.pulse //3 times of sleep interval
//setting Writers, copy-on-write because of possible updating, this needs some future work!
var writers []storage.VolumeId
for _, machine_entry := range m.Machines {
diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go
index 8519921f5..28dca68b4 100644
--- a/weed-fs/src/pkg/replication/volume_growth.go
+++ b/weed-fs/src/pkg/replication/volume_growth.go
@@ -44,7 +44,7 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo
case storage.Copy11:
return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
}
- return 0, nil
+ return 0, errors.New("Unknown Replication Type!")
}
func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) {
counter = 0
diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go
index 1516572fd..04c7cf111 100644
--- a/weed-fs/src/pkg/topology/data_node.go
+++ b/weed-fs/src/pkg/topology/data_node.go
@@ -11,7 +11,7 @@ type DataNode struct {
Ip string
Port int
PublicUrl string
- lastSeen int64 // unix time in seconds
+ LastSeen int64 // unix time in seconds
}
func NewDataNode(id string) *DataNode {
diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go
index e73719794..ddaf9f0b2 100644
--- a/weed-fs/src/pkg/topology/node.go
+++ b/weed-fs/src/pkg/topology/node.go
@@ -151,7 +151,7 @@ func (n *NodeImpl) CollectWritableVolumes(freshThreshHold int64, volumeSizeLimit
if n.IsRack() {
for _, c := range n.Children() {
dn := c.(*DataNode) //can not cast n to DataNode
- if dn.lastSeen > freshThreshHold {
+ if dn.LastSeen > freshThreshHold {
continue
}
for _, v := range dn.volumes {
diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go
index f685f1a82..c819feb00 100644
--- a/weed-fs/src/pkg/topology/rack.go
+++ b/weed-fs/src/pkg/topology/rack.go
@@ -2,6 +2,7 @@ package topology
import (
"strconv"
+ "time"
)
type Rack struct {
@@ -28,7 +29,8 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
- dn.NodeImpl.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
+ dn.LastSeen = time.Now().Unix()
+ dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
return dn
}
}
@@ -37,6 +39,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
dn.Port = port
dn.PublicUrl = publicUrl
dn.maxVolumeCount = maxVolumeCount
+ dn.LastSeen = time.Now().Unix()
r.LinkChildNode(dn)
return dn
}
diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go
index 079fb54d9..a60768f14 100644
--- a/weed-fs/src/pkg/topology/topology.go
+++ b/weed-fs/src/pkg/topology/topology.go
@@ -6,6 +6,7 @@ import (
"pkg/directory"
"pkg/sequence"
"pkg/storage"
+ "time"
)
type Topology struct {
@@ -125,3 +126,19 @@ func (t *Topology) ToMap() interface{} {
m["layouts"] = layouts
return m
}
+
+func (t *Topology) StartRefreshWritableVolumes() {
+ go func() {
+ for {
+ t.refreshWritableVolumes()
+ time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
+ }
+ }()
+}
+
+func (t *Topology) refreshWritableVolumes() {
+ freshThreshHold := time.Now().Unix() - 3*t.pulse //5 times of sleep interval
+ //setting Writers, copy-on-write because of possible updating, this needs some future work!
+ t.CollectWritableVolumes(freshThreshHold, t.volumeSizeLimit)
+ //TODO: collect writable columes for each replication type
+}
diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go
index 92d89ae46..f2e5dd894 100644
--- a/weed-fs/src/pkg/topology/volume_location.go
+++ b/weed-fs/src/pkg/topology/volume_location.go
@@ -27,7 +27,7 @@ func (dnll *DataNodeLocationList) Add(loc *DataNode) bool {
func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) {
var changed bool
for _, dnl := range dnll.list {
- if dnl.lastSeen < freshThreshHold {
+ if dnl.LastSeen < freshThreshHold {
changed = true
break
}
@@ -35,7 +35,7 @@ func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) {
if changed {
var l []*DataNode
for _, dnl := range dnll.list {
- if dnl.lastSeen >= freshThreshHold {
+ if dnl.LastSeen >= freshThreshHold {
l = append(l, dnl)
}
}