aboutsummaryrefslogtreecommitdiff
path: root/go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2013-07-15 21:34:43 -0700
committerChris Lu <chris.lu@gmail.com>2013-07-15 21:34:43 -0700
commitdd2245956f91d3d704766f9d44b988b6ac30d345 (patch)
treeb4779697574eeae4516b42426e1466bc996a6c33 /go
parentb409ccc5ab4bbf46a8b465aa5b374dc2a09036b3 (diff)
downloadseaweedfs-dd2245956f91d3d704766f9d44b988b6ac30d345.tar.xz
seaweedfs-dd2245956f91d3d704766f9d44b988b6ac30d345.zip
better locking to prevent any possible memory access error
Diffstat (limited to 'go')
-rw-r--r--go/topology/volume_layout.go15
1 files changed, 15 insertions, 0 deletions
diff --git a/go/topology/volume_layout.go b/go/topology/volume_layout.go
index 7d7e3f662..9664d3748 100644
--- a/go/topology/volume_layout.go
+++ b/go/topology/volume_layout.go
@@ -5,6 +5,7 @@ import (
"errors"
"log"
"math/rand"
+ "sync"
)
type VolumeLayout struct {
@@ -13,6 +14,7 @@ type VolumeLayout struct {
writables []storage.VolumeId // transient array of writable volume id
pulse int64
volumeSizeLimit uint64
+ accessLock sync.Mutex
}
func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pulse int64) *VolumeLayout {
@@ -26,6 +28,9 @@ func NewVolumeLayout(repType storage.ReplicationType, volumeSizeLimit uint64, pu
}
func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList()
}
@@ -121,6 +126,9 @@ func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
}
func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
if vl.vid2location[vid].Remove(dn) {
if vl.vid2location[vid].Length() < vl.repType.GetCopyCount() {
log.Println("Volume", vid, "has", vl.vid2location[vid].Length(), "replica, less than required", vl.repType.GetCopyCount())
@@ -130,6 +138,9 @@ func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId)
return false
}
func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
if vl.vid2location[vid].Add(dn) {
if vl.vid2location[vid].Length() >= vl.repType.GetCopyCount() {
return vl.setVolumeWritable(vid)
@@ -139,6 +150,10 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) b
}
func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
+ vl.accessLock.Lock()
+ defer vl.accessLock.Unlock()
+
+ log.Println("Volume", vid, "reaches full capacity.")
return vl.removeFromWritable(vid)
}