-rw-r--r--  go/operation/list_masters.go                     6
-rw-r--r--  go/replication/allocate_volume.go                4
-rw-r--r--  go/replication/volume_growth.go                246
-rw-r--r--  go/replication/volume_growth_test.go            28
-rw-r--r--  go/storage/cdb_map.go                            4
-rw-r--r--  go/storage/compact_map_perf_test.go              2
-rw-r--r--  go/storage/replica_placement.go                 61
-rw-r--r--  go/storage/replication_type.go                 123
-rw-r--r--  go/storage/store.go                             18
-rw-r--r--  go/storage/volume.go                            18
-rw-r--r--  go/storage/volume_info.go                        2
-rw-r--r--  go/topology/collection.go                       14
-rw-r--r--  go/topology/data_center.go                       1
-rw-r--r--  go/topology/node.go                             69
-rw-r--r--  go/topology/node_list.go                        83
-rw-r--r--  go/topology/node_list_test.go                   60
-rw-r--r--  go/topology/rack.go                              1
-rw-r--r--  go/topology/topo_test.go                        14
-rw-r--r--  go/topology/topology.go                         20
-rw-r--r--  go/topology/topology_event_handling.go           6
-rw-r--r--  go/topology/volume_layout.go                    16
-rw-r--r--  go/util/file_util.go                             2
-rw-r--r--  go/weed/compact.go                               2
-rw-r--r--  go/weed/download.go                              2
-rw-r--r--  go/weed/export.go                                4
-rw-r--r--  go/weed/master.go                               26
-rw-r--r--  go/weed/server.go                               39
-rw-r--r--  go/weed/version.go                               2
-rw-r--r--  go/weed/weed_server/master_server.go            30
-rw-r--r--  go/weed/weed_server/master_server_handlers.go   24
-rw-r--r--  go/weed/weed_server/volume_server.go             2
-rw-r--r--  note/replication.txt                            37
32 files changed, 371 insertions(+), 595 deletions(-)
diff --git a/go/operation/list_masters.go b/go/operation/list_masters.go
index 05235aed0..ade975c71 100644
--- a/go/operation/list_masters.go
+++ b/go/operation/list_masters.go
@@ -23,5 +23,9 @@ func ListMasters(server string) ([]string, error) {
if err != nil {
return nil, err
}
- return ret.Peers, nil
+ masters := ret.Peers
+ if ret.IsLeader {
+ masters = append(masters, ret.Leader)
+ }
+ return masters, nil
}
diff --git a/go/replication/allocate_volume.go b/go/replication/allocate_volume.go
index 0f5ebc00f..fb40c6353 100644
--- a/go/replication/allocate_volume.go
+++ b/go/replication/allocate_volume.go
@@ -13,11 +13,11 @@ type AllocateVolumeResult struct {
Error string
}
-func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, collection string, repType storage.ReplicationType) error {
+func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement) error {
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("collection", collection)
- values.Add("replication", repType.String())
+ values.Add("replication", rp.String())
jsonBlob, err := util.Post("http://"+dn.PublicUrl+"/admin/assign_volume", values)
if err != nil {
return err
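
For reference, AllocateVolume amounts to a plain form POST against the volume server's /admin/assign_volume endpoint. A minimal standalone equivalent (host, volume id, and placement values are hypothetical):

package main

import (
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	// mirrors what AllocateVolume sends for vid=3, collection="pictures",
	// replica placement "001" (hypothetical values)
	values := make(url.Values)
	values.Add("volume", "3")
	values.Add("collection", "pictures")
	values.Add("replication", "001")
	resp, err := http.PostForm("http://127.0.0.1:8080/admin/assign_volume", values)
	if err != nil {
		fmt.Println("post error:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
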
diff --git a/go/replication/volume_growth.go b/go/replication/volume_growth.go
index d7d1c90bd..8466b149f 100644
--- a/go/replication/volume_growth.go
+++ b/go/replication/volume_growth.go
@@ -5,7 +5,6 @@ import (
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/topology"
"errors"
- "fmt"
"math/rand"
"sync"
)
@@ -19,188 +18,115 @@ This package is created to resolve these replica placement issues:
*/
type VolumeGrowth struct {
- copy1factor int
- copy2factor int
- copy3factor int
- copyAll int
-
accessLock sync.Mutex
}
func NewDefaultVolumeGrowth() *VolumeGrowth {
- return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3}
+ return &VolumeGrowth{}
}
-func (vg *VolumeGrowth) AutomaticGrowByType(collection string, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (count int, err error) {
- factor := 1
- switch repType {
- case storage.Copy000:
- factor = 1
- count, err = vg.GrowByCountAndType(vg.copy1factor, collection, repType, dataCenter, topo)
- case storage.Copy001:
- factor = 2
- count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo)
- case storage.Copy010:
- factor = 2
- count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo)
- case storage.Copy100:
- factor = 2
- count, err = vg.GrowByCountAndType(vg.copy2factor, collection, repType, dataCenter, topo)
- case storage.Copy110:
- factor = 3
- count, err = vg.GrowByCountAndType(vg.copy3factor, collection, repType, dataCenter, topo)
- case storage.Copy200:
- factor = 3
- count, err = vg.GrowByCountAndType(vg.copy3factor, collection, repType, dataCenter, topo)
+// each logical volume of a replication type needs rp.GetCopyCount() physical volumes;
+// given that copy count, decide how many logical volumes to create per growth request
+func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
+ switch copyCount {
+ case 1:
+ count = 7
+ case 2:
+ count = 6
+ case 3:
+ count = 3
default:
- err = errors.New("Unknown Replication Type!")
+ count = 1
}
- if count > 0 && count%factor == 0 {
+ return
+}
+
+func (vg *VolumeGrowth) AutomaticGrowByType(collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (count int, err error) {
+ count, err = vg.GrowByCountAndType(vg.findVolumeCount(rp.GetCopyCount()), collection, rp, preferredDataCenter, topo)
+ if count > 0 && count%rp.GetCopyCount() == 0 {
return count, nil
}
return count, err
}
-func (vg *VolumeGrowth) GrowByCountAndType(count int, collection string, repType storage.ReplicationType, dataCenter string, topo *topology.Topology) (counter int, err error) {
+func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, collection string, rp *storage.ReplicaPlacement, preferredDataCenter string, topo *topology.Topology) (counter int, err error) {
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
- counter = 0
- switch repType {
- case storage.Copy000:
- for i := 0; i < count; i++ {
- if ok, server, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
- if err = vg.grow(topo, *vid, collection, repType, server); err == nil {
- counter++
- } else {
- return counter, err
- }
- } else {
- return counter, fmt.Errorf("Failed to grown volume for data center %s", dataCenter)
- }
- }
- case storage.Copy001:
- for i := 0; i < count; i++ {
- //randomly pick one server from the datacenter, and then choose from the same rack
- if ok, server1, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
- rack := server1.Parent()
- exclusion := make(map[string]topology.Node)
- exclusion[server1.String()] = server1
- newNodeList := topology.NewNodeList(rack.Children(), exclusion)
- if newNodeList.FreeSpace() > 0 {
- if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
- if err = vg.grow(topo, *vid, collection, repType, server1, server2); err == nil {
- counter++
- }
- }
- }
- }
- }
- case storage.Copy010:
- for i := 0; i < count; i++ {
- //randomly pick one server from the datacenter, and then choose from the a different rack
- if ok, server1, vid := topo.RandomlyReserveOneVolume(dataCenter); ok {
- rack := server1.Parent()
- dc := rack.Parent()
- exclusion := make(map[string]topology.Node)
- exclusion[rack.String()] = rack
- newNodeList := topology.NewNodeList(dc.Children(), exclusion)
- if newNodeList.FreeSpace() > 0 {
- if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
- if err = vg.grow(topo, *vid, collection, repType, server1, server2); err == nil {
- counter++
- }
- }
- }
- }
+ for i := 0; i < targetCount; i++ {
+ if c, e := vg.findAndGrow(topo, preferredDataCenter, collection, rp); e == nil {
+ counter += c
+ } else {
+ return counter, e
}
- case storage.Copy100:
- for i := 0; i < count; i++ {
- nl := topology.NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(2, 1, dataCenter)
- vid := topo.NextVolumeId()
- if ret {
- var servers []*topology.DataNode
- for _, n := range picked {
- if n.FreeSpace() > 0 {
- if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid, ""); ok {
- servers = append(servers, server)
- }
- }
- }
- if len(servers) == 2 {
- if err = vg.grow(topo, vid, collection, repType, servers...); err == nil {
- counter++
- }
- }
- } else {
- return counter, fmt.Errorf("Failed to grown volume on data center %s and another data center", dataCenter)
- }
+ }
+ return
+}
+
+func (vg *VolumeGrowth) findAndGrow(topo *topology.Topology, preferredDataCenter string, collection string, rp *storage.ReplicaPlacement) (int, error) {
+ servers, e := vg.findEmptySlotsForOneVolume(topo, preferredDataCenter, rp)
+ if e != nil {
+ return 0, e
+ }
+ vid := topo.NextVolumeId()
+ err := vg.grow(topo, vid, collection, rp, servers...)
+ return len(servers), err
+}
+
+func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *topology.Topology, preferredDataCenter string, rp *storage.ReplicaPlacement) (servers []*topology.DataNode, err error) {
+ //find main datacenter and other data centers
+ mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node topology.Node) bool {
+ if preferredDataCenter != "" && node.IsDataCenter() && node.Id() != topology.NodeId(preferredDataCenter) {
+ return false
}
- case storage.Copy110:
- for i := 0; i < count; i++ {
- nl := topology.NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(2, 2, dataCenter)
- vid := topo.NextVolumeId()
- if ret {
- var servers []*topology.DataNode
- dc1, dc2 := picked[0], picked[1]
- if dc2.FreeSpace() > dc1.FreeSpace() {
- dc1, dc2 = dc2, dc1
- }
- if dc1.FreeSpace() > 0 {
- if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid, ""); ok {
- servers = append(servers, server1)
- rack := server1.Parent()
- exclusion := make(map[string]topology.Node)
- exclusion[rack.String()] = rack
- newNodeList := topology.NewNodeList(dc1.Children(), exclusion)
- if newNodeList.FreeSpace() > 0 {
- if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), vid); ok2 {
- servers = append(servers, server2)
- }
- }
- }
- }
- if dc2.FreeSpace() > 0 {
- if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid, ""); ok {
- servers = append(servers, server)
- }
- }
- if len(servers) == 3 {
- if err = vg.grow(topo, vid, collection, repType, servers...); err == nil {
- counter++
- }
- }
- }
+ return node.FreeSpace() > rp.DiffRackCount+rp.SameRackCount+1
+ })
+ if dc_err != nil {
+ return nil, dc_err
+ }
+
+ //find main rack and other racks
+ mainRack, otherRacks, rack_err := mainDataCenter.(*topology.DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node topology.Node) bool {
+ return node.FreeSpace() > rp.SameRackCount+1
+ })
+ if rack_err != nil {
+ return nil, rack_err
+ }
+
+ //find main server and other servers
+ mainServer, otherServers, server_err := mainRack.(*topology.Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node topology.Node) bool {
+ return node.FreeSpace() > 1
+ })
+ if server_err != nil {
+ return nil, server_err
+ }
+
+ servers = append(servers, mainServer.(*topology.DataNode))
+ for _, server := range otherServers {
+ servers = append(servers, server.(*topology.DataNode))
+ }
+ for _, rack := range otherRacks {
+ r := rand.Intn(rack.FreeSpace())
+ if server, e := rack.ReserveOneVolume(r); e == nil {
+ servers = append(servers, server)
+ } else {
+ return servers, e
}
- case storage.Copy200:
- for i := 0; i < count; i++ {
- nl := topology.NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(3, 1, dataCenter)
- vid := topo.NextVolumeId()
- if ret {
- var servers []*topology.DataNode
- for _, n := range picked {
- if n.FreeSpace() > 0 {
- if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid, ""); ok {
- servers = append(servers, server)
- }
- }
- }
- if len(servers) == 3 {
- if err = vg.grow(topo, vid, collection, repType, servers...); err == nil {
- counter++
- }
- }
- }
+ }
+ for _, datacenter := range otherDataCenters {
+ r := rand.Intn(datacenter.FreeSpace())
+ if server, e := datacenter.ReserveOneVolume(r); e == nil {
+ servers = append(servers, server)
+ } else {
+ return servers, e
}
}
return
}
-func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, repType storage.ReplicationType, servers ...*topology.DataNode) error {
+
+func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, rp *storage.ReplicaPlacement, servers ...*topology.DataNode) error {
for _, server := range servers {
- if err := AllocateVolume(server, vid, collection, repType); err == nil {
- vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, RepType: repType, Version: storage.CurrentVersion}
+ if err := AllocateVolume(server, vid, collection, rp); err == nil {
+ vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, ReplicaPlacement: rp, Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(&vi, server)
glog.V(0).Infoln("Created Volume", vid, "on", server)
diff --git a/go/replication/volume_growth_test.go b/go/replication/volume_growth_test.go
index 99f82a7fa..bb6cbe90e 100644
--- a/go/replication/volume_growth_test.go
+++ b/go/replication/volume_growth_test.go
@@ -13,7 +13,7 @@ var topologyLayout = `
{
"dc1":{
"rack1":{
- "server1":{
+ "server111":{
"volumes":[
{"id":1, "size":12312},
{"id":2, "size":12312},
@@ -21,7 +21,7 @@ var topologyLayout = `
],
"limit":3
},
- "server2":{
+ "server112":{
"volumes":[
{"id":4, "size":12312},
{"id":5, "size":12312},
@@ -31,7 +31,7 @@ var topologyLayout = `
}
},
"rack2":{
- "server1":{
+ "server121":{
"volumes":[
{"id":4, "size":12312},
{"id":5, "size":12312},
@@ -39,17 +39,17 @@ var topologyLayout = `
],
"limit":4
},
- "server2":{
+ "server122":{
"volumes":[],
"limit":4
},
- "server3":{
+ "server123":{
"volumes":[
{"id":2, "size":12312},
{"id":3, "size":12312},
{"id":4, "size":12312}
],
- "limit":2
+ "limit":5
}
}
},
@@ -57,7 +57,7 @@ var topologyLayout = `
},
"dc3":{
"rack2":{
- "server1":{
+ "server321":{
"volumes":[
{"id":1, "size":12312},
{"id":3, "size":12312},
@@ -113,14 +113,16 @@ func setup(topologyLayout string) *topology.Topology {
return topo
}
-func TestRemoveDataCenter(t *testing.T) {
+func TestFindEmptySlotsForOneVolume(t *testing.T) {
topo := setup(topologyLayout)
- topo.UnlinkChildNode(topology.NodeId("dc2"))
- if topo.GetActiveVolumeCount() != 15 {
+ vg := NewDefaultVolumeGrowth()
+ rp, _ := storage.NewReplicaPlacementFromString("002")
+ servers, err := vg.findEmptySlotsForOneVolume(topo, "dc1", rp)
+ if err != nil {
+ fmt.Println("finding empty slots error :", err)
t.Fail()
}
- topo.UnlinkChildNode(topology.NodeId("dc3"))
- if topo.GetActiveVolumeCount() != 12 {
- t.Fail()
+ for _, server := range servers {
+ fmt.Println("assigned node :", server.Id())
}
}
diff --git a/go/storage/cdb_map.go b/go/storage/cdb_map.go
index 3e38b0bdb..0d790cc0f 100644
--- a/go/storage/cdb_map.go
+++ b/go/storage/cdb_map.go
@@ -76,8 +76,8 @@ func (m cdbMap) FileCount() int {
func (m *cdbMap) DeletedCount() int {
return m.DeletionCounter
}
-func (m *cdbMap) NextFileKey(count int) (uint64) {
- return 0
+func (m *cdbMap) NextFileKey(count int) uint64 {
+ return 0
}
func getMetric(c *cdb.Cdb, m *mapMetric) error {
diff --git a/go/storage/compact_map_perf_test.go b/go/storage/compact_map_perf_test.go
index e940310c0..37b23a59f 100644
--- a/go/storage/compact_map_perf_test.go
+++ b/go/storage/compact_map_perf_test.go
@@ -3,8 +3,8 @@ package storage
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/util"
- "os"
"log"
+ "os"
"testing"
)
diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go
new file mode 100644
index 000000000..55428749b
--- /dev/null
+++ b/go/storage/replica_placement.go
@@ -0,0 +1,61 @@
+package storage
+
+import (
+ "errors"
+ "fmt"
+)
+
+const (
+	ReplicaPlacementCount = 27
+)
+
+type ReplicaPlacement struct {
+ SameRackCount int
+ DiffRackCount int
+ DiffDataCenterCount int
+}
+
+func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) {
+ rp := &ReplicaPlacement{}
+ for i, c := range t {
+ count := int(c - '0')
+ if 0 <= count && count <= 2 {
+ switch i {
+ case 0:
+ rp.DiffDataCenterCount = count
+ case 1:
+ rp.DiffRackCount = count
+ case 2:
+ rp.SameRackCount = count
+ }
+ } else {
+ return rp, errors.New("Unknown Replication Type:" + t)
+ }
+ }
+ return rp, nil
+}
+
+func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) {
+ return NewReplicaPlacementFromString(fmt.Sprintf("%d", b))
+}
+
+func (rp *ReplicaPlacement) Byte() byte {
+ ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount
+ return byte(ret)
+}
+
+func (rp *ReplicaPlacement) String() string {
+ b := make([]byte, 3)
+ b[0] = byte(rp.DiffDataCenterCount + '0')
+ b[1] = byte(rp.DiffRackCount + '0')
+ b[2] = byte(rp.SameRackCount + '0')
+ return string(b)
+}
+
+func (rp *ReplicaPlacement) GetCopyCount() int {
+ return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1
+}
+
+func (rp *ReplicaPlacement) GetReplicationLevelIndex() int {
+	// base-3 encoding of the three digits gives each placement a unique index
+	return rp.DiffDataCenterCount*9 + rp.DiffRackCount*3 + rp.SameRackCount
+}
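
A quick usage sketch of the new type (assuming the repository's import path is available), showing how the "xyz" string, the copy count, and the single-byte super-block form relate:

package main

import (
	"fmt"

	"code.google.com/p/weed-fs/go/storage"
)

func main() {
	// "110": 1 copy in a different data center, 1 in a different rack,
	// 0 extra copies in the same rack -> 3 copies in total
	rp, err := storage.NewReplicaPlacementFromString("110")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(rp.String(), "needs", rp.GetCopyCount(), "copies") // 110 needs 3 copies

	// the single byte stored in a volume's super block round-trips
	rp2, _ := storage.NewReplicaPlacementFromByte(rp.Byte())
	fmt.Println("from byte:", rp2.String()) // 110
}
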
diff --git a/go/storage/replication_type.go b/go/storage/replication_type.go
deleted file mode 100644
index 0902d1016..000000000
--- a/go/storage/replication_type.go
+++ /dev/null
@@ -1,123 +0,0 @@
-package storage
-
-import (
- "errors"
-)
-
-type ReplicationType string
-
-const (
- Copy000 = ReplicationType("000") // single copy
- Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
- Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
- Copy100 = ReplicationType("100") // 2 copies, each on different data center
- Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
- Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
- LengthRelicationType = 6
- CopyNil = ReplicationType(255) // nil value
-)
-
-func NewReplicationTypeFromString(t string) (ReplicationType, error) {
- switch t {
- case "000":
- return Copy000, nil
- case "001":
- return Copy001, nil
- case "010":
- return Copy010, nil
- case "100":
- return Copy100, nil
- case "110":
- return Copy110, nil
- case "200":
- return Copy200, nil
- }
- return Copy000, errors.New("Unknown Replication Type:" + t)
-}
-func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
- switch b {
- case byte(000):
- return Copy000, nil
- case byte(001):
- return Copy001, nil
- case byte(010):
- return Copy010, nil
- case byte(100):
- return Copy100, nil
- case byte(110):
- return Copy110, nil
- case byte(200):
- return Copy200, nil
- }
- return Copy000, errors.New("Unknown Replication Type:" + string(b))
-}
-
-func (r *ReplicationType) String() string {
- switch *r {
- case Copy000:
- return "000"
- case Copy001:
- return "001"
- case Copy010:
- return "010"
- case Copy100:
- return "100"
- case Copy110:
- return "110"
- case Copy200:
- return "200"
- }
- return "000"
-}
-func (r *ReplicationType) Byte() byte {
- switch *r {
- case Copy000:
- return byte(000)
- case Copy001:
- return byte(001)
- case Copy010:
- return byte(010)
- case Copy100:
- return byte(100)
- case Copy110:
- return byte(110)
- case Copy200:
- return byte(200)
- }
- return byte(000)
-}
-
-func (repType ReplicationType) GetReplicationLevelIndex() int {
- switch repType {
- case Copy000:
- return 0
- case Copy001:
- return 1
- case Copy010:
- return 2
- case Copy100:
- return 3
- case Copy110:
- return 4
- case Copy200:
- return 5
- }
- return -1
-}
-func (repType ReplicationType) GetCopyCount() int {
- switch repType {
- case Copy000:
- return 1
- case Copy001:
- return 2
- case Copy010:
- return 2
- case Copy100:
- return 2
- case Copy110:
- return 3
- case Copy200:
- return 3
- }
- return 0
-}
diff --git a/go/storage/store.go b/go/storage/store.go
index 52e78d27d..2df0e6cb7 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -79,8 +79,8 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts
}
return
}
-func (s *Store) AddVolume(volumeListString string, collection string, replicationType string) error {
- rt, e := NewReplicationTypeFromString(replicationType)
+func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string) error {
+ rt, e := NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
}
@@ -130,13 +130,13 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid VolumeId, collection string, replicationType ReplicationType) error {
+func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %s already exists!", vid)
}
if location := s.findFreeLocation(); location != nil {
- glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", collection =", collection, ", replicationType =", replicationType)
- if volume, err := NewVolume(location.directory, collection, vid, replicationType); err == nil {
+ glog.V(0).Infoln("In dir", location.directory, "adds volume =", vid, ", collection =", collection, ", replicaPlacement =", replicaPlacement)
+ if volume, err := NewVolume(location.directory, collection, vid, replicaPlacement); err == nil {
location.volumes[vid] = volume
return nil
} else {
@@ -206,9 +206,9 @@ func (l *DiskLocation) loadExistingVolumes() {
}
if vid, err := NewVolumeId(base); err == nil {
if l.volumes[vid] == nil {
- if v, e := NewVolume(l.directory, collection, vid, CopyNil); e == nil {
+ if v, e := NewVolume(l.directory, collection, vid, nil); e == nil {
l.volumes[vid] = v
- glog.V(0).Infoln("data file", l.directory+"/"+name, "replicationType =", v.ReplicaType, "version =", v.Version(), "size =", v.Size())
+ glog.V(0).Infoln("data file", l.directory+"/"+name, "replicaPlacement =", v.ReplicaPlacement, "version =", v.Version(), "size =", v.Size())
}
}
}
@@ -223,7 +223,7 @@ func (s *Store) Status() []*VolumeInfo {
for k, v := range location.volumes {
s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
Collection: v.Collection,
- RepType: v.ReplicaType,
+ ReplicaPlacement: v.ReplicaPlacement,
Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
@@ -261,7 +261,7 @@ func (s *Store) Join() error {
for k, v := range location.volumes {
s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
Collection: v.Collection,
- RepType: v.ReplicaType,
+ ReplicaPlacement: v.ReplicaPlacement,
Version: v.Version(),
FileCount: v.nm.FileCount(),
DeleteCount: v.nm.DeletedCount(),
diff --git a/go/storage/volume.go b/go/storage/volume.go
index a8d8f9a58..59c3055e3 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -17,14 +17,14 @@ const (
)
type SuperBlock struct {
- Version Version
- ReplicaType ReplicationType
+ Version Version
+ ReplicaPlacement *ReplicaPlacement
}
func (s *SuperBlock) Bytes() []byte {
header := make([]byte, SuperBlockSize)
header[0] = byte(s.Version)
- header[1] = s.ReplicaType.Byte()
+ header[1] = s.ReplicaPlacement.Byte()
return header
}
@@ -41,15 +41,15 @@ type Volume struct {
accessLock sync.Mutex
}
-func NewVolume(dirname string, collection string, id VolumeId, replicationType ReplicationType) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
- v.SuperBlock = SuperBlock{ReplicaType: replicationType}
+ v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement}
e = v.load(true, true)
return
}
func loadVolumeWithoutIndex(dirname string, collection string, id VolumeId) (v *Volume, e error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
- v.SuperBlock = SuperBlock{ReplicaType: CopyNil}
+ v.SuperBlock = SuperBlock{}
e = v.load(false, false)
return
}
@@ -90,7 +90,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error {
}
}
- if v.ReplicaType == CopyNil {
+ if v.ReplicaPlacement == nil {
e = v.readSuperBlock()
} else {
e = v.maybeWriteSuperBlock()
@@ -173,13 +173,13 @@ func (v *Volume) readSuperBlock() (err error) {
}
func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) {
superBlock.Version = Version(header[0])
- if superBlock.ReplicaType, err = NewReplicationTypeFromByte(header[1]); err != nil {
+ if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
err = fmt.Errorf("cannot read replica type: %s", err.Error())
}
return
}
func (v *Volume) NeedToReplicate() bool {
- return v.ReplicaType.GetCopyCount() > 1
+ return v.ReplicaPlacement.GetCopyCount() > 1
}
func (v *Volume) isFileUnchanged(n *Needle) bool {
diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go
index c8eb7612e..1dfb3dcae 100644
--- a/go/storage/volume_info.go
+++ b/go/storage/volume_info.go
@@ -5,7 +5,7 @@ import ()
type VolumeInfo struct {
Id VolumeId
Size uint64
- RepType ReplicationType
+ ReplicaPlacement *ReplicaPlacement
Collection string
Version Version
FileCount int
diff --git a/go/topology/collection.go b/go/topology/collection.go
index 0a7971424..8042369a9 100644
--- a/go/topology/collection.go
+++ b/go/topology/collection.go
@@ -13,17 +13,17 @@ type Collection struct {
func NewCollection(name string, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
- c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.LengthRelicationType)
+ c.replicaType2VolumeLayout = make([]*VolumeLayout, storage.ReplicaPlacementCount)
return c
}
-func (c *Collection) GetOrCreateVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
- replicationTypeIndex := repType.GetReplicationLevelIndex()
- if c.replicaType2VolumeLayout[replicationTypeIndex] == nil {
- glog.V(0).Infoln("collection", c.Name, "adding replication type", repType)
- c.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, c.volumeSizeLimit)
+func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement) *VolumeLayout {
+ replicaPlacementIndex := rp.GetReplicationLevelIndex()
+ if c.replicaType2VolumeLayout[replicaPlacementIndex] == nil {
+ glog.V(0).Infoln("collection", c.Name, "adding replication type", rp)
+ c.replicaType2VolumeLayout[replicaPlacementIndex] = NewVolumeLayout(rp, c.volumeSizeLimit)
}
- return c.replicaType2VolumeLayout[replicationTypeIndex]
+ return c.replicaType2VolumeLayout[replicaPlacementIndex]
}
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
diff --git a/go/topology/data_center.go b/go/topology/data_center.go
index a3b2b7d13..ebd07803b 100644
--- a/go/topology/data_center.go
+++ b/go/topology/data_center.go
@@ -29,6 +29,7 @@ func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
func (dc *DataCenter) ToMap() interface{} {
m := make(map[string]interface{})
+ m["Id"] = dc.Id()
m["Max"] = dc.GetMaxVolumeCount()
m["Free"] = dc.FreeSpace()
var racks []interface{}
diff --git a/go/topology/node.go b/go/topology/node.go
index cfd6f6489..abe363b39 100644
--- a/go/topology/node.go
+++ b/go/topology/node.go
@@ -3,6 +3,8 @@ package topology
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/storage"
+ "errors"
+ "math/rand"
)
type NodeId string
@@ -10,7 +12,7 @@ type Node interface {
Id() NodeId
String() string
FreeSpace() int
- ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode)
+ ReserveOneVolume(r int) (*DataNode, error)
UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
UpAdjustVolumeCountDelta(volumeCountDelta int)
UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
@@ -47,6 +49,54 @@ type NodeImpl struct {
value interface{}
}
+// the first node must satisfy filterFirstNodeFn(); the remaining nodes must each have at least one free slot
+func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) bool) (firstNode Node, restNodes []Node, err error) {
+ candidates := make([]Node, 0, len(n.children))
+ for _, node := range n.children {
+ if filterFirstNodeFn(node) {
+ candidates = append(candidates, node)
+ }
+ }
+ if len(candidates) == 0 {
+ return nil, nil, errors.New("No matching data node found!")
+ }
+ firstNode = candidates[rand.Intn(len(candidates))]
+ glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
+
+ restNodes = make([]Node, numberOfNodes-1)
+ candidates = candidates[:0]
+ for _, node := range n.children {
+ if node.Id() == firstNode.Id() {
+ continue
+ }
+ if node.FreeSpace() <= 0 {
+ continue
+ }
+ glog.V(2).Infoln("select rest node candidate:", node.Id())
+ candidates = append(candidates, node)
+ }
+ glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
+ ret := len(restNodes) == 0
+ for k, node := range candidates {
+ if k < len(restNodes) {
+ restNodes[k] = node
+ if k == len(restNodes)-1 {
+ ret = true
+ }
+ } else {
+ r := rand.Intn(k + 1)
+ if r < len(restNodes) {
+ restNodes[r] = node
+ }
+ }
+ }
+ if !ret {
+ glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
+ err = errors.New("Not enough data node found!")
+ }
+ return
+}
+
func (n *NodeImpl) IsDataNode() bool {
return n.nodeType == "DataNode"
}
@@ -80,32 +130,27 @@ func (n *NodeImpl) Parent() Node {
func (n *NodeImpl) GetValue() interface{} {
return n.value
}
-func (n *NodeImpl) ReserveOneVolume(r int, vid storage.VolumeId, dataCenter string) (bool, *DataNode) {
- ret := false
- var assignedNode *DataNode
+func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
for _, node := range n.children {
freeSpace := node.FreeSpace()
// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
if freeSpace <= 0 {
continue
}
- if dataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(dataCenter) {
- continue
- }
if r >= freeSpace {
r -= freeSpace
} else {
if node.IsDataNode() && node.FreeSpace() > 0 {
// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
- return true, node.(*DataNode)
+ return node.(*DataNode), nil
}
- ret, assignedNode = node.ReserveOneVolume(r, vid, dataCenter)
- if ret {
- break
+ assignedNode, err = node.ReserveOneVolume(r)
+ if err != nil {
+ return
}
}
}
- return ret, assignedNode
+ return
}
func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
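
The "rest nodes" loop in RandomlyPickNodes above is reservoir sampling. A minimal standalone sketch of the same idea (names are hypothetical) makes the replacement probabilities explicit:

package main

import (
	"fmt"
	"math/rand"
)

// pickK keeps a uniform random sample of k candidates: the first k fill the
// reservoir, then candidate i replaces a random slot with probability
// k/(i+1), exactly like the loop in RandomlyPickNodes
func pickK(candidates []string, k int) []string {
	picked := make([]string, 0, k)
	for i, c := range candidates {
		if i < k {
			picked = append(picked, c)
		} else if r := rand.Intn(i + 1); r < k {
			picked[r] = c
		}
	}
	return picked
}

func main() {
	racks := []string{"rack1", "rack2", "rack3", "rack4", "rack5"}
	fmt.Println(pickK(racks, 2)) // e.g. [rack4 rack2]
}
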
diff --git a/go/topology/node_list.go b/go/topology/node_list.go
deleted file mode 100644
index bed151b54..000000000
--- a/go/topology/node_list.go
+++ /dev/null
@@ -1,83 +0,0 @@
-package topology
-
-import (
- "code.google.com/p/weed-fs/go/glog"
- "code.google.com/p/weed-fs/go/storage"
- "math/rand"
-)
-
-type NodeList struct {
- nodes map[NodeId]Node
- except map[string]Node
-}
-
-func NewNodeList(nodes map[NodeId]Node, except map[string]Node) *NodeList {
- m := make(map[NodeId]Node, len(nodes)-len(except))
- for _, n := range nodes {
- if except[n.String()] == nil {
- m[n.Id()] = n
- }
- }
- nl := &NodeList{nodes: m}
- return nl
-}
-
-func (nl *NodeList) FreeSpace() int {
- freeSpace := 0
- for _, n := range nl.nodes {
- freeSpace += n.FreeSpace()
- }
- return freeSpace
-}
-
-func (nl *NodeList) RandomlyPickN(count int, minSpace int, firstNodeName string) ([]Node, bool) {
- var list []Node
- var preferredNode *Node
- if firstNodeName != "" {
- for _, n := range nl.nodes {
- if n.Id() == NodeId(firstNodeName) && n.FreeSpace() >= minSpace {
- preferredNode = &n
- break
- }
- }
- if preferredNode == nil {
- return list, false
- }
- }
-
- for _, n := range nl.nodes {
- if n.FreeSpace() >= minSpace && n.Id() != NodeId(firstNodeName) {
- list = append(list, n)
- }
- }
- if count > len(list) || count == len(list) && firstNodeName != "" {
- return nil, false
- }
- for i := len(list); i > 0; i-- {
- r := rand.Intn(i)
- list[r], list[i-1] = list[i-1], list[r]
- }
- if firstNodeName != "" {
- list[0] = *preferredNode
- }
- return list[:count], true
-}
-
-func (nl *NodeList) ReserveOneVolume(randomVolumeIndex int, vid storage.VolumeId) (bool, *DataNode) {
- for _, node := range nl.nodes {
- freeSpace := node.FreeSpace()
- if randomVolumeIndex >= freeSpace {
- randomVolumeIndex -= freeSpace
- } else {
- if node.IsDataNode() && node.FreeSpace() > 0 {
- glog.V(0).Infoln("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
- return true, node.(*DataNode)
- }
- children := node.Children()
- newNodeList := NewNodeList(children, nl.except)
- return newNodeList.ReserveOneVolume(randomVolumeIndex, vid)
- }
- }
- return false, nil
-
-}
diff --git a/go/topology/node_list_test.go b/go/topology/node_list_test.go
deleted file mode 100644
index c526f55f8..000000000
--- a/go/topology/node_list_test.go
+++ /dev/null
@@ -1,60 +0,0 @@
-package topology
-
-import (
- "code.google.com/p/weed-fs/go/sequence"
- _ "fmt"
- "strconv"
- "testing"
-)
-
-func TestXYZ(t *testing.T) {
- topo, err := NewTopology("topo", "/etc/weed.conf", sequence.NewMemorySequencer(), 234, 5)
- if err != nil {
- t.Error("cannot create new topology:", err)
- t.FailNow()
- }
- for i := 0; i < 5; i++ {
- dc := NewDataCenter("dc" + strconv.Itoa(i))
- dc.activeVolumeCount = i
- dc.maxVolumeCount = 5
- topo.LinkChildNode(dc)
- }
- nl := NewNodeList(topo.Children(), nil)
-
- picked, ret := nl.RandomlyPickN(1, 0, "")
- if !ret || len(picked) != 1 {
- t.Error("need to randomly pick 1 node")
- }
-
- picked, ret = nl.RandomlyPickN(1, 0, "dc1")
- if !ret || len(picked) != 1 {
- t.Error("need to randomly pick 1 node")
- }
- if picked[0].Id() != "dc1" {
- t.Error("need to randomly pick 1 dc1 node")
- }
-
- picked, ret = nl.RandomlyPickN(2, 0, "dc1")
- if !ret || len(picked) != 2 {
- t.Error("need to randomly pick 1 node")
- }
- if picked[0].Id() != "dc1" {
- t.Error("need to randomly pick 2 with one dc1 node")
- }
-
- picked, ret = nl.RandomlyPickN(4, 0, "")
- if !ret || len(picked) != 4 {
- t.Error("need to randomly pick 4 nodes")
- }
-
- picked, ret = nl.RandomlyPickN(5, 0, "")
- if !ret || len(picked) != 5 {
- t.Error("need to randomly pick 5 nodes")
- }
-
- picked, ret = nl.RandomlyPickN(6, 0, "")
- if ret || len(picked) != 0 {
- t.Error("can not randomly pick 6 nodes:", ret, picked)
- }
-
-}
diff --git a/go/topology/rack.go b/go/topology/rack.go
index acc34417a..40e19dd0d 100644
--- a/go/topology/rack.go
+++ b/go/topology/rack.go
@@ -52,6 +52,7 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVol
func (rack *Rack) ToMap() interface{} {
m := make(map[string]interface{})
+ m["Id"] = rack.Id()
m["Max"] = rack.GetMaxVolumeCount()
m["Free"] = rack.FreeSpace()
var dns []interface{}
diff --git a/go/topology/topo_test.go b/go/topology/topo_test.go
index c0edca7c1..f3ae2096b 100644
--- a/go/topology/topo_test.go
+++ b/go/topology/topo_test.go
@@ -5,9 +5,7 @@ import (
"code.google.com/p/weed-fs/go/storage"
"encoding/json"
"fmt"
- "math/rand"
"testing"
- "time"
)
var topologyLayout = `
@@ -124,15 +122,3 @@ func TestRemoveDataCenter(t *testing.T) {
t.Fail()
}
}
-
-func TestReserveOneVolume(t *testing.T) {
- topo := setup(topologyLayout)
- rand.Seed(time.Now().UnixNano())
- rand.Seed(1)
- ret, node, vid := topo.RandomlyReserveOneVolume("dc1")
- if node.Parent().Parent().Id() != NodeId("dc1") {
- t.Fail()
- }
- fmt.Println("assigned :", ret, ", node :", node, ", volume id:", vid)
-
-}
diff --git a/go/topology/topology.go b/go/topology/topology.go
index 24b3ab337..1426f7a12 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -77,23 +77,13 @@ func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
return nil
}
-func (t *Topology) RandomlyReserveOneVolume(dataCenter string) (bool, *DataNode, *storage.VolumeId) {
- if t.FreeSpace() <= 0 {
- glog.V(0).Infoln("Topology does not have free space left!")
- return false, nil, nil
- }
- vid := t.NextVolumeId()
- ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid, dataCenter)
- return ret, node, &vid
-}
-
func (t *Topology) NextVolumeId() storage.VolumeId {
vid := t.GetMaxVolumeId()
return vid.Next()
}
-func (t *Topology) PickForWrite(collectionName string, repType storage.ReplicationType, count int, dataCenter string) (string, int, *DataNode, error) {
- vid, count, datanodes, err := t.GetVolumeLayout(collectionName, repType).PickForWrite(count, dataCenter)
+func (t *Topology) PickForWrite(collectionName string, rp *storage.ReplicaPlacement, count int, dataCenter string) (string, int, *DataNode, error) {
+ vid, count, datanodes, err := t.GetVolumeLayout(collectionName, rp).PickForWrite(count, dataCenter)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes avalable!")
}
@@ -101,16 +91,16 @@ func (t *Topology) PickForWrite(collectionName string, repType storage.Replicati
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
-func (t *Topology) GetVolumeLayout(collectionName string, repType storage.ReplicationType) *VolumeLayout {
+func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement) *VolumeLayout {
_, ok := t.collectionMap[collectionName]
if !ok {
t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
}
- return t.collectionMap[collectionName].GetOrCreateVolumeLayout(repType)
+ return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp)
}
func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
- t.GetVolumeLayout(v.Collection, v.RepType).RegisterVolume(v, dn)
+ t.GetVolumeLayout(v.Collection, v.ReplicaPlacement).RegisterVolume(v, dn)
}
func (t *Topology) RegisterVolumes(init bool, volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int, dcName string, rackName string) {
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index 5097e9874..5740c9a03 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -41,7 +41,7 @@ func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
}()
}
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
- vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.RepType)
+ vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
@@ -53,7 +53,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.volumes {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
- vl := t.GetVolumeLayout(v.Collection, v.RepType)
+ vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement)
vl.SetVolumeUnavailable(dn, v.Id)
}
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
@@ -63,7 +63,7 @@ func (t *Topology) UnRegisterDataNode(dn *DataNode) {
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.volumes {
- vl := t.GetVolumeLayout(v.Collection, v.RepType)
+ vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement)
if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id)
}
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index 8877d7ccf..40628b4a0 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -9,16 +9,16 @@ import (
)
type VolumeLayout struct {
- repType storage.ReplicationType
+ rp *storage.ReplicaPlacement
vid2location map[storage.VolumeId]*VolumeLocationList
writables []storage.VolumeId // transient array of writable volume id
volumeSizeLimit uint64
accessLock sync.Mutex
}
-func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64) *VolumeLayout {
+func NewVolumeLayout(rp *storage.ReplicaPlacement, volumeSizeLimit uint64) *VolumeLayout {
return &VolumeLayout{
- repType: repType,
+ rp: rp,
vid2location: make(map[storage.VolumeId]*VolumeLocationList),
writables: *new([]storage.VolumeId),
volumeSizeLimit: volumeSizeLimit,
@@ -33,7 +33,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.vid2location[v.Id] = NewVolumeLocationList()
}
if vl.vid2location[v.Id].Add(dn) {
- if len(vl.vid2location[v.Id].list) == v.RepType.GetCopyCount() {
+ if len(vl.vid2location[v.Id].list) == v.ReplicaPlacement.GetCopyCount() {
if vl.isWritable(v) {
vl.writables = append(vl.writables, v.Id)
} else {
@@ -135,8 +135,8 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId)
defer vl.accessLock.Unlock()
if vl.vid2location[vid].Remove(dn) {
- if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() {
- glog.V(0).Infoln("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount())
+ if vl.vid2location[vid].Length() < vl.rp.GetCopyCount() {
+ glog.V(0).Infoln("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.rp.GetCopyCount())
return vl.removeFromWritable(vid)
}
}
@@ -147,7 +147,7 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) b
defer vl.accessLock.Unlock()
if vl.vid2location[vid].Add(dn) {
- if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() {
+ if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
return vl.setVolumeWritable(vid)
}
}
@@ -164,7 +164,7 @@ func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
func (vl *VolumeLayout) ToMap() map[string]interface{} {
m := make(map[string]interface{})
- m["replication"] = vl.repType.String()
+ m["replication"] = vl.rp.String()
m["writables"] = vl.writables
//m["locations"] = vl.vid2location
return m
diff --git a/go/util/file_util.go b/go/util/file_util.go
index 46e404851..8444296d3 100644
--- a/go/util/file_util.go
+++ b/go/util/file_util.go
@@ -1,7 +1,7 @@
package util
import (
- "code.google.com/p/weed-fs/go/glog"
+ "code.google.com/p/weed-fs/go/glog"
"errors"
"os"
)
diff --git a/go/weed/compact.go b/go/weed/compact.go
index 2600b3362..580f3f98d 100644
--- a/go/weed/compact.go
+++ b/go/weed/compact.go
@@ -33,7 +33,7 @@ func runCompact(cmd *Command, args []string) bool {
}
vid := storage.VolumeId(*compactVolumeId)
- v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, storage.CopyNil)
+ v, err := storage.NewVolume(*compactVolumePath, *compactVolumeCollection, vid, nil)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
diff --git a/go/weed/download.go b/go/weed/download.go
index 4f332bd2e..2e32f3ae9 100644
--- a/go/weed/download.go
+++ b/go/weed/download.go
@@ -49,7 +49,7 @@ func runDownload(cmd *Command, args []string) bool {
filename = fid
}
if strings.HasSuffix(filename, "-list") {
- filename = filename[0:len(filename)-len("-list")]
+ filename = filename[0 : len(filename)-len("-list")]
fids := strings.Split(string(content), "\n")
f, err := os.OpenFile(path.Join(*downloadDir, filename), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
if err != nil {
diff --git a/go/weed/export.go b/go/weed/export.go
index 2ab197652..02452950d 100644
--- a/go/weed/export.go
+++ b/go/weed/export.go
@@ -82,8 +82,8 @@ func runExport(cmd *Command, args []string) bool {
}
fileName := strconv.Itoa(*exportVolumeId)
- if *exportCollection!=""{
- fileName = *exportCollection + "_" + fileName
+ if *exportCollection != "" {
+ fileName = *exportCollection + "_" + fileName
}
vid := storage.VolumeId(*exportVolumeId)
indexFile, err := os.OpenFile(path.Join(*exportVolumePath, fileName+".idx"), os.O_RDONLY, 0644)
diff --git a/go/weed/master.go b/go/weed/master.go
index 97def1948..0d2866c68 100644
--- a/go/weed/master.go
+++ b/go/weed/master.go
@@ -27,18 +27,18 @@ var cmdMaster = &Command{
}
var (
- mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
- masterIp = cmdMaster.Flag.String("ip", "", "master ip address")
- metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
- masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list")
- volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes")
- mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
- confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
- defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.")
- mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
- mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
- garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
- masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
+ mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
+ masterIp = cmdMaster.Flag.String("ip", "", "master ip address")
+ metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
+ masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list")
+ volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes")
+ mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
+ confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
+ defaultReplicaPlacement = cmdMaster.Flag.String("defaultReplication", "000", "Default replication type if not specified.")
+ mReadTimeout = cmdMaster.Flag.Int("readTimeout", 3, "connection read timeout in seconds")
+ mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+ garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
+ masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
masterWhiteList []string
)
@@ -57,7 +57,7 @@ func runMaster(cmd *Command, args []string) bool {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, VERSION, *mport, *metaFolder,
- *volumeSizeLimitMB, *mpulse, *confFile, *defaultRepType, *garbageThreshold, masterWhiteList,
+ *volumeSizeLimitMB, *mpulse, *confFile, *defaultReplicaPlacement, *garbageThreshold, masterWhiteList,
)
glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *masterIp+":"+strconv.Itoa(*mport))
diff --git a/go/weed/server.go b/go/weed/server.go
index b1f5dc049..4cd800142 100644
--- a/go/weed/server.go
+++ b/go/weed/server.go
@@ -35,23 +35,23 @@ var cmdServer = &Command{
}
var (
- serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name")
- serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
- serverReadTimeout = cmdServer.Flag.Int("readTimeout", 3, "connection read timeout in seconds. Increase this if uploading large files.")
- serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
- serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
- serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
- serverPeers = cmdServer.Flag.String("peers", "", "other master nodes in comma separated ip:masterPort list")
- masterPort = cmdServer.Flag.Int("masterPort", 9333, "master server http listen port")
- masterMetaFolder = cmdServer.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
- masterVolumeSizeLimitMB = cmdServer.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes")
- masterConfFile = cmdServer.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
- masterDefaultRepType = cmdServer.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.")
- volumePort = cmdServer.Flag.Int("port", 8080, "volume server http listen port")
- volumePublicUrl = cmdServer.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
- volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
- volumeMaxDataVolumeCounts = cmdServer.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
- volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
+ serverIp = cmdServer.Flag.String("ip", "localhost", "ip or server name")
+ serverMaxCpu = cmdServer.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
+ serverReadTimeout = cmdServer.Flag.Int("readTimeout", 3, "connection read timeout in seconds. Increase this if uploading large files.")
+ serverDataCenter = cmdServer.Flag.String("dataCenter", "", "current volume server's data center name")
+ serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
+ serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
+ serverPeers = cmdServer.Flag.String("peers", "", "other master nodes in comma separated ip:masterPort list")
+ masterPort = cmdServer.Flag.Int("masterPort", 9333, "master server http listen port")
+ masterMetaFolder = cmdServer.Flag.String("mdir", "", "data directory to store meta data, default to same as -dir specified")
+ masterVolumeSizeLimitMB = cmdServer.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes")
+ masterConfFile = cmdServer.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
+ masterDefaultReplicaPlacement = cmdServer.Flag.String("defaultReplicaPlacement", "000", "Default replication type if not specified.")
+ volumePort = cmdServer.Flag.Int("port", 8080, "volume server http listen port")
+ volumePublicUrl = cmdServer.Flag.String("publicUrl", "", "Publicly accessible <ip|server_name>:<port>")
+ volumeDataFolders = cmdServer.Flag.String("dir", os.TempDir(), "directories to store data files. dir[,dir]...")
+ volumeMaxDataVolumeCounts = cmdServer.Flag.String("max", "7", "maximum numbers of volumes, count[,count]...")
+ volumePulse = cmdServer.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats, must be smaller than the master's setting")
serverWhiteList []string
)
@@ -62,6 +62,9 @@ func runServer(cmd *Command, args []string) bool {
}
runtime.GOMAXPROCS(*serverMaxCpu)
+ if *masterMetaFolder == "" {
+ *masterMetaFolder = *volumeDataFolders
+ }
if err := util.TestFolderWritable(*masterMetaFolder); err != nil {
glog.Fatalf("Check Meta Folder (-mdir) Writable %s : %s", *masterMetaFolder, err)
}
@@ -95,7 +98,7 @@ func runServer(cmd *Command, args []string) bool {
go func() {
r := mux.NewRouter()
ms := weed_server.NewMasterServer(r, VERSION, *masterPort, *masterMetaFolder,
- *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultRepType, *garbageThreshold, serverWhiteList,
+ *masterVolumeSizeLimitMB, *volumePulse, *masterConfFile, *masterDefaultReplicaPlacement, *garbageThreshold, serverWhiteList,
)
glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *serverIp+":"+strconv.Itoa(*masterPort))
diff --git a/go/weed/version.go b/go/weed/version.go
index 561057778..9b6ca63e1 100644
--- a/go/weed/version.go
+++ b/go/weed/version.go
@@ -6,7 +6,7 @@ import (
)
const (
- VERSION = "0.46"
+ VERSION = "0.47"
)
var cmdVersion = &Command{
diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go
index 738484ff0..c8a9bbbed 100644
--- a/go/weed/weed_server/master_server.go
+++ b/go/weed/weed_server/master_server.go
@@ -15,14 +15,14 @@ import (
)
type MasterServer struct {
- port int
- metaFolder string
- volumeSizeLimitMB uint
- pulseSeconds int
- defaultRepType string
- garbageThreshold string
- whiteList []string
- version string
+ port int
+ metaFolder string
+ volumeSizeLimitMB uint
+ pulseSeconds int
+ defaultReplicaPlacement string
+ garbageThreshold string
+ whiteList []string
+ version string
topo *topology.Topology
vg *replication.VolumeGrowth
@@ -35,17 +35,17 @@ func NewMasterServer(r *mux.Router, version string, port int, metaFolder string,
volumeSizeLimitMB uint,
pulseSeconds int,
confFile string,
- defaultRepType string,
+ defaultReplicaPlacement string,
garbageThreshold string,
whiteList []string,
) *MasterServer {
ms := &MasterServer{
- version: version,
- volumeSizeLimitMB: volumeSizeLimitMB,
- pulseSeconds: pulseSeconds,
- defaultRepType: defaultRepType,
- garbageThreshold: garbageThreshold,
- whiteList: whiteList,
+ version: version,
+ volumeSizeLimitMB: volumeSizeLimitMB,
+ pulseSeconds: pulseSeconds,
+ defaultReplicaPlacement: defaultReplicaPlacement,
+ garbageThreshold: garbageThreshold,
+ whiteList: whiteList,
}
seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq"))
var e error
diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go
index 99e8fd1ef..f53d2b8ed 100644
--- a/go/weed/weed_server/master_server_handlers.go
+++ b/go/weed/weed_server/master_server_handlers.go
@@ -40,20 +40,20 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
if e != nil {
c = 1
}
- repType := r.FormValue("replication")
- if repType == "" {
- repType = ms.defaultRepType
+ replication := r.FormValue("replication")
+ if replication == "" {
+ replication = ms.defaultReplicaPlacement
}
collection := r.FormValue("collection")
dataCenter := r.FormValue("dataCenter")
- rt, err := storage.NewReplicationTypeFromString(repType)
+ replicaPlacement, err := storage.NewReplicaPlacementFromString(replication)
if err != nil {
w.WriteHeader(http.StatusNotAcceptable)
writeJsonQuiet(w, r, map[string]string{"error": err.Error()})
return
}
- if ms.topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 {
+ if ms.topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
if ms.topo.FreeSpace() <= 0 {
w.WriteHeader(http.StatusNotFound)
writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"})
@@ -61,15 +61,15 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
} else {
ms.vgLock.Lock()
defer ms.vgLock.Unlock()
- if ms.topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 {
- if _, err = ms.vg.AutomaticGrowByType(collection, rt, dataCenter, ms.topo); err != nil {
+ if ms.topo.GetVolumeLayout(collection, replicaPlacement).GetActiveVolumeCount(dataCenter) <= 0 {
+ if _, err = ms.vg.AutomaticGrowByType(collection, replicaPlacement, dataCenter, ms.topo); err != nil {
writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()})
return
}
}
}
}
- fid, count, dn, err := ms.topo.PickForWrite(collection, rt, c, dataCenter)
+ fid, count, dn, err := ms.topo.PickForWrite(collection, replicaPlacement, c, dataCenter)
if err == nil {
writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count})
} else {
@@ -119,13 +119,13 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
count := 0
- rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication"))
+ replicaPlacement, err := storage.NewReplicaPlacementFromString(r.FormValue("replication"))
if err == nil {
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
- if ms.topo.FreeSpace() < count*rt.GetCopyCount() {
- err = errors.New("Only " + strconv.Itoa(ms.topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount()))
+ if ms.topo.FreeSpace() < count*replicaPlacement.GetCopyCount() {
+ err = errors.New("Only " + strconv.Itoa(ms.topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*replicaPlacement.GetCopyCount()))
} else {
- count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), rt, r.FormValue("dataCneter"), ms.topo)
+ count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), replicaPlacement, r.FormValue("dataCenter"), ms.topo)
}
} else {
err = errors.New("parameter count is not found")
diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go
index f42585da2..a2939f848 100644
--- a/go/weed/weed_server/volume_server.go
+++ b/go/weed/weed_server/volume_server.go
@@ -61,7 +61,7 @@ func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publ
if connected {
time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond)
} else {
- time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)* 0.25) * time.Millisecond)
+ time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*0.25) * time.Millisecond)
}
}
}()
diff --git a/note/replication.txt b/note/replication.txt
index c4bf46044..a151e80c3 100644
--- a/note/replication.txt
+++ b/note/replication.txt
@@ -59,11 +59,6 @@ If any "assign" request comes in
3. return a writable volume to the user
-Plan:
- Step 1. implement one copy(no replication), automatically assign volume ids
- Step 2. add replication
-
-For the above operations, here are the todo list:
for data node:
0. detect existing volumes DONE
1. onStartUp, and periodically, send existing volumes and maxVolumeCount store.Join(), DONE
@@ -77,10 +72,38 @@ For the above operations, here are the todo list:
1. accept data node's report of existing volumes and maxVolumeCount ALREADY EXISTS /dir/join
2. periodically refresh for active data nodes, and adjust writable volumes
3. send command to grow a volume(id + replication level) DONE
- 4. NOT_IMPLEMENTING: if dead/stale data nodes are found, for the affected volumes, send stale info
- to other data nodes. BECAUSE the master will stop sending writes to these data nodes
5. accept lookup for volume locations ALREADY EXISTS /dir/lookup
6. read topology/datacenter/rack layout
+An algorithm that allocates volumes evenly, but may be inefficient when free volume slots are plentiful:
+input: replication=xyz
+algorithm:
+ret_dcs = []
+foreach dc that has y+z+1 volumes{
+ ret_racks = []
+ foreach rack with z+1 volumes{
+ ret = select z+1 servers with 1 volume
+ if ret.size()==z+1 {
+ ret_racks.append(ret)
+ }
+ }
+ randomly pick one rack from ret_racks
+ ret += select y racks with 1 volume each
+ if ret.size()==y+z+1{
+ ret_dcs.append(ret)
+ }
+}
+randomly pick one dc from ret_dcs
+ret += select x data centers with 1 volume each
+
+A simpler replica placement algorithm, which may fail when free volume slots are scarce:
+ret := []volumes
+dc = randomly pick 1 data center with y+z+1 volumes
+ rack = randomly pick 1 rack with z+1 volumes
+ ret = ret.append(randomly pick z+1 volumes)
+ ret = ret.append(randomly pick y racks with 1 volume)
+ret = ret.append(randomly pick x data centers with 1 volume)
+
+
TODO:
1. replicate content to the other server if the replication type needs replicas
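
To make the second algorithm concrete, here is a minimal Go sketch that only checks whether a placement is feasible under it. The topology types are hypothetical stand-ins; the real code reserves slots while walking the DataCenter/Rack/DataNode tree.

package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// hypothetical flattened topology: free volume slots per server, grouped
// by rack and by data center
type Rack []int // free slots per server
type DC []Rack  // racks in one data center

func (r Rack) slots() (n int) {
	for _, f := range r {
		n += f
	}
	return
}

func (d DC) slots() (n int) {
	for _, rk := range d {
		n += rk.slots()
	}
	return
}

// shuffled returns the indexes whose count is at least min, in random order
func shuffled(counts []int, min int) []int {
	var ok []int
	for i, c := range counts {
		if c >= min {
			ok = append(ok, i)
		}
	}
	rand.Shuffle(len(ok), func(i, j int) { ok[i], ok[j] = ok[j], ok[i] })
	return ok
}

// place checks the "simple" algorithm for replication "xyz": one data center
// carries y+z+1 copies, z+1 of them on distinct servers of one rack; y other
// racks and x other data centers carry one copy each.
func place(dcs []DC, x, y, z int) error {
	dcSlots := make([]int, len(dcs))
	for i, d := range dcs {
		dcSlots[i] = d.slots()
	}
	mainDCs := shuffled(dcSlots, y+z+1)
	if len(mainDCs) == 0 {
		return errors.New("no data center with y+z+1 free slots")
	}
	if len(shuffled(dcSlots, 1))-1 < x {
		return errors.New("not enough other data centers")
	}
	main := dcs[mainDCs[0]]
	rackSlots := make([]int, len(main))
	for i, rk := range main {
		rackSlots[i] = rk.slots()
	}
	mainRacks := shuffled(rackSlots, z+1)
	if len(mainRacks) == 0 {
		return errors.New("no rack with z+1 free slots")
	}
	if len(shuffled(rackSlots, 1))-1 < y {
		return errors.New("not enough other racks")
	}
	if len(shuffled(main[mainRacks[0]], 1)) < z+1 {
		return errors.New("not enough servers in the chosen rack")
	}
	return nil // the x+y+z+1 copies can be placed
}

func main() {
	dcs := []DC{
		{Rack{2, 1}, Rack{3}}, // dc1: two racks
		{Rack{1}},             // dc2: one rack
	}
	fmt.Println("placeable with 110:", place(dcs, 1, 1, 0)) // <nil>
}

Because the sketch commits to one random main data center and rack before descending, it can report failure even when another choice would have worked, which is the weakness the note describes.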