diff options
Diffstat (limited to 'go/topology')
| -rw-r--r-- | go/topology/allocate_volume.go | 5 | ||||
| -rw-r--r-- | go/topology/collection.go | 27 | ||||
| -rw-r--r-- | go/topology/data_center.go | 2 | ||||
| -rw-r--r-- | go/topology/data_node.go | 8 | ||||
| -rw-r--r-- | go/topology/node.go | 5 | ||||
| -rw-r--r-- | go/topology/store_replicate.go | 5 | ||||
| -rw-r--r-- | go/topology/topology.go | 38 | ||||
| -rw-r--r-- | go/topology/topology_event_handling.go | 5 | ||||
| -rw-r--r-- | go/topology/topology_map.go | 9 | ||||
| -rw-r--r-- | go/topology/topology_vacuum.go | 21 | ||||
| -rw-r--r-- | go/topology/volume_growth.go | 9 | ||||
| -rw-r--r-- | go/topology/volume_growth_test.go | 5 | ||||
| -rw-r--r-- | go/topology/volume_layout.go | 10 | ||||
| -rw-r--r-- | go/topology/volume_location_list.go | 8 |
14 files changed, 96 insertions, 61 deletions
diff --git a/go/topology/allocate_volume.go b/go/topology/allocate_volume.go index 6562e9ac5..a791b4c1c 100644 --- a/go/topology/allocate_volume.go +++ b/go/topology/allocate_volume.go @@ -1,11 +1,12 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/storage" - "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "net/url" + + "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/util" ) type AllocateVolumeResult struct { diff --git a/go/topology/collection.go b/go/topology/collection.go index 506f43fbf..5437ffd79 100644 --- a/go/topology/collection.go +++ b/go/topology/collection.go @@ -1,36 +1,43 @@ package topology import ( + "fmt" + "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/util" ) type Collection struct { Name string volumeSizeLimit uint64 - storageType2VolumeLayout map[string]*VolumeLayout + storageType2VolumeLayout *util.ConcurrentReadMap } func NewCollection(name string, volumeSizeLimit uint64) *Collection { c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit} - c.storageType2VolumeLayout = make(map[string]*VolumeLayout) + c.storageType2VolumeLayout = util.NewConcurrentReadMap() return c } +func (c *Collection) String() string { + return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout) +} + func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { keyString := rp.String() if ttl != nil { keyString += ttl.String() } - if c.storageType2VolumeLayout[keyString] == nil { - c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit) - } - return c.storageType2VolumeLayout[keyString] + vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} { + return NewVolumeLayout(rp, ttl, c.volumeSizeLimit) + }) + return vl.(*VolumeLayout) } func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { - for _, vl := range c.storageType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout.Items { if vl != nil { - if list := vl.Lookup(vid); list != nil { + if list := vl.(*VolumeLayout).Lookup(vid); list != nil { return list } } @@ -39,9 +46,9 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode { } func (c *Collection) ListVolumeServers() (nodes []*DataNode) { - for _, vl := range c.storageType2VolumeLayout { + for _, vl := range c.storageType2VolumeLayout.Items { if vl != nil { - if list := vl.ListVolumeServers(); list != nil { + if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil { nodes = append(nodes, list...) } } diff --git a/go/topology/data_center.go b/go/topology/data_center.go index ebd07803b..bcf2dfd31 100644 --- a/go/topology/data_center.go +++ b/go/topology/data_center.go @@ -1,7 +1,5 @@ package topology -import () - type DataCenter struct { NodeImpl } diff --git a/go/topology/data_node.go b/go/topology/data_node.go index c3b90470f..09b9fac6c 100644 --- a/go/topology/data_node.go +++ b/go/topology/data_node.go @@ -1,9 +1,11 @@ package topology import ( + "fmt" + "strconv" + "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/storage" - "strconv" ) type DataNode struct { @@ -25,6 +27,10 @@ func NewDataNode(id string) *DataNode { return s } +func (dn *DataNode) String() string { + return fmt.Sprintf("NodeImpl:%s ,volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead) +} + func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) { if _, ok := dn.volumes[v.Id]; !ok { dn.volumes[v.Id] = v diff --git a/go/topology/node.go b/go/topology/node.go index 54118802e..10955fa72 100644 --- a/go/topology/node.go +++ b/go/topology/node.go @@ -1,11 +1,12 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/storage" "errors" "math/rand" "strings" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" ) type NodeId string diff --git a/go/topology/store_replicate.go b/go/topology/store_replicate.go index 6ea019bd8..0c52f9d30 100644 --- a/go/topology/store_replicate.go +++ b/go/topology/store_replicate.go @@ -2,12 +2,13 @@ package topology import ( "bytes" + "net/http" + "strconv" + "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/operation" "github.com/chrislusf/weed-fs/go/storage" "github.com/chrislusf/weed-fs/go/util" - "net/http" - "strconv" ) func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) { diff --git a/go/topology/topology.go b/go/topology/topology.go index c90e8de0b..c2073ed2f 100644 --- a/go/topology/topology.go +++ b/go/topology/topology.go @@ -1,20 +1,22 @@ package topology import ( + "errors" + "io/ioutil" + "math/rand" + "github.com/chrislusf/weed-fs/go/glog" "github.com/chrislusf/weed-fs/go/operation" "github.com/chrislusf/weed-fs/go/sequence" "github.com/chrislusf/weed-fs/go/storage" - "errors" + "github.com/chrislusf/weed-fs/go/util" "github.com/goraft/raft" - "io/ioutil" - "math/rand" ) type Topology struct { NodeImpl - collectionMap map[string]*Collection + collectionMap *util.ConcurrentReadMap pulse int64 @@ -37,7 +39,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL t.nodeType = "Topology" t.NodeImpl.value = t t.children = make(map[NodeId]Node) - t.collectionMap = make(map[string]*Collection) + t.collectionMap = util.NewConcurrentReadMap() t.pulse = int64(pulse) t.volumeSizeLimit = volumeSizeLimit @@ -89,14 +91,14 @@ func (t *Topology) loadConfiguration(configurationFile string) error { func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode { //maybe an issue if lots of collections? if collection == "" { - for _, c := range t.collectionMap { - if list := c.Lookup(vid); list != nil { + for _, c := range t.collectionMap.Items { + if list := c.(*Collection).Lookup(vid); list != nil { return list } } } else { - if c, ok := t.collectionMap[collection]; ok { - return c.Lookup(vid) + if c, ok := t.collectionMap.Items[collection]; ok { + return c.(*Collection).Lookup(vid) } } return nil @@ -109,7 +111,7 @@ func (t *Topology) NextVolumeId() storage.VolumeId { return next } -func (t *Topology) HasWriableVolume(option *VolumeGrowOption) bool { +func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool { vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl) return vl.GetActiveVolumeCount(option) > 0 } @@ -124,20 +126,18 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in } func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout { - _, ok := t.collectionMap[collectionName] - if !ok { - t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit) - } - return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl) + return t.collectionMap.Get(collectionName, func() interface{} { + return NewCollection(collectionName, t.volumeSizeLimit) + }).(*Collection).GetOrCreateVolumeLayout(rp, ttl) } -func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) { - collection, ok = t.collectionMap[collectionName] - return +func (t *Topology) GetCollection(collectionName string) (*Collection, bool) { + c, hasCollection := t.collectionMap.Items[collectionName] + return c.(*Collection), hasCollection } func (t *Topology) DeleteCollection(collectionName string) { - delete(t.collectionMap, collectionName) + delete(t.collectionMap.Items, collectionName) } func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { diff --git a/go/topology/topology_event_handling.go b/go/topology/topology_event_handling.go index eb4491484..7e36568b6 100644 --- a/go/topology/topology_event_handling.go +++ b/go/topology/topology_event_handling.go @@ -1,10 +1,11 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/storage" "math/rand" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" ) func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) { diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go index d6400c988..6a1423ca8 100644 --- a/go/topology/topology_map.go +++ b/go/topology/topology_map.go @@ -1,7 +1,5 @@ package topology -import () - func (t *Topology) ToMap() interface{} { m := make(map[string]interface{}) m["Max"] = t.GetMaxVolumeCount() @@ -13,10 +11,11 @@ func (t *Topology) ToMap() interface{} { } m["DataCenters"] = dcs var layouts []interface{} - for _, c := range t.collectionMap { - for _, layout := range c.storageType2VolumeLayout { + for _, col := range t.collectionMap.Items { + c := col.(*Collection) + for _, layout := range c.storageType2VolumeLayout.Items { if layout != nil { - tmp := layout.ToMap() + tmp := layout.(*VolumeLayout).ToMap() tmp["collection"] = c.Name layouts = append(layouts, tmp) } diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go index 72846f20b..d6fa2213e 100644 --- a/go/topology/topology_vacuum.go +++ b/go/topology/topology_vacuum.go @@ -1,13 +1,14 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/storage" - "github.com/chrislusf/weed-fs/go/util" "encoding/json" "errors" "net/url" "time" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" + "github.com/chrislusf/weed-fs/go/util" ) func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool { @@ -79,13 +80,15 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis return isCommitSuccess } func (t *Topology) Vacuum(garbageThreshold string) int { - for _, c := range t.collectionMap { - for _, vl := range c.storageType2VolumeLayout { + for _, col := range t.collectionMap.Items { + c := col.(*Collection) + for _, vl := range c.storageType2VolumeLayout.Items { if vl != nil { - for vid, locationlist := range vl.vid2location { - if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) { - if batchVacuumVolumeCompact(vl, vid, locationlist) { - batchVacuumVolumeCommit(vl, vid, locationlist) + volumeLayout := vl.(*VolumeLayout) + for vid, locationlist := range volumeLayout.vid2location { + if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) { + if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) { + batchVacuumVolumeCommit(volumeLayout, vid, locationlist) } } } diff --git a/go/topology/volume_growth.go b/go/topology/volume_growth.go index 2859d3992..6124c0da2 100644 --- a/go/topology/volume_growth.go +++ b/go/topology/volume_growth.go @@ -1,11 +1,12 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/storage" "fmt" "math/rand" "sync" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" ) /* @@ -29,6 +30,10 @@ type VolumeGrowth struct { accessLock sync.Mutex } +func (o *VolumeGrowOption) String() string { + return fmt.Sprintf("Collection:%s, ReplicaPlacement:%v, Ttl:%v, DataCenter:%s, Rack:%s, DataNode:%s", o.Collection, o.ReplicaPlacement, o.Ttl, o.DataCenter, o.Rack, o.DataNode) +} + func NewDefaultVolumeGrowth() *VolumeGrowth { return &VolumeGrowth{} } diff --git a/go/topology/volume_growth_test.go b/go/topology/volume_growth_test.go index 5581c87ce..267b36042 100644 --- a/go/topology/volume_growth_test.go +++ b/go/topology/volume_growth_test.go @@ -1,11 +1,12 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/sequence" - "github.com/chrislusf/weed-fs/go/storage" "encoding/json" "fmt" "testing" + + "github.com/chrislusf/weed-fs/go/sequence" + "github.com/chrislusf/weed-fs/go/storage" ) var topologyLayout = ` diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go index 7bb0cf7e3..4b1d3dad9 100644 --- a/go/topology/volume_layout.go +++ b/go/topology/volume_layout.go @@ -1,11 +1,13 @@ package topology import ( - "github.com/chrislusf/weed-fs/go/glog" - "github.com/chrislusf/weed-fs/go/storage" "errors" + "fmt" "math/rand" "sync" + + "github.com/chrislusf/weed-fs/go/glog" + "github.com/chrislusf/weed-fs/go/storage" ) // mapping from volume to its locations, inverted from server to volume @@ -28,6 +30,10 @@ func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeL } } +func (vl *VolumeLayout) String() string { + return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit) +} + func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) { vl.accessLock.Lock() defer vl.accessLock.Unlock() diff --git a/go/topology/volume_location_list.go b/go/topology/volume_location_list.go index 176f469b9..0f892c010 100644 --- a/go/topology/volume_location_list.go +++ b/go/topology/volume_location_list.go @@ -1,6 +1,8 @@ package topology -import () +import ( + "fmt" +) type VolumeLocationList struct { list []*DataNode @@ -10,6 +12,10 @@ func NewVolumeLocationList() *VolumeLocationList { return &VolumeLocationList{} } +func (dnll *VolumeLocationList) String() string { + return fmt.Sprintf("%v", dnll.list) +} + func (dnll *VolumeLocationList) Head() *DataNode { return dnll.list[0] } |
