aboutsummaryrefslogtreecommitdiff
path: root/go/topology
diff options
context:
space:
mode:
Diffstat (limited to 'go/topology')
-rw-r--r--go/topology/volume_layout.go18
1 files changed, 15 insertions, 3 deletions
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index 3c1dd9503..7b6c0f117 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -17,7 +17,7 @@ type VolumeLayout struct {
vid2location map[storage.VolumeId]*VolumeLocationList
writables []storage.VolumeId // transient array of writable volume id
volumeSizeLimit uint64
- accessLock sync.Mutex
+ accessLock sync.RWMutex
}
func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
@@ -44,7 +44,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.vid2location[v.Id].Set(dn)
glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
- vl.AddToWritable(v.Id)
+ vl.addToWritable(v.Id)
} else {
vl.removeFromWritable(v.Id)
}
@@ -58,7 +58,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
delete(vl.vid2location, v.Id)
}
-func (vl *VolumeLayout) AddToWritable(vid storage.VolumeId) {
+func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
for _, id := range vl.writables {
if vid == id {
return
@@ -74,6 +74,9 @@ func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
}
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
+ vl.accessLock.RLock()
+ defer vl.accessLock.RUnlock()
+
if location := vl.vid2location[vid]; location != nil {
return location.list
}
@@ -81,6 +84,9 @@ func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
}
func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
+ vl.accessLock.RLock()
+ defer vl.accessLock.RUnlock()
+
for _, location := range vl.vid2location {
nodes = append(nodes, location.list...)
}
@@ -88,6 +94,9 @@ func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
}
func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) {
+ vl.accessLock.RLock()
+ defer vl.accessLock.RUnlock()
+
len_writers := len(vl.writables)
if len_writers <= 0 {
glog.V(0).Infoln("No more writable volumes!")
@@ -125,6 +134,9 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*s
}
func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
+ vl.accessLock.RLock()
+ defer vl.accessLock.RUnlock()
+
if option.DataCenter == "" {
return len(vl.writables)
}