aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislusf <chrislu@Chriss-MacBook-Air.local>2014-12-08 20:29:25 -0800
committerchrislusf <chrislu@Chriss-MacBook-Air.local>2014-12-08 20:29:25 -0800
commit52180f386b044af7a6daba3e33ff04b5e9da25ba (patch)
tree1490ad6c11f986da12a421e717767402dc993d3c
parentba972694c730429889c696bd9853a38843f64f65 (diff)
downloadseaweedfs-52180f386b044af7a6daba3e33ff04b5e9da25ba.tar.xz
seaweedfs-52180f386b044af7a6daba3e33ff04b5e9da25ba.zip
Add read-write lock to guard topology changes on new collections and ttls.
-rw-r--r--go/topology/collection.go21
-rw-r--r--go/topology/topology.go29
-rw-r--r--go/topology/topology_map.go7
-rw-r--r--go/topology/topology_vacuum.go14
-rw-r--r--go/util/concurrent_read_map.go37
5 files changed, 74 insertions, 34 deletions
diff --git a/go/topology/collection.go b/go/topology/collection.go
index 506f43fbf..4b47ae88a 100644
--- a/go/topology/collection.go
+++ b/go/topology/collection.go
@@ -2,17 +2,18 @@ package topology
import (
"github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/util"
)
type Collection struct {
Name string
volumeSizeLimit uint64
- storageType2VolumeLayout map[string]*VolumeLayout
+ storageType2VolumeLayout *util.ConcurrentReadMap
}
func NewCollection(name string, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
- c.storageType2VolumeLayout = make(map[string]*VolumeLayout)
+ c.storageType2VolumeLayout = util.NewConcurrentReadMap()
return c
}
@@ -21,16 +22,16 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *
if ttl != nil {
keyString += ttl.String()
}
- if c.storageType2VolumeLayout[keyString] == nil {
- c.storageType2VolumeLayout[keyString] = NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
- }
- return c.storageType2VolumeLayout[keyString]
+ vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
+ return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
+ })
+ return vl.(*VolumeLayout)
}
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
- for _, vl := range c.storageType2VolumeLayout {
+ for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil {
- if list := vl.Lookup(vid); list != nil {
+ if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
return list
}
}
@@ -39,9 +40,9 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
}
func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
- for _, vl := range c.storageType2VolumeLayout {
+ for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil {
- if list := vl.ListVolumeServers(); list != nil {
+ if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil {
nodes = append(nodes, list...)
}
}
diff --git a/go/topology/topology.go b/go/topology/topology.go
index eb64d336c..c2073ed2f 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -9,13 +9,14 @@ import (
"github.com/chrislusf/weed-fs/go/operation"
"github.com/chrislusf/weed-fs/go/sequence"
"github.com/chrislusf/weed-fs/go/storage"
+ "github.com/chrislusf/weed-fs/go/util"
"github.com/goraft/raft"
)
type Topology struct {
NodeImpl
- collectionMap map[string]*Collection
+ collectionMap *util.ConcurrentReadMap
pulse int64
@@ -38,7 +39,7 @@ func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeL
t.nodeType = "Topology"
t.NodeImpl.value = t
t.children = make(map[NodeId]Node)
- t.collectionMap = make(map[string]*Collection)
+ t.collectionMap = util.NewConcurrentReadMap()
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
@@ -90,14 +91,14 @@ func (t *Topology) loadConfiguration(configurationFile string) error {
func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
//maybe an issue if lots of collections?
if collection == "" {
- for _, c := range t.collectionMap {
- if list := c.Lookup(vid); list != nil {
+ for _, c := range t.collectionMap.Items {
+ if list := c.(*Collection).Lookup(vid); list != nil {
return list
}
}
} else {
- if c, ok := t.collectionMap[collection]; ok {
- return c.Lookup(vid)
+ if c, ok := t.collectionMap.Items[collection]; ok {
+ return c.(*Collection).Lookup(vid)
}
}
return nil
@@ -125,20 +126,18 @@ func (t *Topology) PickForWrite(count int, option *VolumeGrowOption) (string, in
}
func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
- _, ok := t.collectionMap[collectionName]
- if !ok {
- t.collectionMap[collectionName] = NewCollection(collectionName, t.volumeSizeLimit)
- }
- return t.collectionMap[collectionName].GetOrCreateVolumeLayout(rp, ttl)
+ // Lazily create the collection under ConcurrentReadMap's locking, then
+ // resolve the volume layout for this replication/ttl combination.
+ return t.collectionMap.Get(collectionName, func() interface{} {
+  return NewCollection(collectionName, t.volumeSizeLimit)
+ }).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
}
-func (t *Topology) GetCollection(collectionName string) (collection *Collection, ok bool) {
- collection, ok = t.collectionMap[collectionName]
- return
+// GetCollection looks up a collection by name, reporting whether it exists.
+func (t *Topology) GetCollection(collectionName string) (*Collection, bool) {
+ c, hasCollection := t.collectionMap.Items[collectionName]
+ if !hasCollection {
+  // A missing key yields a nil interface value; type-asserting it to
+  // *Collection without the comma-ok form would panic, so return an
+  // explicit nil for the missing-collection case.
+  return nil, false
+ }
+ return c.(*Collection), true
}
func (t *Topology) DeleteCollection(collectionName string) {
- delete(t.collectionMap, collectionName)
+ // NOTE(review): this mutates collectionMap.Items without holding any of
+ // ConcurrentReadMap's (unexported) locks, so it can race with concurrent
+ // readers and initializers; ConcurrentReadMap should expose a locked
+ // Delete method — confirm and route this call through it.
+ delete(t.collectionMap.Items, collectionName)
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go
index af95c6536..6a1423ca8 100644
--- a/go/topology/topology_map.go
+++ b/go/topology/topology_map.go
@@ -11,10 +11,11 @@ func (t *Topology) ToMap() interface{} {
}
m["DataCenters"] = dcs
var layouts []interface{}
- for _, c := range t.collectionMap {
- for _, layout := range c.storageType2VolumeLayout {
+ for _, col := range t.collectionMap.Items {
+ c := col.(*Collection)
+ for _, layout := range c.storageType2VolumeLayout.Items {
if layout != nil {
- tmp := layout.ToMap()
+ tmp := layout.(*VolumeLayout).ToMap()
tmp["collection"] = c.Name
layouts = append(layouts, tmp)
}
diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go
index 97a76026d..d6fa2213e 100644
--- a/go/topology/topology_vacuum.go
+++ b/go/topology/topology_vacuum.go
@@ -80,13 +80,15 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
return isCommitSuccess
}
func (t *Topology) Vacuum(garbageThreshold string) int {
- for _, c := range t.collectionMap {
- for _, vl := range c.storageType2VolumeLayout {
+ for _, col := range t.collectionMap.Items {
+ c := col.(*Collection)
+ for _, vl := range c.storageType2VolumeLayout.Items {
if vl != nil {
- for vid, locationlist := range vl.vid2location {
- if batchVacuumVolumeCheck(vl, vid, locationlist, garbageThreshold) {
- if batchVacuumVolumeCompact(vl, vid, locationlist) {
- batchVacuumVolumeCommit(vl, vid, locationlist)
+ volumeLayout := vl.(*VolumeLayout)
+ for vid, locationlist := range volumeLayout.vid2location {
+ if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
+ if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) {
+ batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
}
}
}
diff --git a/go/util/concurrent_read_map.go b/go/util/concurrent_read_map.go
new file mode 100644
index 000000000..d16fdbcaf
--- /dev/null
+++ b/go/util/concurrent_read_map.go
@@ -0,0 +1,37 @@
+package util
+
+import "sync"
+
+// ConcurrentReadMap is a map optimized for frequent reads that can
+// thread-safely initialize missing entries on first access.
+type ConcurrentReadMap struct {
+ rmutex sync.RWMutex // guards lookups in Items (see Get)
+ mutex sync.Mutex // serializes lazy entry initialization (see initMapEntry)
+ Items map[string]interface{}
+}
+
+// NewConcurrentReadMap creates an empty, ready-to-use ConcurrentReadMap.
+func NewConcurrentReadMap() *ConcurrentReadMap {
+ return &ConcurrentReadMap{Items: make(map[string]interface{})}
+}
+
+// initMapEntry creates (or returns the already-present) entry for key.
+// It must take the WRITE side of rmutex: the original code locked the
+// separate m.mutex, which does not exclude readers holding
+// m.rmutex.RLock() in Get, so inserting into Items could race a
+// concurrent map read. The map is re-checked after locking because
+// another goroutine may have initialized the entry in the meantime.
+func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) {
+ m.rmutex.Lock()
+ defer m.rmutex.Unlock()
+ if value, ok := m.Items[key]; ok {
+  return value
+ }
+ value = newEntry()
+ m.Items[key] = value
+ return value
+}
+
+// Get returns the value stored under key, creating it with newEntry on
+// first access. The common hit path takes only a read lock; a miss
+// falls through to initMapEntry, which re-checks before inserting.
+func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} {
+ m.rmutex.RLock()
+ if value, ok := m.Items[key]; ok {
+ m.rmutex.RUnlock()
+ return value
+ } else {
+ m.rmutex.RUnlock()
+ return m.initMapEntry(key, newEntry)
+ }
+}