aboutsummaryrefslogtreecommitdiff
path: root/go/topology/volume_layout.go
diff options
context:
space:
mode:
Diffstat (limited to 'go/topology/volume_layout.go')
-rw-r--r--go/topology/volume_layout.go15
1 files changed, 15 insertions, 0 deletions
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index 7d7e3f662..9664d3748 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -5,6 +5,7 @@ import (
"errors"
"log"
"math/rand"
+ "sync"
)
type VolumeLayout struct {
@@ -13,6 +14,7 @@ type VolumeLayout struct {
writables []storage.VolumeId // transient array of writable volume id
pulse int64
volumeSizeLimit uint64
+ accessLock sync.Mutex
}
func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout {
@@ -26,6 +28,9 @@ func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pu
}
func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList()
}
@@ -121,6 +126,9 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
}
func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
if vl.vid2location[vid].Remove(dn) {
if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() {
log.Println("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount())
@@ -130,6 +138,9 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId)
return false
}
func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
if vl.vid2location[vid].Add(dn) {
if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() {
return vl.setVolumeWritable(vid)
@@ -139,6 +150,10 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) b
}
func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
+ log.Println("Volume", vid, "reaches full capacity.")
return vl.removeFromWritable(vid)
}