aboutsummaryrefslogtreecommitdiff
path: root/weed-fs/src
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2012-09-16 17:31:15 -0700
committerChris Lu <chris.lu@gmail.com>2012-09-16 17:31:15 -0700
commit9b99240584fa2ea47ca3c7f42f017e24f648a7ab (patch)
treed7aca7b1e1cd203f7cf134b68c6ffaa382522524 /weed-fs/src
parente7c4ee1c64a0b523d7fcfd9f1620c85bf3b642f4 (diff)
downloadseaweedfs-9b99240584fa2ea47ca3c7f42f017e24f648a7ab.tar.xz
seaweedfs-9b99240584fa2ea47ca3c7f42f017e24f648a7ab.zip
can auto grow 00 and 01 replication volumes
Diffstat (limited to 'weed-fs/src')
-rw-r--r--weed-fs/src/cmd/weed/master.go61
-rw-r--r--weed-fs/src/cmd/weed/upload.go15
-rw-r--r--weed-fs/src/cmd/weed/volume.go39
-rw-r--r--weed-fs/src/pkg/admin/storage.go25
-rw-r--r--weed-fs/src/pkg/admin/storage_test.go17
-rw-r--r--weed-fs/src/pkg/replication/volume_growth.go97
-rw-r--r--weed-fs/src/pkg/replication/volume_growth_test.go16
-rw-r--r--weed-fs/src/pkg/storage/storage_limit.go13
-rw-r--r--weed-fs/src/pkg/storage/store.go43
-rw-r--r--weed-fs/src/pkg/storage/volume_info.go8
-rw-r--r--weed-fs/src/pkg/topology/data_center.go15
-rw-r--r--weed-fs/src/pkg/topology/data_node.go39
-rw-r--r--weed-fs/src/pkg/topology/node.go8
-rw-r--r--weed-fs/src/pkg/topology/rack.go15
-rw-r--r--weed-fs/src/pkg/topology/topology.go89
-rw-r--r--weed-fs/src/pkg/topology/volume_layout.go33
-rw-r--r--weed-fs/src/pkg/topology/volume_location.go9
-rw-r--r--weed-fs/src/pkg/util/post.go8
18 files changed, 363 insertions, 187 deletions
diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go
index 29b05efce..72c0d6f91 100644
--- a/weed-fs/src/cmd/weed/master.go
+++ b/weed-fs/src/cmd/weed/master.go
@@ -5,7 +5,9 @@ import (
"log"
"net/http"
"pkg/directory"
+ "pkg/replication"
"pkg/storage"
+ "pkg/topology"
"strconv"
"strings"
"time"
@@ -29,11 +31,14 @@ var (
mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings")
capacity = cmdMaster.Flag.Int("capacity", 100, "maximum number of volumes to hold")
- mapper *directory.Mapper
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
)
+var mapper *directory.Mapper
+var topo *topology.Topology
+var vg *replication.VolumeGrowth
+
func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
vid := r.FormValue("volumeId")
commaSep := strings.Index(vid, ",")
@@ -43,10 +48,10 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
volumeId, _ := storage.NewVolumeId(vid)
machines, e := mapper.Get(volumeId)
if e == nil {
- ret:= []map[string]string{}
- for _, machine := range machines {
- ret = append(ret,map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl})
- }
+ ret := []map[string]string{}
+ for _, machine := range machines {
+ ret = append(ret, map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl})
+ }
writeJson(w, r, ret)
} else {
log.Println("Invalid volume id", volumeId)
@@ -62,7 +67,27 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, map[string]string{"error": err.Error()})
}
}
+func dirAssign2Handler(w http.ResponseWriter, r *http.Request) {
+ c, _ := strconv.Atoi(r.FormValue("count"))
+ rt := storage.NewReplicationType(r.FormValue("replication"))
+ if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
+ if topo.FreeSpace() <= 0 {
+ writeJson(w, r, map[string]string{"error": "No free volumes left!"})
+ } else {
+ vg.GrowByType(rt, topo)
+ }
+ }
+ fid, count, dn, err := topo.PickForWrite(rt, c)
+ if err == nil {
+ writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Ip + ":" + strconv.Itoa(dn.Port), "publicUrl": dn.PublicUrl, "count": count})
+ } else {
+ writeJson(w, r, map[string]string{"error": err.Error()})
+ }
+}
func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
+ ip := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
+ port, _ := strconv.Atoi(r.FormValue("port"))
+ maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
publicUrl := r.FormValue("publicUrl")
volumes := new([]storage.VolumeInfo)
@@ -71,20 +96,40 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
log.Println(s, "volumes", r.FormValue("volumes"))
}
mapper.Add(directory.NewMachine(s, publicUrl, *volumes, time.Now().Unix()))
+
+ //new ways
+ topo.RegisterVolumes(*volumes, ip, port, publicUrl, maxVolumeCount)
}
-func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
+func dirOldStatusHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, mapper)
}
+func dirNewStatusHandler(w http.ResponseWriter, r *http.Request) {
+ writeJson(w, r, topo.ToMap())
+}
+func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
+ rt := storage.NewReplicationType(r.FormValue("replication"))
+ count, err := strconv.Atoi(r.FormValue("count"))
+ if err != nil {
+ vg.GrowByType(rt, topo)
+ } else {
+ vg.GrowByCountAndType(count, rt, topo)
+ }
+}
func runMaster(cmd *Command, args []string) bool {
+ topo = topology.NewTopology("topo", *metaFolder, "toposequence", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)
+ vg = replication.NewDefaultVolumeGrowth()
log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB")
mapper = directory.NewMapper(*metaFolder, "directory", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)
http.HandleFunc("/dir/assign", dirAssignHandler)
+ http.HandleFunc("/dir/assign2", dirAssign2Handler)
http.HandleFunc("/dir/lookup", dirLookupHandler)
http.HandleFunc("/dir/join", dirJoinHandler)
- http.HandleFunc("/dir/status", dirStatusHandler)
+ http.HandleFunc("/dir/status", dirOldStatusHandler)
+ http.HandleFunc("/dir/status2", dirNewStatusHandler) //temporary
+ http.HandleFunc("/vol/grow", volumeGrowHandler)
- mapper.StartRefreshWritableVolumes()
+ mapper.StartRefreshWritableVolumes()
log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*mport))
e := http.ListenAndServe(":"+strconv.Itoa(*mport), nil)
diff --git a/weed-fs/src/cmd/weed/upload.go b/weed-fs/src/cmd/weed/upload.go
index 5cfb6ee97..8502ca417 100644
--- a/weed-fs/src/cmd/weed/upload.go
+++ b/weed-fs/src/cmd/weed/upload.go
@@ -39,19 +39,22 @@ type AssignResult struct {
Error string "error"
}
-func assign(count int) (AssignResult, error) {
+func assign(count int) (*AssignResult, error) {
values := make(url.Values)
values.Add("count", strconv.Itoa(count))
- jsonBlob := util.Post("http://"+*server+"/dir/assign", values)
+ jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values)
+ if err != nil {
+ return nil, err
+ }
var ret AssignResult
- err := json.Unmarshal(jsonBlob, &ret)
+ err = json.Unmarshal(jsonBlob, &ret)
if err != nil {
- return ret, err
+ return nil, err
}
if ret.Count <= 0 {
- return ret, errors.New(ret.Error)
+ return nil, errors.New(ret.Error)
}
- return ret, nil
+ return &ret, nil
}
type UploadResult struct {
diff --git a/weed-fs/src/cmd/weed/volume.go b/weed-fs/src/cmd/weed/volume.go
index 0b7ef9671..39faaffec 100644
--- a/weed-fs/src/cmd/weed/volume.go
+++ b/weed-fs/src/cmd/weed/volume.go
@@ -6,6 +6,7 @@ import (
"math/rand"
"mime"
"net/http"
+ "os"
"pkg/storage"
"strconv"
"strings"
@@ -18,7 +19,7 @@ func init() {
}
var cmdVolume = &Command{
- UsageLine: "volume -port=8080 -dir=/tmp -volumes=0-99 -publicUrl=server_name:8080 -mserver=localhost:9333",
+ UsageLine: "volume -port=8080 -dir=/tmp -min=3 -max=5 -publicUrl=server_name:8080 -mserver=localhost:9333",
Short: "start a volume server",
Long: `start a volume server to provide storage spaces
@@ -26,12 +27,13 @@ var cmdVolume = &Command{
}
var (
- vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
- chunkFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files")
- volumes = cmdVolume.Flag.String("volumes", "0,1-3,4", "comma-separated list of volume ids or range of ids")
- publicUrl = cmdVolume.Flag.String("publicUrl", "localhost:8080", "public url to serve data read")
- masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master directory server to store mappings")
- vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
+ vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
+ volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files")
+ volumes = cmdVolume.Flag.String("volumes", "", "comma-separated list, or ranges of volume ids")
+ publicUrl = cmdVolume.Flag.String("publicUrl", "localhost:8080", "public url to serve data read")
+ masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master directory server to store mappings")
+ vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
+ maxVolumeCount = cmdVolume.Flag.Int("maxVolumeCount", 5, "maximum number of volumes")
store *storage.Store
)
@@ -46,9 +48,9 @@ func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
} else {
writeJson(w, r, map[string]string{"error": err.Error()})
}
- if *IsDebug {
- log.Println("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
- }
+ if *IsDebug {
+ log.Println("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
+ }
}
func setVolumeLocationsHandler(w http.ResponseWriter, r *http.Request) {
if *IsDebug {
@@ -86,11 +88,15 @@ func GetHandler(w http.ResponseWriter, r *http.Request) {
}
cookie := n.Cookie
count, e := store.Read(volumeId, n)
+ if e != nil {
+ w.WriteHeader(404)
+ }
if *IsDebug {
log.Println("read bytes", count, "error", e)
}
if n.Cookie != cookie {
log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
+ w.WriteHeader(404)
return
}
if ext != "" {
@@ -175,14 +181,25 @@ func parseURLPath(path string) (vid, fid, ext string) {
}
func runVolume(cmd *Command, args []string) bool {
+ fileInfo, err := os.Stat(*volumeFolder)
//TODO: now default to 1G, this value should come from server?
- store = storage.NewStore(*vport, *publicUrl, *chunkFolder, *volumes)
+ if err!=nil{
+ log.Fatalf("No Existing Folder:%s", *volumeFolder)
+ }
+ if !fileInfo.IsDir() {
+ log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder)
+ }
+ perm:=fileInfo.Mode().Perm()
+ log.Println("Volume Folder permission:", perm)
+
+ store = storage.NewStore(*vport, *publicUrl, *volumeFolder, *maxVolumeCount, *volumes)
defer store.Close()
http.HandleFunc("/", storeHandler)
http.HandleFunc("/status", statusHandler)
http.HandleFunc("/admin/assign_volume", assignVolumeHandler)
http.HandleFunc("/admin/set_volume_locations_list", setVolumeLocationsHandler)
+
go func() {
for {
store.Join(*masterNode)
diff --git a/weed-fs/src/pkg/admin/storage.go b/weed-fs/src/pkg/admin/storage.go
index 8e78b8697..8d9e8a103 100644
--- a/weed-fs/src/pkg/admin/storage.go
+++ b/weed-fs/src/pkg/admin/storage.go
@@ -1,35 +1,10 @@
package admin
import (
- "encoding/json"
- "errors"
- "strconv"
- "net/url"
- "pkg/util"
"pkg/storage"
"pkg/topology"
)
-type AllocateVolumeResult struct {
- Error string
-}
-
-func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error{
- values := make(url.Values)
- values.Add("volume", vid.String())
- values.Add("replicationType", repType.String())
- jsonBlob := util.Post("http://"+dn.Ip+":"+strconv.Itoa(dn.Port)+"/admin/assign_volume", values)
- var ret AllocateVolumeResult
- err := json.Unmarshal(jsonBlob, &ret)
- if err != nil {
- return err
- }
- if ret.Error != "" {
- return errors.New(ret.Error)
- }
- return nil
-}
-
func SendVolumeLocationsList(t *topology.Topology, vid storage.VolumeId) error{
// values := make(url.Values)
// values.Add("volumeLocationsList", vid.String())
diff --git a/weed-fs/src/pkg/admin/storage_test.go b/weed-fs/src/pkg/admin/storage_test.go
deleted file mode 100644
index ecc2ab22e..000000000
--- a/weed-fs/src/pkg/admin/storage_test.go
+++ /dev/null
@@ -1,17 +0,0 @@
-package admin
-
-import (
- "log"
- "pkg/storage"
- "pkg/topology"
- "testing"
-)
-
-func TestXYZ(t *testing.T) {
- dn := topology.NewDataNode("server1")
- dn.Ip = "localhost"
- dn.Port = 8080
- vid, _:= storage.NewVolumeId("6")
- out := AllocateVolume(dn,vid,storage.Copy00)
- log.Println(out)
-}
diff --git a/weed-fs/src/pkg/replication/volume_growth.go b/weed-fs/src/pkg/replication/volume_growth.go
index 3269fcbc6..f6dec8a65 100644
--- a/weed-fs/src/pkg/replication/volume_growth.go
+++ b/weed-fs/src/pkg/replication/volume_growth.go
@@ -1,10 +1,15 @@
package replication
import (
+ "encoding/json"
+ "errors"
"fmt"
"math/rand"
+ "net/url"
"pkg/storage"
"pkg/topology"
+ "pkg/util"
+ "strconv"
)
/*
@@ -22,17 +27,36 @@ type VolumeGrowth struct {
copyAll int
}
-func (vg *VolumeGrowth) GrowVolumeCopy(copyLevel int, topo *topology.Topology) {
- switch copyLevel {
- case 1:
- for i := 0; i < vg.copy1factor; i++ {
+func NewDefaultVolumeGrowth() *VolumeGrowth {
+ return &VolumeGrowth{copy1factor: 7, copy2factor: 6, copy3factor: 3}
+}
+
+func (vg *VolumeGrowth) GrowByType(repType storage.ReplicationType, topo *topology.Topology) {
+ switch repType {
+ case storage.Copy00:
+ vg.GrowByCountAndType(vg.copy1factor, repType, topo)
+ case storage.Copy10:
+ vg.GrowByCountAndType(vg.copy2factor, repType, topo)
+ case storage.Copy20:
+ vg.GrowByCountAndType(vg.copy3factor, repType, topo)
+ case storage.Copy01:
+ vg.GrowByCountAndType(vg.copy2factor, repType, topo)
+ case storage.Copy11:
+ vg.GrowByCountAndType(vg.copy3factor, repType, topo)
+ }
+
+}
+func (vg *VolumeGrowth) GrowByCountAndType(count int, repType storage.ReplicationType, topo *topology.Topology) {
+ switch repType {
+ case storage.Copy00:
+ for i := 0; i < count; i++ {
ret, server, vid := topo.RandomlyReserveOneVolume()
if ret {
- vg.Grow(vid, server)
+ vg.grow(topo, *vid, repType, server)
}
}
- case 20:
- for i := 0; i < vg.copy2factor; i++ {
+ case storage.Copy10:
+ for i := 0; i < count; i++ {
nl := topology.NewNodeList(topo.Children(), nil)
picked, ret := nl.RandomlyPickN(2)
vid := topo.NextVolumeId()
@@ -44,12 +68,12 @@ func (vg *VolumeGrowth) GrowVolumeCopy(copyLevel int, topo *topology.Topology) {
}
}
if len(servers) == 2 {
- vg.Grow(vid, servers[0], servers[1])
+ vg.grow(topo, vid, repType, servers[0], servers[1])
}
}
}
- case 30:
- for i := 0; i < vg.copy3factor; i++ {
+ case storage.Copy20:
+ for i := 0; i < count; i++ {
nl := topology.NewNodeList(topo.Children(), nil)
picked, ret := nl.RandomlyPickN(3)
vid := topo.NextVolumeId()
@@ -61,12 +85,12 @@ func (vg *VolumeGrowth) GrowVolumeCopy(copyLevel int, topo *topology.Topology) {
}
}
if len(servers) == 3 {
- vg.Grow(vid, servers[0], servers[1], servers[2])
+ vg.grow(topo, vid, repType, servers[0], servers[1], servers[2])
}
}
}
- case 02:
- for i := 0; i < vg.copy2factor; i++ {
+ case storage.Copy01:
+ for i := 0; i < count; i++ {
//randomly pick one server, and then choose from the same rack
ret, server1, vid := topo.RandomlyReserveOneVolume()
if ret {
@@ -74,22 +98,53 @@ func (vg *VolumeGrowth) GrowVolumeCopy(copyLevel int, topo *topology.Topology) {
exclusion := make(map[string]topology.Node)
exclusion[server1.String()] = server1
newNodeList := topology.NewNodeList(rack.Children(), exclusion)
- ret2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), vid)
- if ret2 {
- vg.Grow(vid, server1, server2)
+ if newNodeList.FreeSpace() > 0 {
+ ret2, server2 := newNodeList.ReserveOneVolume(rand.Intn(newNodeList.FreeSpace()), *vid)
+ if ret2 {
+ vg.grow(topo, *vid, repType, server1, server2)
+ }
}
}
}
- case 12:
- for i := 0; i < vg.copy3factor; i++ {
+ case storage.Copy11:
+ for i := 0; i < count; i++ {
}
}
}
-func (vg *VolumeGrowth) Grow(vid storage.VolumeId, servers ...*topology.DataNode) {
+func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, repType storage.ReplicationType, servers ...*topology.DataNode) {
for _, server := range servers {
- vi := &storage.VolumeInfo{Id: vid, Size: 0}
- server.AddVolume(vi)
+ if err := AllocateVolume(server, vid, repType); err == nil {
+ vi := &storage.VolumeInfo{Id: vid, Size: 0}
+ server.AddOrUpdateVolume(vi)
+ topo.RegisterVolumeLayout(vi, server)
+ fmt.Println("added", vid, "to", server)
+ } else {
+ //TODO: need error handling
+ fmt.Println("Failed to assign", vid, "to", servers)
+ }
}
fmt.Println("Assigning", vid, "to", servers)
}
+
+type AllocateVolumeResult struct {
+ Error string
+}
+
+func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error {
+ values := make(url.Values)
+ values.Add("volume", vid.String())
+ values.Add("replicationType", repType.String())
+ jsonBlob, err := util.Post("http://"+dn.Ip+":"+strconv.Itoa(dn.Port)+"/admin/assign_volume", values)
+ if err != nil {
+ return err
+ }
+ var ret AllocateVolumeResult
+ if err := json.Unmarshal(jsonBlob, &ret); err != nil {
+ return err
+ }
+ if ret.Error != "" {
+ return errors.New(ret.Error)
+ }
+ return nil
+}
diff --git a/weed-fs/src/pkg/replication/volume_growth_test.go b/weed-fs/src/pkg/replication/volume_growth_test.go
index f8e441f03..5068577b1 100644
--- a/weed-fs/src/pkg/replication/volume_growth_test.go
+++ b/weed-fs/src/pkg/replication/volume_growth_test.go
@@ -80,7 +80,7 @@ func setup(topologyLayout string) *topology.Topology {
fmt.Println("data:", data)
//need to connect all nodes first before server adding volumes
- topo := topology.NewTopology("mynetwork")
+ topo := topology.NewTopology("mynetwork","/tmp","testing",32*1024, 5)
mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology {
dc := topology.NewDataCenter(dcKey)
@@ -97,7 +97,7 @@ func setup(topologyLayout string) *topology.Topology {
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
vi := &storage.VolumeInfo{Id: storage.VolumeId(int64(m["id"].(float64))), Size: int64(m["size"].(float64))}
- server.AddVolume(vi)
+ server.AddOrUpdateVolume(vi)
}
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
}
@@ -125,5 +125,15 @@ func TestReserveOneVolume(t *testing.T) {
topo := setup(topologyLayout)
rand.Seed(time.Now().UnixNano())
vg:=&VolumeGrowth{copy1factor:3,copy2factor:2,copy3factor:1,copyAll:4}
- vg.GrowVolumeCopy(20,topo)
+ vg.GrowByType(storage.Copy20,topo)
+}
+
+
+func TestXYZ(t *testing.T) {
+ dn := topology.NewDataNode("server1")
+ dn.Ip = "localhost"
+ dn.Port = 8080
+ vid, _:= storage.NewVolumeId("600")
+ out := AllocateVolume(dn,vid,storage.Copy00)
+ fmt.Println(out)
}
diff --git a/weed-fs/src/pkg/storage/storage_limit.go b/weed-fs/src/pkg/storage/storage_limit.go
deleted file mode 100644
index b40618046..000000000
--- a/weed-fs/src/pkg/storage/storage_limit.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package storage
-
-import (
-)
-
-type StorageLimit struct {
- sizeLimit uint64
-}
-
-func NewStorageLimit(desiredLimit uint64) *StorageLimit {
- sl := &StorageLimit{sizeLimit: desiredLimit}
- return sl
-}
diff --git a/weed-fs/src/pkg/storage/store.go b/weed-fs/src/pkg/storage/store.go
index 6bc1ec028..a0238d99d 100644
--- a/weed-fs/src/pkg/storage/store.go
+++ b/weed-fs/src/pkg/storage/store.go
@@ -12,19 +12,20 @@ import (
)
type Store struct {
- volumes map[VolumeId]*Volume
- dir string
- Port int
- PublicUrl string
- Limit StorageLimit
+ volumes map[VolumeId]*Volume
+ dir string
+ Port int
+ PublicUrl string
+ MaxVolumeCount int
}
-func NewStore(port int, publicUrl, dirname string, volumeListString string) (s *Store) {
- s = &Store{Port: port, PublicUrl: publicUrl, dir: dirname}
+func NewStore(port int, publicUrl, dirname string, maxVolumeCount int, volumeListString string) (s *Store) {
+ s = &Store{Port: port, PublicUrl: publicUrl, dir: dirname, MaxVolumeCount: maxVolumeCount}
s.volumes = make(map[VolumeId]*Volume)
-
s.loadExistingVolumes()
- s.AddVolume(volumeListString, "00")
+ if volumeListString != "" {
+ s.AddVolume(volumeListString, "00")
+ }
log.Println("Store started on dir:", dirname, "with", len(s.volumes), "volumes", volumeListString)
return
@@ -82,16 +83,16 @@ func (s *Store) loadExistingVolumes() {
}
}
}
-func (s *Store) Status() *[]*VolumeInfo {
- stats := new([]*VolumeInfo)
+func (s *Store) Status() []*VolumeInfo {
+ var stats []*VolumeInfo
for k, v := range s.volumes {
s := new(VolumeInfo)
s.Id, s.Size, s.RepType = VolumeId(k), v.Size(), v.replicaType
- *stats = append(*stats, s)
+ stats = append(stats, s)
}
return stats
}
-func (s *Store) Join(mserver string) {
+func (s *Store) Join(mserver string) error {
stats := new([]*VolumeInfo)
for k, v := range s.volumes {
s := new(VolumeInfo)
@@ -103,7 +104,9 @@ func (s *Store) Join(mserver string) {
values.Add("port", strconv.Itoa(s.Port))
values.Add("publicUrl", s.PublicUrl)
values.Add("volumes", string(bytes))
- util.Post("http://"+mserver+"/dir/join", values)
+ values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount))
+ _, err := util.Post("http://"+mserver+"/dir/join", values)
+ return err
}
func (s *Store) Close() {
for _, v := range s.volumes {
@@ -111,22 +114,19 @@ func (s *Store) Close() {
}
}
func (s *Store) Write(i VolumeId, n *Needle) uint32 {
- v := s.volumes[i]
- if v != nil {
+ if v := s.volumes[i]; v != nil {
return v.write(n)
}
return 0
}
func (s *Store) Delete(i VolumeId, n *Needle) uint32 {
- v := s.volumes[i]
- if v != nil {
+ if v := s.volumes[i]; v != nil {
return v.delete(n)
}
return 0
}
func (s *Store) Read(i VolumeId, n *Needle) (int, error) {
- v := s.volumes[i]
- if v != nil {
+ if v := s.volumes[i]; v != nil {
return v.read(n)
}
return 0, errors.New("Not Found")
@@ -140,8 +140,7 @@ type VolumeLocations struct {
func (s *Store) SetVolumeLocations(volumeLocationList []VolumeLocations) error {
for _, volumeLocations := range volumeLocationList {
vid := volumeLocations.Vid
- v := s.volumes[vid]
- if v != nil {
+ if v := s.volumes[vid]; v != nil {
v.locations = volumeLocations.Locations
}
}
diff --git a/weed-fs/src/pkg/storage/volume_info.go b/weed-fs/src/pkg/storage/volume_info.go
index c9ed5265c..01a56a30c 100644
--- a/weed-fs/src/pkg/storage/volume_info.go
+++ b/weed-fs/src/pkg/storage/volume_info.go
@@ -50,8 +50,8 @@ func (r *ReplicationType) String() string {
return "00"
}
-func GetReplicationLevelIndex(v *VolumeInfo) int {
- switch v.RepType {
+func GetReplicationLevelIndex(repType ReplicationType) int {
+ switch repType {
case Copy00:
return 0
case Copy01:
@@ -65,8 +65,8 @@ func GetReplicationLevelIndex(v *VolumeInfo) int {
}
return -1
}
-func GetCopyCount(v *VolumeInfo) int {
- switch v.RepType {
+func GetCopyCount(repType ReplicationType) int {
+ switch repType {
case Copy00:
return 1
case Copy01:
diff --git a/weed-fs/src/pkg/topology/data_center.go b/weed-fs/src/pkg/topology/data_center.go
index 48466e258..5edf7c6eb 100644
--- a/weed-fs/src/pkg/topology/data_center.go
+++ b/weed-fs/src/pkg/topology/data_center.go
@@ -1,6 +1,7 @@
package topology
-import ()
+import (
+)
type DataCenter struct {
NodeImpl
@@ -33,3 +34,15 @@ func (dc *DataCenter) GetOrCreateRack(ip string) *Rack {
dc.LinkChildNode(rack)
return rack
}
+
+func (dc *DataCenter) ToMap() interface{}{
+ m := make(map[string]interface{})
+ m["Free"] = dc.FreeSpace()
+ var racks []interface{}
+ for _, c := range dc.Children() {
+ rack := c.(*Rack)
+ racks = append(racks, rack.ToMap())
+ }
+ m["Racks"] = racks
+ return m
+}
diff --git a/weed-fs/src/pkg/topology/data_node.go b/weed-fs/src/pkg/topology/data_node.go
index 254754ccd..1516572fd 100644
--- a/weed-fs/src/pkg/topology/data_node.go
+++ b/weed-fs/src/pkg/topology/data_node.go
@@ -22,22 +22,37 @@ func NewDataNode(id string) *DataNode {
return s
}
func (dn *DataNode) CreateOneVolume(r int, vid storage.VolumeId) storage.VolumeId {
- dn.AddVolume(&storage.VolumeInfo{Id: vid})
+ dn.AddOrUpdateVolume(&storage.VolumeInfo{Id: vid})
return vid
}
-func (dn *DataNode) AddVolume(v *storage.VolumeInfo) {
- dn.volumes[v.Id] = v
- dn.UpAdjustActiveVolumeCountDelta(1)
- dn.UpAdjustMaxVolumeId(v.Id)
+func (dn *DataNode) AddOrUpdateVolume(v *storage.VolumeInfo) {
+ if dn.volumes[v.Id] == nil {
+ dn.volumes[v.Id] = v
+ dn.UpAdjustActiveVolumeCountDelta(1)
+ dn.UpAdjustMaxVolumeId(v.Id)
+ }else{
+ dn.volumes[v.Id] = v
+ }
}
func (dn *DataNode) GetTopology() *Topology {
- p := dn.parent
- for p.Parent()!=nil{
- p = p.Parent()
- }
- t := p.(*Topology)
- return t
+ p := dn.parent
+ for p.Parent() != nil {
+ p = p.Parent()
+ }
+ t := p.(*Topology)
+ return t
}
func (dn *DataNode) MatchLocation(ip string, port int) bool {
- return dn.Ip == ip && dn.Port == port
+ return dn.Ip == ip && dn.Port == port
+}
+
+func (dn *DataNode) ToMap() interface{} {
+ ret := make(map[string]interface{})
+ ret["Ip"] = dn.Ip
+ ret["Port"] = dn.Port
+ ret["Volumes"] = dn.GetActiveVolumeCount()
+ ret["MaxVolumeCount"] = dn.GetMaxVolumeCount()
+ ret["FreeVolumeCount"] = dn.FreeSpace()
+ ret["PublicUrl"] = dn.PublicUrl
+ return ret
}
diff --git a/weed-fs/src/pkg/topology/node.go b/weed-fs/src/pkg/topology/node.go
index fb610bd73..cdd9accd4 100644
--- a/weed-fs/src/pkg/topology/node.go
+++ b/weed-fs/src/pkg/topology/node.go
@@ -127,12 +127,10 @@ func (n *NodeImpl) GetMaxVolumeCount() int {
func (n *NodeImpl) LinkChildNode(node Node) {
if n.children[node.Id()] == nil {
n.children[node.Id()] = node
- n.activeVolumeCount += node.GetActiveVolumeCount()
- n.maxVolumeCount += node.GetMaxVolumeCount()
+ n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
+ n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
+ n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
node.setParent(n)
- if n.maxVolumeId < node.GetMaxVolumeId() {
- n.maxVolumeId = node.GetMaxVolumeId()
- }
fmt.Println(n, "adds", node, "volumeCount =", n.activeVolumeCount)
}
}
diff --git a/weed-fs/src/pkg/topology/rack.go b/weed-fs/src/pkg/topology/rack.go
index a52223887..8a09d0bfe 100644
--- a/weed-fs/src/pkg/topology/rack.go
+++ b/weed-fs/src/pkg/topology/rack.go
@@ -24,7 +24,7 @@ func (r *Rack) MatchLocationRange(ip string) bool{
return r.ipRange.Match(ip)
}
-func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string) *DataNode{
+func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode{
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip,port) {
@@ -35,6 +35,19 @@ func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string) *DataN
dn.Ip = ip
dn.Port = port
dn.PublicUrl = publicUrl
+ dn.maxVolumeCount = maxVolumeCount
r.LinkChildNode(dn)
return dn
}
+
+func (rack *Rack) ToMap() interface{}{
+ m := make(map[string]interface{})
+ m["Free"] = rack.FreeSpace()
+ var dns []interface{}
+ for _, c := range rack.Children() {
+ dn := c.(*DataNode)
+ dns = append(dns, dn.ToMap())
+ }
+ m["DataNodes"] = dns
+ return m
+}
diff --git a/weed-fs/src/pkg/topology/topology.go b/weed-fs/src/pkg/topology/topology.go
index 2366c29d0..7a96665af 100644
--- a/weed-fs/src/pkg/topology/topology.go
+++ b/weed-fs/src/pkg/topology/topology.go
@@ -1,8 +1,9 @@
package topology
import (
- _ "fmt"
+ "errors"
"math/rand"
+ "pkg/directory"
"pkg/sequence"
"pkg/storage"
)
@@ -31,10 +32,14 @@ func NewTopology(id string, dirname string, filename string, volumeSizeLimit uin
t.sequence = sequence.NewSequencer(dirname, filename)
return t
}
-func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, storage.VolumeId) {
+
+func (t *Topology) RandomlyReserveOneVolume() (bool, *DataNode, *storage.VolumeId) {
+ if t.FreeSpace()<=0 {
+ return false, nil, nil
+ }
vid := t.NextVolumeId()
ret, node := t.ReserveOneVolume(rand.Intn(t.FreeSpace()), vid)
- return ret, node, vid
+ return ret, node, &vid
}
func (t *Topology) RandomlyReserveOneVolumeExcept(except []Node) (bool, *DataNode, storage.VolumeId) {
@@ -52,30 +57,68 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
return vid.Next()
}
-func (t *Topology) registerVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
- replicationTypeIndex := storage.GetReplicationLevelIndex(v)
- if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
- t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(t.volumeSizeLimit, t.pulse)
- }
- t.replicaType2VolumeLayout[replicationTypeIndex].RegisterVolume(v, dn)
+func (t *Topology) PickForWrite(repType storage.ReplicationType, count int) (string, int, *DataNode, error) {
+ replicationTypeIndex := storage.GetReplicationLevelIndex(repType)
+ if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
+ t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
+ }
+ vid, count, datanodes, err := t.replicaType2VolumeLayout[replicationTypeIndex].PickForWrite(count)
+ if err != nil {
+ return "", 0, nil, errors.New("No writable volumes avalable!")
+ }
+ fileId, count := t.sequence.NextFileId(count)
+ return directory.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
+}
+
+func (t *Topology) GetVolumeLayout(repType storage.ReplicationType) *VolumeLayout {
+ replicationTypeIndex := storage.GetReplicationLevelIndex(repType)
+ if t.replicaType2VolumeLayout[replicationTypeIndex] == nil {
+ t.replicaType2VolumeLayout[replicationTypeIndex] = NewVolumeLayout(repType, t.volumeSizeLimit, t.pulse)
+ }
+ return t.replicaType2VolumeLayout[replicationTypeIndex]
}
-func (t *Topology) RegisterVolume(v *storage.VolumeInfo, ip string, port int, publicUrl string) {
+func (t *Topology) RegisterVolumeLayout(v *storage.VolumeInfo, dn *DataNode) {
+ t.GetVolumeLayout(v.RepType).RegisterVolume(v, dn)
+}
+
+func (t *Topology) RegisterVolumes(volumeInfos []storage.VolumeInfo, ip string, port int, publicUrl string, maxVolumeCount int) {
dc := t.GetOrCreateDataCenter(ip)
rack := dc.GetOrCreateRack(ip)
- dn := rack.GetOrCreateDataNode(ip, port, publicUrl)
- dn.AddVolume(v)
- t.registerVolumeLayout(v,dn)
+ dn := rack.GetOrCreateDataNode(ip, port, publicUrl, maxVolumeCount)
+ for _, v := range volumeInfos {
+ dn.AddOrUpdateVolume(&v)
+ t.RegisterVolumeLayout(&v, dn)
+ }
}
-func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter{
- for _, c := range t.Children() {
- dc := c.(*DataCenter)
- if dc.MatchLocationRange(ip) {
- return dc
- }
- }
- dc := NewDataCenter("DefaultDataCenter")
- t.LinkChildNode(dc)
- return dc
+func (t *Topology) GetOrCreateDataCenter(ip string) *DataCenter {
+ for _, c := range t.Children() {
+ dc := c.(*DataCenter)
+ if dc.MatchLocationRange(ip) {
+ return dc
+ }
+ }
+ dc := NewDataCenter("DefaultDataCenter")
+ t.LinkChildNode(dc)
+ return dc
+}
+
+func (t *Topology) ToMap() interface{} {
+ m := make(map[string]interface{})
+ m["Free"] = t.FreeSpace()
+ var dcs []interface{}
+ for _, c := range t.Children() {
+ dc := c.(*DataCenter)
+ dcs = append(dcs, dc.ToMap())
+ }
+ m["DataCenters"] = dcs
+ var layouts []interface{}
+ for _, layout := range t.replicaType2VolumeLayout {
+ if layout != nil {
+ layouts = append(layouts, layout.ToMap())
+ }
+ }
+ m["layouts"] = layouts
+ return m
}
diff --git a/weed-fs/src/pkg/topology/volume_layout.go b/weed-fs/src/pkg/topology/volume_layout.go
index 48d76325e..99859668e 100644
--- a/weed-fs/src/pkg/topology/volume_layout.go
+++ b/weed-fs/src/pkg/topology/volume_layout.go
@@ -8,14 +8,16 @@ import (
)
type VolumeLayout struct {
+ repType storage.ReplicationType
vid2location map[storage.VolumeId]*DataNodeLocationList
writables []storage.VolumeId // transient array of writable volume id
pulse int64
volumeSizeLimit uint64
}
-func NewVolumeLayout(volumeSizeLimit uint64, pulse int64) *VolumeLayout {
+func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout {
return &VolumeLayout{
+ repType: repType,
vid2location: make(map[storage.VolumeId]*DataNodeLocationList),
writables: *new([]storage.VolumeId),
pulse: pulse,
@@ -27,24 +29,37 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewDataNodeLocationList()
}
- vl.vid2location[v.Id].Add(dn)
- if len(vl.vid2location[v.Id].list) >= storage.GetCopyCount(v) {
- if uint64(v.Size) < vl.volumeSizeLimit {
- vl.writables = append(vl.writables, v.Id)
+ if vl.vid2location[v.Id].Add(dn) {
+ if len(vl.vid2location[v.Id].list) == storage.GetCopyCount(v.RepType) {
+ if uint64(v.Size) < vl.volumeSizeLimit {
+ vl.writables = append(vl.writables, v.Id)
+ }
}
}
}
-func (vl *VolumeLayout) PickForWrite(count int) (int, *DataNodeLocationList, error) {
+func (vl *VolumeLayout) PickForWrite(count int) (*storage.VolumeId, int, *DataNodeLocationList, error) {
len_writers := len(vl.writables)
if len_writers <= 0 {
fmt.Println("No more writable volumes!")
- return 0, nil, errors.New("No more writable volumes!")
+ return nil, 0, nil, errors.New("No more writable volumes!")
}
vid := vl.writables[rand.Intn(len_writers)]
locationList := vl.vid2location[vid]
if locationList != nil {
- return count, locationList, nil
+ return &vid, count, locationList, nil
}
- return 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
+ return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
+}
+
+func (vl *VolumeLayout) GetActiveVolumeCount() int {
+ return len(vl.writables)
+}
+
+func (vl *VolumeLayout) ToMap() interface{} {
+ m := make(map[string]interface{})
+ m["replication"] = vl.repType.String()
+ m["writables"] = vl.writables
+ //m["locations"] = vl.vid2location
+ return m
}
diff --git a/weed-fs/src/pkg/topology/volume_location.go b/weed-fs/src/pkg/topology/volume_location.go
index be8218898..92d89ae46 100644
--- a/weed-fs/src/pkg/topology/volume_location.go
+++ b/weed-fs/src/pkg/topology/volume_location.go
@@ -10,13 +10,18 @@ func NewDataNodeLocationList() *DataNodeLocationList {
return &DataNodeLocationList{}
}
-func (dnll *DataNodeLocationList) Add(loc *DataNode) {
+func (dnll *DataNodeLocationList) Head() *DataNode {
+ return dnll.list[0]
+}
+
+func (dnll *DataNodeLocationList) Add(loc *DataNode) bool {
for _, dnl := range dnll.list {
if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
- break
+ return false
}
}
dnll.list = append(dnll.list, loc)
+ return true
}
func (dnll *DataNodeLocationList) Refresh(freshThreshHold int64) {
diff --git a/weed-fs/src/pkg/util/post.go b/weed-fs/src/pkg/util/post.go
index 871448785..357b42185 100644
--- a/weed-fs/src/pkg/util/post.go
+++ b/weed-fs/src/pkg/util/post.go
@@ -7,17 +7,17 @@ import (
"net/url"
)
-func Post(url string, values url.Values) []byte {
+func Post(url string, values url.Values) ([]byte, error) {
r, err := http.PostForm(url, values)
if err != nil {
log.Println("post:", err)
- return nil
+ return nil, err
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println("post:", err)
- return nil
+ return nil, err
}
- return b
+ return b, nil
}