aboutsummaryrefslogtreecommitdiff
path: root/go/topology
diff options
context:
space:
mode:
authoryourchanges <yourchanges@gmail.com>2015-01-10 02:51:26 +0800
committeryourchanges <yourchanges@gmail.com>2015-01-10 02:51:26 +0800
commit9601880e323bbdf9540f2c79fb21d66374245b50 (patch)
tree14fd3b36a89955ec6e0be6d51186031e978b519d /go/topology
parentf7bcd8e958ef185baeca0c455a397d49fcb62256 (diff)
parent2c1a846279c172bcae457e70efa142c29a18892e (diff)
downloadseaweedfs-9601880e323bbdf9540f2c79fb21d66374245b50.tar.xz
seaweedfs-9601880e323bbdf9540f2c79fb21d66374245b50.zip
Merge pull request #2 from chrislusf/master
merge
Diffstat (limited to 'go/topology')
-rw-r--r--go/topology/allocate_volume.go5
-rw-r--r--go/topology/collection.go27
-rw-r--r--go/topology/data_center.go2
-rw-r--r--go/topology/data_node.go8
-rw-r--r--go/topology/node.go5
-rw-r--r--go/topology/store_replicate.go5
-rw-r--r--go/topology/topology.go38
-rw-r--r--go/topology/topology_event_handling.go5
-rw-r--r--go/topology/topology_map.go9
-rw-r--r--go/topology/topology_vacuum.go21
-rw-r--r--go/topology/volume_growth.go9
-rw-r--r--go/topology/volume_growth_test.go5
-rw-r--r--go/topology/volume_layout.go10
-rw-r--r--go/topology/volume_location_list.go8
14 files changed, 96 insertions, 61 deletions
diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go
index 6562e9ac5..a791b4c1c 100644
--- a/go/topology/allocate_volume.go
+++ b/go/topology/allocate_volume.go
@@ -1,11 +1,12 @@
package topology
import (
- "github.com/chrislusf/weed-fs/go/storage"
- "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"net/url"
+
+ "github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/util"
)
type AllocateVolumeResult struct {
diff --git a/go/topology/collection.go b/go/topology/collection.go
index 506f43fbf..5437ffd79 100644
--- a/go/topology/collection.go
+++ b/go/topology/collection.go
@@ -1,36 +1,43 @@
package topology
import (
+ "fmt"
+
"github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/util"
)
type Collection struct {
Name string
volumeSizeLimit uint64
- storageType2VolumeLayout map[string]*VolumeLayout
+ storageType2VolumeLayout *util.ConcurrentReadMap
}
func NewCollection(name string, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
- c.storageType2VolumeLayout = make(map[string]*VolumeLayout)
+ c.storageType2VolumeLayout = util.NewConcurrentReadMap()
return c
}
+func (c *Collection) String() string {
+ return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
+}
+
func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
keyString := rp.String()
if ttl != nil {
keyString += ttl.String()
}
- if c.storageType2VolumeLayout[keyString] == nil {
- c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
- }
- return c.storageType2VolumeLayout[keyString]
+ vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
+ return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
+ })
+ return vl.(*VolumeLayout)
}
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
- for _, vl := range c.storageType2VolumeLayout {
+ for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil {
- if list := vl.Lookup(vid); list != nil {
+ if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
return list
}
}
@@ -39,9 +46,9 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
}
func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
- for _, vl := range c.storageType2VolumeLayout {
+ for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil {
- if list := vl.ListVolumeServers(); list != nil {
+ if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil {
nodes = append(nodes, list...)
}
}
diff --git a/go/topology/data_center.go b/go/topology/data_center.go
index ebd07803b..bcf2dfd31 100644
--- a/go/topology/data_center.go
+++ b/go/topology/data_center.go
@@ -1,7 +1,5 @@
package topology
-import ()
-
type DataCenter struct {
NodeImpl
}
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index c3b90470f..09b9fac6c 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -1,9 +1,11 @@
package topology
import (
+ "fmt"
+ "strconv"
+
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/storage"
- "strconv"
)
type DataNode struct {
@@ -25,6 +27,10 @@ func NewDataNode(id string) *DataNode {
return s
}
+func (dn *DataNode) String() string {
+ return fmt.Sprintf("NodeImpl:%s ,volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
+}
+
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
if _, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
diff --git a/go/topology/node.go b/go/topology/node.go
index 54118802e..10955fa72 100644
--- a/go/topology/node.go
+++ b/go/topology/node.go
@@ -1,11 +1,12 @@
package topology
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/storage"
"errors"
"math/rand"
"strings"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
)
type NodeId string
diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go
index 6ea019bd8..0c52f9d30 100644
--- a/go/topology/store_replicate.go
+++ b/go/topology/store_replicate.go
@@ -2,12 +2,13 @@ package topology
import (
"bytes"
+ "net/http"
+ "strconv"
+
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/storage"
"github.com/chrislusf/weed-fs/go/util"
- "net/http"
- "strconv"
)
func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) {
diff --git a/go/topology/topology.go b/go/topology/topology.go
index c90e8de0b..c2073ed2f 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -1,20 +1,22 @@
package topology
import (
+ "errors"
+ "io/ioutil"
+ "math/rand"
+
"github.com/chrislusf/weed-fs/go/glog"
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/sequence"
"github.com/chrislusf/weed-fs/go/storage"
- "errors"
+ "github.com/chrislusf/weed-fs/go/util"
"github.com/goraft/raft"
- "io/ioutil"
- "math/rand"
)
type Topology struct {
NodeImpl
- collectionMap map[string]*Collection
+ collectionMap *util.ConcurrentReadMap
pulse int64
@@ -37,7 +39,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.nodeType = "Topology"
t.NodeImpl.value = t
t.children = make(map[NodeId]Node)
- t.collectionMap = make(map[string]*Collection)
+ t.collectionMap = util.NewConcurrentReadMap()
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
@@ -89,14 +91,14 @@ func (t *Topology) loadConfiguration(configurationFile string) error {
func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
//maybe an issue if lots of collections?
if collection == "" {
- for _, c := range t.collectionMap {
- if list := c.Lookup(vid); list != nil {
+ for _, c := range t.collectionMap.Items {
+ if list := c.(*Collection).Lookup(vid); list != nil {
return list
}
}
} else {
- if c, ok := t.collectionMap[collection]; ok {
- return c.Lookup(vid)
+ if c, ok := t.collectionMap.Items[collection]; ok {
+ return c.(*Collection).Lookup(vid)
}
}
return nil
@@ -109,7 +111,7 @@ func (t *Topology) NextVolumeId() storage.VolumeId {
return next
}
-func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool {
+func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
@@ -124,20 +126,18 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in
}
func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
- _, ok := t.collectionMap[collectionName]
- if !ok {
- t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
- }
- return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl)
+ return t.collectionMap.Get(collectionName, func() interface{} {
+ return NewCollection(collectionName, t.volumeSizeLimit)
+ }).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
}
-func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) {
- collection, ok = t.collectionMap[collectionName]
- return
+func (t *Topology) GetCollection(collectionName string) (*Collection, bool) {
+ c, hasCollection := t.collectionMap.Items[collectionName]
+ return c.(*Collection), hasCollection
}
func (t *Topology) DeleteCollection(collectionName string) {
- delete(t.collectionMap, collectionName)
+ delete(t.collectionMap.Items, collectionName)
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go
index eb4491484..7e36568b6 100644
--- a/go/topology/topology_event_handling.go
+++ b/go/topology/topology_event_handling.go
@@ -1,10 +1,11 @@
package topology
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/storage"
"math/rand"
"time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
)
func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go
index d6400c988..6a1423ca8 100644
--- a/go/topology/topology_map.go
+++ b/go/topology/topology_map.go
@@ -1,7 +1,5 @@
package topology
-import ()
-
func (t *Topology) ToMap() interface{} {
m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount()
@@ -13,10 +11,11 @@ func (t *Topology) ToMap() interface{} {
}
m["DataCenters"] = dcs
var layouts []interface{}
- for _, c := range t.collectionMap {
- for _, layout := range c.storageType2VolumeLayout {
+ for _, col := range t.collectionMap.Items {
+ c := col.(*Collection)
+ for _, layout := range c.storageType2VolumeLayout.Items {
if layout != nil {
- tmp := layout.ToMap()
+ tmp := layout.(*VolumeLayout).ToMap()
tmp["collection"] = c.Name
layouts = append(layouts, tmp)
}
diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go
index 72846f20b..d6fa2213e 100644
--- a/go/topology/topology_vacuum.go
+++ b/go/topology/topology_vacuum.go
@@ -1,13 +1,14 @@
package topology
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/storage"
- "github.com/chrislusf/weed-fs/go/util"
"encoding/json"
"errors"
"net/url"
"time"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/util"
)
func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
@@ -79,13 +80,15 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
return isCommitSuccess
}
func (t *Topology) Vacuum(garbageThreshold string) int {
- for _, c := range t.collectionMap {
- for _, vl := range c.storageType2VolumeLayout {
+ for _, col := range t.collectionMap.Items {
+ c := col.(*Collection)
+ for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil {
- for vid, locationlist := range vl.vid2location {
- if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
- if batchVacuumVolumeCompact(vl, vid, locationlist) {
- batchVacuumVolumeCommit(vl, vid, locationlist)
+ volumeLayout := vl.(*VolumeLayout)
+ for vid, locationlist := range volumeLayout.vid2location {
+ if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
+ if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) {
+ batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
}
}
}
diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go
index 2859d3992..6124c0da2 100644
--- a/go/topology/volume_growth.go
+++ b/go/topology/volume_growth.go
@@ -1,11 +1,12 @@
package topology
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/storage"
"fmt"
"math/rand"
"sync"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
)
/*
@@ -29,6 +30,10 @@ type VolumeGrowth struct {
accessLock sync.Mutex
}
+func (o *VolumeGrowOption) String() string {
+ return fmt.Sprintf("Collection:%s, ReplicaPlacement:%v, Ttl:%v, DataCenter:%s, Rack:%s, DataNode:%s", o.Collection, o.ReplicaPlacement, o.Ttl, o.DataCenter, o.Rack, o.DataNode)
+}
+
func NewDefaultVolumeGrowth() *VolumeGrowth {
return &VolumeGrowth{}
}
diff --git a/go/topology/volume_growth_test.go b/go/topology/volume_growth_test.go
index 5581c87ce..267b36042 100644
--- a/go/topology/volume_growth_test.go
+++ b/go/topology/volume_growth_test.go
@@ -1,11 +1,12 @@
package topology
import (
- "github.com/chrislusf/weed-fs/go/sequence"
- "github.com/chrislusf/weed-fs/go/storage"
"encoding/json"
"fmt"
"testing"
+
+ "github.com/chrislusf/weed-fs/go/sequence"
+ "github.com/chrislusf/weed-fs/go/storage"
)
var topologyLayout = `
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index 7bb0cf7e3..4b1d3dad9 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -1,11 +1,13 @@
package topology
import (
- "github.com/chrislusf/weed-fs/go/glog"
- "github.com/chrislusf/weed-fs/go/storage"
"errors"
+ "fmt"
"math/rand"
"sync"
+
+ "github.com/chrislusf/weed-fs/go/glog"
+ "github.com/chrislusf/weed-fs/go/storage"
)
// mapping from volume to its locations, inverted from server to volume
@@ -28,6 +30,10 @@ func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeL
}
}
+func (vl *VolumeLayout) String() string {
+ return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit)
+}
+
func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
diff --git a/go/topology/volume_location_list.go b/go/topology/volume_location_list.go
index 176f469b9..0f892c010 100644
--- a/go/topology/volume_location_list.go
+++ b/go/topology/volume_location_list.go
@@ -1,6 +1,8 @@
package topology
-import ()
+import (
+ "fmt"
+)
type VolumeLocationList struct {
list []*DataNode
@@ -10,6 +12,10 @@ func NewVolumeLocationList() *VolumeLocationList {
return &VolumeLocationList{}
}
+func (dnll *VolumeLocationList) String() string {
+ return fmt.Sprintf("%v", dnll.list)
+}
+
func (dnll *VolumeLocationList) Head() *DataNode {
return dnll.list[0]
}