aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
Diffstat (limited to 'go')
-rw-r--r--go/topology/collection.go4
-rw-r--r--go/topology/data_node.go2
-rw-r--r--go/topology/topology.go10
-rw-r--r--go/topology/topology_map.go4
-rw-r--r--go/topology/topology_vacuum.go4
-rw-r--r--go/util/concurrent_read_map.go44
-rw-r--r--go/weed/weed_server/master_server_handlers_admin.go2
7 files changed, 45 insertions, 25 deletions
diff --git a/go/topology/collection.go b/go/topology/collection.go
index 376b62405..6368900c3 100644
--- a/go/topology/collection.go
+++ b/go/topology/collection.go
@@ -35,7 +35,7 @@ func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *
}
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
- for _, vl := range c.storageType2VolumeLayout.Items {
+ for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
return list
@@ -46,7 +46,7 @@ func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
}
func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
- for _, vl := range c.storageType2VolumeLayout.Items {
+ for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil {
nodes = append(nodes, list...)
diff --git a/go/topology/data_node.go b/go/topology/data_node.go
index 01419a791..3bad8c188 100644
--- a/go/topology/data_node.go
+++ b/go/topology/data_node.go
@@ -3,7 +3,6 @@ package topology
import (
"fmt"
"strconv"
- "sync"
"github.com/chrislusf/seaweedfs/go/glog"
"github.com/chrislusf/seaweedfs/go/storage"
@@ -11,7 +10,6 @@ import (
type DataNode struct {
NodeImpl
- sync.RWMutex
volumes map[storage.VolumeId]storage.VolumeInfo
Ip string
Port int
diff --git a/go/topology/topology.go b/go/topology/topology.go
index ee1477cd2..088639eef 100644
--- a/go/topology/topology.go
+++ b/go/topology/topology.go
@@ -90,13 +90,13 @@ func (t *Topology) loadConfiguration(configurationFile string) error {
func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
//maybe an issue if lots of collections?
if collection == "" {
- for _, c := range t.collectionMap.Items {
+ for _, c := range t.collectionMap.Items() {
if list := c.(*Collection).Lookup(vid); list != nil {
return list
}
}
} else {
- if c, ok := t.collectionMap.Items[collection]; ok {
+ if c, ok := t.collectionMap.Find(collection); ok {
return c.(*Collection).Lookup(vid)
}
}
@@ -130,13 +130,13 @@ func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPla
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
}
-func (t *Topology) GetCollection(collectionName string) (*Collection, bool) {
- c, hasCollection := t.collectionMap.Items[collectionName]
+func (t *Topology) FindCollection(collectionName string) (*Collection, bool) {
+ c, hasCollection := t.collectionMap.Find(collectionName)
return c.(*Collection), hasCollection
}
func (t *Topology) DeleteCollection(collectionName string) {
- delete(t.collectionMap.Items, collectionName)
+ t.collectionMap.Delete(collectionName)
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
diff --git a/go/topology/topology_map.go b/go/topology/topology_map.go
index dff11aaad..ce8e9e663 100644
--- a/go/topology/topology_map.go
+++ b/go/topology/topology_map.go
@@ -11,9 +11,9 @@ func (t *Topology) ToMap() interface{} {
}
m["DataCenters"] = dcs
var layouts []interface{}
- for _, col := range t.collectionMap.Items {
+ for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
- for _, layout := range c.storageType2VolumeLayout.Items {
+ for _, layout := range c.storageType2VolumeLayout.Items() {
if layout != nil {
tmp := layout.(*VolumeLayout).ToMap()
tmp["collection"] = c.Name
diff --git a/go/topology/topology_vacuum.go b/go/topology/topology_vacuum.go
index 48bc8311d..eeb4fef69 100644
--- a/go/topology/topology_vacuum.go
+++ b/go/topology/topology_vacuum.go
@@ -81,10 +81,10 @@ func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlis
}
func (t *Topology) Vacuum(garbageThreshold string) int {
glog.V(0).Infoln("Start vacuum on demand")
- for _, col := range t.collectionMap.Items {
+ for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
glog.V(0).Infoln("vacuum on collection:", c.Name)
- for _, vl := range c.storageType2VolumeLayout.Items {
+ for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
for vid, locationlist := range volumeLayout.vid2location {
diff --git a/go/util/concurrent_read_map.go b/go/util/concurrent_read_map.go
index ca1109f22..bbf303d82 100644
--- a/go/util/concurrent_read_map.go
+++ b/go/util/concurrent_read_map.go
@@ -7,31 +7,53 @@ import (
// A mostly for read map, which can thread-safely
// initialize the map entries.
type ConcurrentReadMap struct {
- rmutex sync.RWMutex
- Items map[string]interface{}
+ sync.RWMutex
+
+ items map[string]interface{}
}
func NewConcurrentReadMap() *ConcurrentReadMap {
- return &ConcurrentReadMap{Items: make(map[string]interface{})}
+ return &ConcurrentReadMap{items: make(map[string]interface{})}
}
func (m *ConcurrentReadMap) initMapEntry(key string, newEntry func() interface{}) (value interface{}) {
- m.rmutex.Lock()
- defer m.rmutex.Unlock()
- if value, ok := m.Items[key]; ok {
+ m.Lock()
+ defer m.Unlock()
+ if value, ok := m.items[key]; ok {
return value
}
value = newEntry()
- m.Items[key] = value
+ m.items[key] = value
return value
}
func (m *ConcurrentReadMap) Get(key string, newEntry func() interface{}) interface{} {
- m.rmutex.RLock()
- if value, ok := m.Items[key]; ok {
- m.rmutex.RUnlock()
+ m.RLock()
+ if value, ok := m.items[key]; ok {
+ m.RUnlock()
return value
}
- m.rmutex.RUnlock()
+ m.RUnlock()
return m.initMapEntry(key, newEntry)
}
+
+func (m *ConcurrentReadMap) Find(key string) (interface{}, bool) {
+ m.RLock()
+ value, ok := m.items[key]
+ m.RUnlock()
+ return value, ok
+}
+
+func (m *ConcurrentReadMap) Items() (itemsCopy []interface{}) {
+ m.RLock()
+ for _, i := range m.items {
+ itemsCopy = append(itemsCopy, i)
+ }
+ return itemsCopy
+}
+
+func (m *ConcurrentReadMap) Delete(key string) {
+ m.Lock()
+ delete(m.items, key)
+ m.Unlock()
+}
diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go
index 383456356..07399596a 100644
--- a/go/weed/weed_server/master_server_handlers_admin.go
+++ b/go/weed/weed_server/master_server_handlers_admin.go
@@ -19,7 +19,7 @@ import (
)
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
- collection, ok := ms.Topo.GetCollection(r.FormValue("collection"))
+ collection, ok := ms.Topo.FindCollection(r.FormValue("collection"))
if !ok {
writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection")))
return