aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2012-09-30 02:20:33 -0700
committerChris Lu <chris.lu@gmail.com>2012-09-30 02:20:33 -0700
commit6ce41e30a47666792fa3c4346f5c33d360d25930 (patch)
tree5d933846a559875a7f5f46da17a8240ca6d157f3
parent2fe43718994a034e655ec2684b3dcf5ab094eeb2 (diff)
downloadseaweedfs-6ce41e30a47666792fa3c4346f5c33d360d25930.tar.xz
seaweedfs-6ce41e30a47666792fa3c4346f5c33d360d25930.zip
change to 3-digit replication types
-rw-r--r--weed-fs/src/cmd/weed/master.go11
-rw-r--r--weed-fs/src/cmd/weed/upload.go2
-rw-r--r--weed-fs/src/cmd/weed/volume.go5
-rw-r--r--weed-fs/src/pkg/replication/volume_growth.go121
-rw-r--r--weed-fs/src/pkg/replication/volume_growth_test.go19
-rw-r--r--weed-fs/src/pkg/storage/compact_map_test.go2
-rw-r--r--weed-fs/src/pkg/storage/store.go2
-rw-r--r--weed-fs/src/pkg/storage/volume.go4
-rw-r--r--weed-fs/src/pkg/storage/volume_info.go126
-rw-r--r--weed-fs/src/pkg/topology/node_list.go4
10 files changed, 194 insertions, 102 deletions
diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go
index 7751a4819..a87c8f312 100644
--- a/weed-fs/src/cmd/weed/master.go
+++ b/weed-fs/src/cmd/weed/master.go
@@ -33,7 +33,7 @@ var (
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
- defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "00", "Default replication type if not specified.")
+ defaultRepType = cmdMaster.Flag.String("defaultReplicationType", "000", "Default replication type if not specified.")
mReadTimeout = cmdMaster.Flag.Int("readTimeout", 5, "connection read timeout in seconds")
)
@@ -70,7 +70,7 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
if repType == "" {
repType = *defaultRepType
}
- rt, err := storage.NewReplicationType(repType)
+ rt, err := storage.NewReplicationTypeFromString(repType)
if err != nil {
writeJson(w, r, map[string]string{"error": err.Error()})
return
@@ -107,12 +107,15 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
}
func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
- writeJson(w, r, topo.ToMap())
+ m := make(map[string]interface{})
+ m["Version"] = VERSION
+ m["Topology"] = topo.ToMap()
+ writeJson(w, r, m)
}
func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
count := 0
- rt, err := storage.NewReplicationType(r.FormValue("replication"))
+ rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication"))
if err == nil {
if count, err = strconv.Atoi(r.FormValue("count")); err == nil {
if topo.FreeSpace() < count*rt.GetCopyCount() {
diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go
index 882459c05..e25930b5d 100644
--- a/weed-fs/src/cmd/weed/upload.go
+++ b/weed-fs/src/cmd/weed/upload.go
@@ -17,7 +17,7 @@ func init() {
cmdUpload.Run = runUpload // break init cycle
IsDebug = cmdUpload.Flag.Bool("debug", false, "verbose debug information")
server = cmdUpload.Flag.String("server", "localhost:9333", "weedfs master location")
- uploadReplication = cmdUpload.Flag.String("replication", "00", "replication type(00,01,10,11)")
+ uploadReplication = cmdUpload.Flag.String("replication", "000", "replication type(000,001,010,100,110,200)")
}
var cmdUpload = &Command{
diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go
index a9ab85c98..38946396e 100644
--- a/weed-fs/src/cmd/weed/volume.go
+++ b/weed-fs/src/cmd/weed/volume.go
@@ -41,7 +41,10 @@ var (
)
func statusHandler(w http.ResponseWriter, r *http.Request) {
- writeJson(w, r, store.Status())
+ m := make(map[string]interface{})
+ m["Version"] = VERSION
+ m["Volumes"] = store.Status()
+ writeJson(w, r, m)
}
func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType"))
diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go
index 39597f2a6..3942c5743 100644
--- a/weed-fs/src/pkg/replication/volume_growth.go
+++ b/weed-fs/src/pkg/replication/volume_growth.go
@@ -4,9 +4,9 @@ import (
"errors"
"fmt"
"math/rand"
+ "pkg/operation"
"pkg/storage"
"pkg/topology"
- "pkg/operation"
)
/*
@@ -30,15 +30,17 @@ func NewDefaultVolumeGrowth() *VolumeGrowth {
func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) (int, error) {
switch repType {
- case storage.Copy00:
+ case storage.Copy000:
return vg.GrowByCountAndType(vg.copy1factor, repType, topo)
- case storage.Copy10:
+ case storage.Copy001:
return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
- case storage.Copy20:
- return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
- case storage.Copy01:
+ case storage.Copy010:
return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
- case storage.Copy11:
+ case storage.Copy100:
+ return vg.GrowByCountAndType(vg.copy2factor, repType, topo)
+ case storage.Copy110:
+ return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
+ case storage.Copy200:
return vg.GrowByCountAndType(vg.copy3factor, repType, topo)
}
return 0, errors.New("Unknown Replication Type!")
@@ -46,7 +48,7 @@ func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topolo
func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) (counter int, err error) {
counter = 0
switch repType {
- case storage.Copy00:
+ case storage.Copy000:
for i := 0; i < count; i++ {
if ok, server, vid := topo.RandomlyReserveOneVolume(); ok {
if err = vg.grow(topo, *vid, repType, server); err == nil {
@@ -54,10 +56,45 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
}
}
}
- case storage.Copy10:
+ case storage.Copy001:
+ for i := 0; i < count; i++ {
+ //randomly pick one server, and then choose from the same rack
+ if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
+ rack := server1.Parent()
+ exclusion := make(map[string]topology.Node)
+ exclusion[server1.String()] = server1
+ newNodeList := topology.NewNodeList(rack.Children(), exclusion)
+ if newNodeList.FreeSpace() > 0 {
+ if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
+ if err = vg.grow(topo, *vid, repType, server1, server2); err == nil {
+ counter++
+ }
+ }
+ }
+ }
+ }
+ case storage.Copy010:
+ for i := 0; i < count; i++ {
+ //randomly pick one server, and then choose from the same rack
+ if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
+ rack := server1.Parent()
+ dc := rack.Parent()
+ exclusion := make(map[string]topology.Node)
+ exclusion[rack.String()] = rack
+ newNodeList := topology.NewNodeList(dc.Children(), exclusion)
+ if newNodeList.FreeSpace() > 0 {
+ if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
+ if err = vg.grow(topo, *vid, repType, server1, server2); err == nil {
+ counter++
+ }
+ }
+ }
+ }
+ }
+ case storage.Copy100:
for i := 0; i < count; i++ {
nl := topology.NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(2)
+ picked, ret := nl.RandomlyPickN(2, 1)
vid := topo.NextVolumeId()
if ret {
var servers []*topology.DataNode
@@ -69,61 +106,77 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.Replicatio
}
}
if len(servers) == 2 {
- if err = vg.grow(topo, vid, repType, servers[0], servers[1]); err == nil {
+ if err = vg.grow(topo, vid, repType, servers...); err == nil {
counter++
}
}
}
}
- case storage.Copy20:
+ case storage.Copy110:
for i := 0; i < count; i++ {
nl := topology.NewNodeList(topo.Children(), nil)
- picked, ret := nl.RandomlyPickN(3)
+ picked, ret := nl.RandomlyPickN(2, 2)
vid := topo.NextVolumeId()
if ret {
var servers []*topology.DataNode
- for _, n := range picked {
- if n.FreeSpace() > 0 {
- if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok {
- servers = append(servers, server)
+ dc1, dc2 := picked[0], picked[1]
+ if dc2.FreeSpace() > dc1.FreeSpace() {
+ dc1, dc2 = dc2, dc1
+ }
+ if dc1.FreeSpace() > 0 {
+ if ok, server1 := dc1.ReserveOneVolume(rand.Intn(dc1.FreeSpace()), vid); ok {
+ servers = append(servers, server1)
+ rack := server1.Parent()
+ exclusion := make(map[string]topology.Node)
+ exclusion[rack.String()] = rack
+ newNodeList := topology.NewNodeList(dc1.Children(), exclusion)
+ if newNodeList.FreeSpace() > 0 {
+ if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), vid); ok2 {
+ servers = append(servers, server2)
+ }
}
}
}
+ if dc2.FreeSpace() > 0 {
+ if ok, server := dc2.ReserveOneVolume(rand.Intn(dc2.FreeSpace()), vid); ok {
+ servers = append(servers, server)
+ }
+ }
if len(servers) == 3 {
- if err = vg.grow(topo, vid, repType, servers[0], servers[1], servers[2]); err == nil {
+ if err = vg.grow(topo, vid, repType, servers...); err == nil {
counter++
}
}
}
}
- case storage.Copy01:
+ case storage.Copy200:
for i := 0; i < count; i++ {
- //randomly pick one server, and then choose from the same rack
- if ok, server1, vid := topo.RandomlyReserveOneVolume(); ok {
- rack := server1.Parent()
- exclusion := make(map[string]topology.Node)
- exclusion[server1.String()] = server1
- newNodeList := topology.NewNodeList(rack.Children(), exclusion)
- if newNodeList.FreeSpace() > 0 {
- if ok2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid); ok2 {
- if err = vg.grow(topo, *vid, repType, server1, server2); err == nil {
- counter++
+ nl := topology.NewNodeList(topo.Children(), nil)
+ picked, ret := nl.RandomlyPickN(3, 1)
+ vid := topo.NextVolumeId()
+ if ret {
+ var servers []*topology.DataNode
+ for _, n := range picked {
+ if n.FreeSpace() > 0 {
+ if ok, server := n.ReserveOneVolume(rand.Intn(n.FreeSpace()), vid); ok {
+ servers = append(servers, server)
}
}
}
+ if len(servers) == 3 {
+ if err = vg.grow(topo, vid, repType, servers...); err == nil {
+ counter++
+ }
+ }
}
}
- case storage.Copy11:
- for i := 0; i < count; i++ {
- }
- err = errors.New("Replication Type Not Implemented Yet!")
}
return
}
func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) error {
for _, server := range servers {
if err := operation.AllocateVolume(server, vid, repType); err == nil {
- vi := storage.VolumeInfo{Id: vid, Size: 0, RepType:repType}
+ vi := storage.VolumeInfo{Id: vid, Size: 0, RepType: repType}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(&vi, server)
fmt.Println("Created Volume", vid, "on", server)
diff --git a/weed-fs/src/pkg/replication/volume_growth_test.go b/weed-fs/src/pkg/replication/volume_growth_test.go
index 5068577b1..3cebc62db 100644
--- a/weed-fs/src/pkg/replication/volume_growth_test.go
+++ b/weed-fs/src/pkg/replication/volume_growth_test.go
@@ -80,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology {
fmt.Println("data:", data)
//need to connect all nodes first before server adding volumes
- topo := topology.NewTopology("mynetwork","/tmp","testing",32*1024, 5)
+ topo := topology.NewTopology("mynetwork","/etc/weedfs/weedfs.conf","/tmp","testing",32*1024, 5)
mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology {
dc := topology.NewDataCenter(dcKey)
@@ -96,7 +96,7 @@ func setup(topologyLayout string) *topology.Topology {
rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
- vi := &storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64))}
+ vi := storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64))}
server.AddOrUpdateVolume(vi)
}
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
@@ -104,8 +104,6 @@ func setup(topologyLayout string) *topology.Topology {
}
}
- fmt.Println("topology:", *topo)
-
return topo
}
@@ -125,15 +123,8 @@ func TestReserveOneVolume(t *testing.T) {
topo := setup(topologyLayout)
rand.Seed(time.Now().UnixNano())
vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4}
- vg.GrowByType(storage.Copy20,topo)
+ if c, e := vg.GrowByCountAndType(1,storage.Copy000,topo);e==nil{
+ t.Log("reserved", c)
+ }
}
-
-func TestXYZ(t *testing.T) {
- dn := topology.NewDataNode("server1")
- dn.Ip = "localhost"
- dn.Port = 8080
- vid, _:= storage.NewVolumeId("600")
- out := AllocateVolume(dn,vid,storage.Copy00)
- fmt.Println(out)
-}
diff --git a/weed-fs/src/pkg/storage/compact_map_test.go b/weed-fs/src/pkg/storage/compact_map_test.go
index 6c3bb6e96..c05515b29 100644
--- a/weed-fs/src/pkg/storage/compact_map_test.go
+++ b/weed-fs/src/pkg/storage/compact_map_test.go
@@ -44,8 +44,6 @@ func TestXYZ(t *testing.T) {
}
}
- //println("cm.list =", len(m.list))
-
for i := uint32(10 * batch); i < 100*batch; i++ {
v, ok := m.Get(Key(i))
if i%37 == 0 {
diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go
index b8337d51f..7ebc028ba 100644
--- a/weed-fs/src/pkg/storage/store.go
+++ b/weed-fs/src/pkg/storage/store.go
@@ -29,7 +29,7 @@ func NewStore(port int, ip, publicUrl, dirname string, maxVolumeCount int) (s *S
return
}
func (s *Store) AddVolume(volumeListString string, replicationType string) error {
- rt, e := NewReplicationType(replicationType)
+ rt, e := NewReplicationTypeFromString(replicationType)
if e != nil {
return e
}
diff --git a/weed-fs/src/pkg/storage/volume.go b/weed-fs/src/pkg/storage/volume.go
index 40b322787..0e6095089 100644
--- a/weed-fs/src/pkg/storage/volume.go
+++ b/weed-fs/src/pkg/storage/volume.go
@@ -61,7 +61,7 @@ func (v *Volume) maybeWriteSuperBlock() {
if stat.Size() == 0 {
header := make([]byte, SuperBlockSize)
header[0] = 1
- header[1] = byte(v.replicaType)
+ header[1] = v.replicaType.Byte()
v.dataFile.Write(header)
}
}
@@ -69,7 +69,7 @@ func (v *Volume) readSuperBlock() {
v.dataFile.Seek(0, 0)
header := make([]byte, SuperBlockSize)
if _, error := v.dataFile.Read(header); error == nil {
- v.replicaType = ReplicationType(header[1])
+ v.replicaType, _ = NewReplicationTypeFromByte(header[1])
}
}
diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go
index ca8e3372e..3f96db21b 100644
--- a/weed-fs/src/pkg/storage/volume_info.go
+++ b/weed-fs/src/pkg/storage/volume_info.go
@@ -9,75 +9,119 @@ type VolumeInfo struct {
Size int64
RepType ReplicationType
}
-type ReplicationType byte
+type ReplicationType string
const (
- Copy00 = ReplicationType(00) // single copy
- Copy01 = ReplicationType(01) // 2 copies, each on different racks, same data center
- Copy10 = ReplicationType(10) // 2 copies, each on different data center
- Copy11 = ReplicationType(11) // 3 copies, 2 on different racks and local data center, 1 on different data center
- Copy20 = ReplicationType(20) // 3 copies, each on dffereint data center
- LengthRelicationType = 5
+ Copy000 = ReplicationType("000") // single copy
+ Copy001 = ReplicationType("001") // 2 copies, both on the same racks, and same data center
+ Copy010 = ReplicationType("010") // 2 copies, both on different racks, but same data center
+ Copy100 = ReplicationType("100") // 2 copies, each on different data center
+ Copy110 = ReplicationType("110") // 3 copies, 2 on different racks and local data center, 1 on different data center
+ Copy200 = ReplicationType("200") // 3 copies, each on dffereint data center
+ LengthRelicationType = 6
CopyNil = ReplicationType(255) // nil value
)
-func NewReplicationType(t string) (ReplicationType, error) {
+func NewReplicationTypeFromString(t string) (ReplicationType, error) {
switch t {
- case "00":
- return Copy00, nil
- case "01":
- return Copy01, nil
- case "10":
- return Copy10, nil
- case "11":
- return Copy11, nil
- case "20":
- return Copy20, nil
+ case "000":
+ return Copy000, nil
+ case "001":
+ return Copy001, nil
+ case "010":
+ return Copy010, nil
+ case "100":
+ return Copy100, nil
+ case "110":
+ return Copy110, nil
+ case "200":
+ return Copy200, nil
}
- return Copy00, errors.New("Unknown Replication Type:"+t)
+ return Copy000, errors.New("Unknown Replication Type:"+t)
}
+func NewReplicationTypeFromByte(b byte) (ReplicationType, error) {
+ switch b {
+ case byte(000):
+ return Copy000, nil
+ case byte(001):
+ return Copy001, nil
+ case byte(010):
+ return Copy010, nil
+ case byte(100):
+ return Copy100, nil
+ case byte(110):
+ return Copy110, nil
+ case byte(200):
+ return Copy200, nil
+ }
+ return Copy000, errors.New("Unknown Replication Type:"+string(b))
+}
+
func (r *ReplicationType) String() string {
switch *r {
- case Copy00:
- return "00"
- case Copy01:
- return "01"
- case Copy10:
- return "10"
- case Copy11:
- return "11"
- case Copy20:
- return "20"
+ case Copy000:
+ return "000"
+ case Copy001:
+ return "001"
+ case Copy010:
+ return "010"
+ case Copy100:
+ return "100"
+ case Copy110:
+ return "110"
+ case Copy200:
+ return "200"
}
- return "00"
+ return "000"
+}
+func (r *ReplicationType) Byte() byte {
+ switch *r {
+ case Copy000:
+ return byte(000)
+ case Copy001:
+ return byte(001)
+ case Copy010:
+ return byte(010)
+ case Copy100:
+ return byte(100)
+ case Copy110:
+ return byte(110)
+ case Copy200:
+ return byte(200)
+ }
+ return byte(000)
}
func (repType ReplicationType)GetReplicationLevelIndex() int {
switch repType {
- case Copy00:
+ case Copy000:
return 0
- case Copy01:
+ case Copy001:
return 1
- case Copy10:
- return 2
- case Copy11:
+ case Copy010:
+ return 2
+ case Copy100:
return 3
- case Copy20:
+ case Copy110:
return 4
+ case Copy200:
+ return 5
}
return -1
}
func (repType ReplicationType)GetCopyCount() int {
switch repType {
- case Copy00:
+ case Copy000:
return 1
- case Copy01:
+ case Copy001:
return 2
- case Copy10:
+ case Copy010:
+ return 2
+ case Copy100:
return 2
- case Copy11:
+ case Copy110:
return 3
- case Copy20:
+ case Copy200:
return 3
}
return 0
diff --git a/weed-fs/src/pkg/topology/node_list.go b/weed-fs/src/pkg/topology/node_list.go
index 396f1b16a..1d9e1891a 100644
--- a/weed-fs/src/pkg/topology/node_list.go
+++ b/weed-fs/src/pkg/topology/node_list.go
@@ -30,10 +30,10 @@ func (nl *NodeList) FreeSpace() int {
return freeSpace
}
-func (nl *NodeList) RandomlyPickN(n int) ([]Node, bool) {
+func (nl *NodeList) RandomlyPickN(n int, min int) ([]Node, bool) {
var list []Node
for _, n := range nl.nodes {
- if n.FreeSpace() > 0 {
+ if n.FreeSpace() >= min {
list = append(list, n)
}
}