aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2014-04-13 01:29:52 -0700
committerChris Lu <chris.lu@gmail.com>2014-04-13 01:29:52 -0700
commitf7f582ec8698dc43f1a2289dbd06fe0cade7468f (patch)
tree1b788ffd9b33ef6807e6aaea3bc24b08cbf10fa8 /go
parent008aee0dc1932f75c86e52893044d9cd953ef405 (diff)
downloadseaweedfs-f7f582ec8698dc43f1a2289dbd06fe0cade7468f.tar.xz
seaweedfs-f7f582ec8698dc43f1a2289dbd06fe0cade7468f.zip
1. refactoring, merge "replication" logic into "topology" package
2. when growing volumes, additional preferred "rack" and "dataNode" paraemters are also provided. Previously only "dataCenter" paraemter is provided.
Diffstat (limited to 'go')
-rw-r--r--go/topology/allocate_volume.go (renamed from go/replication/allocate_volume.go)5
-rw-r--r--go/topology/configuration.go24
-rw-r--r--go/topology/data_node.go10
-rw-r--r--go/topology/store_replicate.go (renamed from go/replication/store_replicate.go)2
-rw-r--r--go/topology/topology.go9
-rw-r--r--go/topology/volume_growth.go (renamed from go/replication/volume_growth.go)56
-rw-r--r--go/topology/volume_growth_test.go (renamed from go/replication/volume_growth_test.go)13
-rw-r--r--go/topology/volume_layout.go24
-rw-r--r--go/weed/weed_server/master_server.go13
-rw-r--r--go/weed/weed_server/master_server_handlers.go59
-rw-r--r--go/weed/weed_server/volume_server_handlers.go6
11 files changed, 144 insertions, 77 deletions
diff --git a/go/replication/allocate_volume.go b/go/topology/allocate_volume.go
index fb40c6353..77b4ac508 100644
--- a/go/replication/allocate_volume.go
+++ b/go/topology/allocate_volume.go
@@ -1,8 +1,7 @@
-package replication
+package topology
import (
"code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/topology"
"code.google.com/p/weed-fs/go/util"
"encoding/json"
"errors"
@@ -13,7 +12,7 @@ type AllocateVolumeResult struct {
Error string
}
-func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error {
+func AllocateVolume(dn *DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error {
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("collection", collection)
diff --git a/go/topology/configuration.go b/go/topology/configuration.go
index 058600a7c..ffcebb59c 100644
--- a/go/topology/configuration.go
+++ b/go/topology/configuration.go
@@ -47,19 +47,19 @@ func (c *Configuration) String() string {
}
func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) {
- if dcName == "" {
- if c != nil && c.ip2location != nil {
- if loc, ok := c.ip2location[ip]; ok {
- return loc.dcName, loc.rackName
- }
- }
- } else {
- if rackName == "" {
- return dcName, "DefaultRack"
- } else {
- return dcName, rackName
+ if c != nil && c.ip2location != nil {
+ if loc, ok := c.ip2location[ip]; ok {
+ return loc.dcName, loc.rackName
}
}
- return "DefaultDataCenter", "DefaultRack"
+ if dcName == "" {
+ dcName = "DefaultDataCenter"
+ }
+
+ if rackName == "" {
+ rackName = "DefaultRack"
+ }
+
+ return dcName, rackName
}
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index 0cedb5cfe..ae80e08bb 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -24,6 +24,7 @@ func NewDataNode(id string) *DataNode {
s.NodeImpl.value = s
return s
}
+
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
if _, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
@@ -36,6 +37,7 @@ func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
dn.volumes[v.Id] = v
}
}
+
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
@@ -53,9 +55,15 @@ func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) {
dn.AddOrUpdateVolume(v)
}
}
+
func (dn *DataNode) GetDataCenter() *DataCenter {
return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
}
+
+func (dn *DataNode) GetRack() *Rack {
+ return dn.Parent().(*NodeImpl).value.(*Rack)
+}
+
func (dn *DataNode) GetTopology() *Topology {
p := dn.Parent()
for p.Parent() != nil {
@@ -64,9 +72,11 @@ func (dn *DataNode) GetTopology() *Topology {
t := p.(*Topology)
return t
}
+
func (dn *DataNode) MatchLocation(ip string, port int) bool {
return dn.Ip == ip && dn.Port == port
}
+
func (dn *DataNode) Url() string {
return dn.Ip + ":" + strconv.Itoa(dn.Port)
}
diff --git a/go/replication/store_replicate.go b/go/topology/store_replicate.go
index 249e7e3e6..a982cebe5 100644
--- a/go/replication/store_replicate.go
+++ b/go/topology/store_replicate.go
@@ -1,4 +1,4 @@
-package replication
+package topology
import (
"bytes"
diff --git a/go/topology/topology.go b/go/topology/topology.go
index 6c5bde304..b1fa3f2a2 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -108,8 +108,13 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
return next
}
-func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) {
- vid, count, datanodes, err := t.GetVolumeLayout(collectionName, rp).PickForWrite(count, dataCenter)
+func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool {
+ vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
+ return vl.GetActiveVolumeCount(option) > 0
+}
+
+func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, int, *DataNode, error) {
+ vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
diff --git a/go/replication/volume_growth.go b/go/topology/volume_growth.go
index 33dfc570e..ee6233364 100644
--- a/go/replication/volume_growth.go
+++ b/go/topology/volume_growth.go
@@ -1,9 +1,8 @@
-package replication
+package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/topology"
"fmt"
"math/rand"
"sync"
@@ -17,6 +16,14 @@ This package is created to resolve these replica placement issues:
4. volume allocation for each bucket
*/
+type VolumeGrowOption struct {
+ Collection string
+ ReplicaPlacement *storage.ReplicaPlacement
+ DataCenter string
+ Rack string
+ DataNode string
+}
+
type VolumeGrowth struct {
accessLock sync.Mutex
}
@@ -41,19 +48,19 @@ func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
return
}
-func (vg *VolumeGrowth) AutomaticGrowByType(collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (count int, err error) {
- count, err = vg.GrowByCountAndType(vg.findVolumeCount(rp.GetCopyCount()), collection, rp, preferredDataCenter, topo)
- if count > 0 && count%rp.GetCopyCount() == 0 {
+func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) {
+ count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
+ if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
return count, nil
}
return count, err
}
-func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (counter int, err error) {
+func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
for i := 0; i < targetCount; i++ {
- if c, e := vg.findAndGrow(topo, preferredDataCenter, collection, rp); e == nil {
+ if c, e := vg.findAndGrow(topo, option); e == nil {
counter += c
} else {
return counter, e
@@ -62,21 +69,22 @@ func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, collection string, r
return
}
-func (vg *VolumeGrowth) findAndGrow(topo *topology.Topology, preferredDataCenter string, collection string, rp *storage.ReplicaPlacement) (int, error) {
- servers, e := vg.findEmptySlotsForOneVolume(topo, preferredDataCenter, rp)
+func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) {
+ servers, e := vg.findEmptySlotsForOneVolume(topo, option)
if e != nil {
return 0, e
}
vid := topo.NextVolumeId()
- err := vg.grow(topo, vid, collection, rp, servers...)
+ err := vg.grow(topo, vid, option, servers...)
return len(servers), err
}
-func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, preferredDataCenter string, rp *storage.ReplicaPlacement) (servers []*topology.DataNode, err error) {
+func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
//find main datacenter and other data centers
- mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node topology.Node) error {
- if preferredDataCenter != "" && node.IsDataCenter() && node.Id() != topology.NodeId(preferredDataCenter) {
- return fmt.Errorf("Not matching preferred:%s", preferredDataCenter)
+ rp := option.ReplicaPlacement
+ mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
+ if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
+ return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
}
if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
@@ -88,7 +96,10 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, pref
}
//find main rack and other racks
- mainRack, otherRacks, rack_err := mainDataCenter.(*topology.DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node topology.Node) error {
+ mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
+ if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
+ return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
+ }
if node.FreeSpace() < rp.SameRackCount+1 {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
}
@@ -99,7 +110,10 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, pref
}
//find main rack and other racks
- mainServer, otherServers, server_err := mainRack.(*topology.Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node topology.Node) error {
+ mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
+ if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
+ return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
+ }
if node.FreeSpace() < 1 {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
}
@@ -109,9 +123,9 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, pref
return nil, server_err
}
- servers = append(servers, mainServer.(*topology.DataNode))
+ servers = append(servers, mainServer.(*DataNode))
for _, server := range otherServers {
- servers = append(servers, server.(*topology.DataNode))
+ servers = append(servers, server.(*DataNode))
}
for _, rack := range otherRacks {
r := rand.Intn(rack.FreeSpace())
@@ -132,10 +146,10 @@ func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, pref
return
}
-func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement, servers ...*topology.DataNode) error {
+func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
for _, server := range servers {
- if err := AllocateVolume(server, vid, collection, rp); err == nil {
- vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, ReplicaPlacement: rp, Version: storage.CurrentVersion}
+ if err := AllocateVolume(server, vid, option.Collection, option.ReplicaPlacement); err == nil {
+ vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: option.Collection, ReplicaPlacement: option.ReplicaPlacement, Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server)
glog.V(0).Infoln("Created Volume", vid, "on", server)
diff --git a/go/replication/volume_growth_test.go b/go/topology/volume_growth_test.go
index bb6cbe90e..7f6bd9489 100644
--- a/go/replication/volume_growth_test.go
+++ b/go/topology/volume_growth_test.go
@@ -1,9 +1,8 @@
-package replication
+package topology
import (
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/storage"
- "code.google.com/p/weed-fs/go/topology"
"encoding/json"
"fmt"
"testing"
@@ -70,7 +69,7 @@ var topologyLayout = `
}
`
-func setup(topologyLayout string) *topology.Topology {
+func setup(topologyLayout string) *Topology {
var data interface{}
err := json.Unmarshal([]byte(topologyLayout), &data)
if err != nil {
@@ -79,22 +78,22 @@ func setup(topologyLayout string) *topology.Topology {
fmt.Println("data:", data)
//need to connect all nodes first before server adding volumes
- topo, err := topology.NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
+ topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
sequence.NewMemorySequencer(), 32*1024, 5)
if err != nil {
panic("error: " + err.Error())
}
mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology {
- dc := topology.NewDataCenter(dcKey)
+ dc := NewDataCenter(dcKey)
dcMap := dcValue.(map[string]interface{})
topo.LinkChildNode(dc)
for rackKey, rackValue := range dcMap {
- rack := topology.NewRack(rackKey)
+ rack := NewRack(rackKey)
rackMap := rackValue.(map[string]interface{})
dc.LinkChildNode(rack)
for serverKey, serverValue := range rackMap {
- server := topology.NewDataNode(serverKey)
+ server := NewDataNode(serverKey)
serverMap := serverValue.(map[string]interface{})
rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) {
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index a53e2ae82..bd95cc796 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -71,13 +71,13 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
return
}
-func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.VolumeId, int, *VolumeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count int, option *VolumeGrowOption) (*storage.VolumeId, int, *VolumeLocationList, error) {
len_writers := len(vl.writables)
if len_writers <= 0 {
glog.V(0).Infoln("No more writable volumes!")
return nil, 0, nil, errors.New("No more writable volumes!")
}
- if dataCenter == "" {
+ if option.DataCenter == "" {
vid := vl.writables[rand.Intn(len_writers)]
locationList := vl.vid2location[vid]
if locationList != nil {
@@ -91,7 +91,13 @@ func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.Vol
for _, v := range vl.writables {
volumeLocationList := vl.vid2location[v]
for _, dn := range volumeLocationList.list {
- if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+ if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
+ if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+ continue
+ }
+ if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+ continue
+ }
counter++
if rand.Intn(counter) < 1 {
vid, locationList = v, volumeLocationList
@@ -104,14 +110,20 @@ func (vl *VolumeLayout) PickForWrite(count int, dataCenter string) (*storage.Vol
return nil, 0, nil, errors.New("Strangely This Should Never Have Happened!")
}
-func (vl *VolumeLayout) GetActiveVolumeCount(dataCenter string) int {
- if dataCenter == "" {
+func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
+ if option.DataCenter == "" {
return len(vl.writables)
}
counter := 0
for _, v := range vl.writables {
for _, dn := range vl.vid2location[v].list {
- if dn.GetDataCenter().Id() == NodeId(dataCenter) {
+ if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
+ if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+ continue
+ }
+ if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+ continue
+ }
counter++
}
}
diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go
index 286b90aca..9d8af156f 100644
--- a/go/weed/weed_server/master_server.go
+++ b/go/weed/weed_server/master_server.go
@@ -2,10 +2,10 @@ package weed_server
import (
"code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/replication"
"code.google.com/p/weed-fs/go/sequence"
"code.google.com/p/weed-fs/go/topology"
"code.google.com/p/weed-fs/go/util"
+ "errors"
"github.com/goraft/raft"
"github.com/gorilla/mux"
"net/http"
@@ -25,7 +25,7 @@ type MasterServer struct {
whiteList []string
Topo *topology.Topology
- vg *replication.VolumeGrowth
+ vg *topology.VolumeGrowth
vgLock sync.Mutex
bounedLeaderChan chan int
@@ -53,7 +53,7 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil {
glog.Fatalf("cannot create topology:%s", e)
}
- ms.vg = replication.NewDefaultVolumeGrowth()
+ ms.vg = topology.NewDefaultVolumeGrowth()
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
r.HandleFunc("/dir/assign", ms.proxyToLeader(secure(ms.whiteList, ms.dirAssignHandler)))
@@ -94,11 +94,11 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
return func(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
f(w, r)
- } else {
+ } else if ms.Topo.RaftServer.Leader() != "" {
ms.bounedLeaderChan <- 1
defer func() { <-ms.bounedLeaderChan }()
targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
- if err != nil || ms.Topo.RaftServer.Leader() == "" {
+ if err != nil {
writeJsonQuiet(w, r, map[string]interface{}{"error": "Leader URL http://" + ms.Topo.RaftServer.Leader() + " Parse Error " + err.Error()})
return
}
@@ -106,6 +106,9 @@ func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Requ
proxy := httputil.NewSingleHostReverseProxy(targetUrl)
proxy.Transport = util.Transport
proxy.ServeHTTP(w, r)
+ } else {
+ //drop it to the floor
+ writeJsonError(w, r, errors.New(ms.Topo.RaftServer.Name()+"does not know Leader yet:"+ms.Topo.RaftServer.Leader()))
}
}
}
diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go
index a2f7b162c..884689a5e 100644
--- a/go/weed/weed_server/master_server_handlers.go
+++ b/go/weed/weed_server/master_server_handlers.go
@@ -3,6 +3,7 @@ package weed_server
import (
"code.google.com/p/weed-fs/go/stats"
"code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/topology"
"code.google.com/p/weed-fs/go/util"
"encoding/json"
"errors"
@@ -39,24 +40,19 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request)
func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) {
stats.AssignRequest()
- c, e := strconv.Atoi(r.FormValue("count"))
+ requestedCount, e := strconv.Atoi(r.FormValue("count"))
if e != nil {
- c = 1
+ requestedCount = 1
}
- replication := r.FormValue("replication")
- if replication == "" {
- replication = ms.defaultReplicaPlacement
- }
- collection := r.FormValue("collection")
- dataCenter := r.FormValue("dataCenter")
- replicaPlacement, err := storage.NewReplicaPlacementFromString(replication)
+
+ option, err := ms.getVolumeGrowOption(r)
if err != nil {
w.WriteHeader(http.StatusNotAcceptable)
writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
return
}
- if ms.Topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
+ if !ms.Topo.HasWriableVolume(option) {
if ms.Topo.FreeSpace() <= 0 {
w.WriteHeader(http.StatusNotFound)
writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
@@ -64,15 +60,15 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
} else {
ms.vgLock.Lock()
defer ms.vgLock.Unlock()
- if ms.Topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
- if _, err = ms.vg.AutomaticGrowByType(collection, replicaPlacement, dataCenter, ms.Topo); err != nil {
+ if !ms.Topo.HasWriableVolume(option) {
+ if _, err = ms.vg.AutomaticGrowByType(option, ms.Topo); err != nil {
writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
return
}
}
}
}
- fid, count, dn, err := ms.Topo.PickForWrite(collection, replicaPlacement, c, dataCenter)
+ fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option)
if err == nil {
writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
} else {
@@ -138,13 +134,18 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
count := 0
- replicaPlacement, err := storage.NewReplicaPlacementFromString(r.FormValue("replication"))
+ option, err := ms.getVolumeGrowOption(r)
+ if err != nil {
+ w.WriteHeader(http.StatusNotAcceptable)
+ writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
+ return
+ }
if err == nil {
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
- if ms.Topo.FreeSpace() < count*replicaPlacement.GetCopyCount() {
- err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*replicaPlacement.GetCopyCount()))
+ if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() {
+ err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount()))
} else {
- count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), replicaPlacement, r.FormValue("dataCenter"), ms.Topo)
+ count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo)
}
} else {
err = errors.New("parameter count is not found")
@@ -189,3 +190,27 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *
submitForClientHandler(w, r, ms.Topo.RaftServer.Leader())
}
}
+
+func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool {
+ vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement)
+ return vl.GetActiveVolumeCount(option) > 0
+}
+
+func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
+ replicationString := r.FormValue("replication")
+ if replicationString == "" {
+ replicationString = ms.defaultReplicaPlacement
+ }
+ replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
+ if err != nil {
+ return nil, err
+ }
+ volumeGrowOption := &topology.VolumeGrowOption{
+ Collection: r.FormValue("collection"),
+ ReplicaPlacement: replicaPlacement,
+ DataCenter: r.FormValue("dataCenter"),
+ Rack: r.FormValue("rack"),
+ DataNode: r.FormValue("dataNode"),
+ }
+ return volumeGrowOption, nil
+}
diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go
index 0649ce7f4..36ed204ac 100644
--- a/go/weed/weed_server/volume_server_handlers.go
+++ b/go/weed/weed_server/volume_server_handlers.go
@@ -3,9 +3,9 @@ package weed_server
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
- "code.google.com/p/weed-fs/go/replication"
"code.google.com/p/weed-fs/go/stats"
"code.google.com/p/weed-fs/go/storage"
+ "code.google.com/p/weed-fs/go/topology"
"code.google.com/p/weed-fs/go/util"
"mime"
"net/http"
@@ -214,7 +214,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
writeJsonError(w, r, ne)
return
}
- ret, errorStatus := replication.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r)
+ ret, errorStatus := topology.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r)
if errorStatus == "" {
w.WriteHeader(http.StatusCreated)
} else {
@@ -251,7 +251,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
}
n.Size = 0
- ret := replication.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r)
+ ret := topology.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r)
if ret != 0 {
w.WriteHeader(http.StatusAccepted)