aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2016-11-06 20:55:22 -0800
committerChris Lu <chris.lu@gmail.com>2016-11-06 20:55:22 -0800
commit36f96332238a37cb659bb3ff27d82febb1b22c91 (patch)
treea791cf25889ae2b8d42be5753add37f9b9a3ab39
parentdf49692dff75577f5eedc4d4251f1224b3f12799 (diff)
downloadseaweedfs-36f96332238a37cb659bb3ff27d82febb1b22c91.tar.xz
seaweedfs-36f96332238a37cb659bb3ff27d82febb1b22c91.zip
add locks for location.volumes
fix https://github.com/chrislusf/seaweedfs/issues/392
-rw-r--r--weed/storage/disk_location.go39
-rw-r--r--weed/storage/store.go14
2 files changed, 47 insertions, 6 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index cc3c83b63..fc11a411f 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -3,6 +3,7 @@ package storage
import (
"io/ioutil"
"strings"
+ "sync"
"github.com/chrislusf/seaweedfs/weed/glog"
)
@@ -11,6 +12,7 @@ type DiskLocation struct {
Directory string
MaxVolumeCount int
volumes map[VolumeId]*Volume
+ sync.RWMutex
}
func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
@@ -20,6 +22,8 @@ func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
}
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
+ l.Lock()
+ defer l.Unlock()
if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
for _, dir := range dirs {
@@ -48,6 +52,9 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
}
func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
+ l.Lock()
+ defer l.Unlock()
+
for k, v := range l.volumes {
if v.Collection == collection {
e = l.deleteVolumeById(k)
@@ -71,3 +78,35 @@ func (l *DiskLocation) deleteVolumeById(vid VolumeId) (e error) {
delete(l.volumes, vid)
return
}
+
+func (l *DiskLocation) SetVolume(vid VolumeId, volume *Volume) {
+ l.Lock()
+ defer l.Unlock()
+
+ l.volumes[vid] = volume
+}
+
+func (l *DiskLocation) FindVolume(vid VolumeId) (*Volume, bool) {
+ l.RLock()
+ defer l.RUnlock()
+
+ v, ok := l.volumes[vid]
+ return v, ok
+}
+
+func (l *DiskLocation) VolumesLen() int {
+ l.RLock()
+ defer l.RUnlock()
+
+ return len(l.volumes)
+}
+
+func (l *DiskLocation) Close() {
+ l.Lock()
+ defer l.Unlock()
+
+ for _, v := range l.volumes {
+ v.Close()
+ }
+ return
+}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index d6c7172e7..614c87ace 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -143,7 +143,7 @@ func (s *Store) DeleteCollection(collection string) (e error) {
func (s *Store) findVolume(vid VolumeId) *Volume {
for _, location := range s.Locations {
- if v, found := location.volumes[vid]; found {
+ if v, found := location.FindVolume(vid); found {
return v
}
}
@@ -152,7 +152,7 @@ func (s *Store) findVolume(vid VolumeId) *Volume {
func (s *Store) findFreeLocation() (ret *DiskLocation) {
max := 0
for _, location := range s.Locations {
- currentFreeCount := location.MaxVolumeCount - len(location.volumes)
+ currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
if currentFreeCount > max {
max = currentFreeCount
ret = location
@@ -168,7 +168,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleM
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl); err == nil {
- location.volumes[vid] = volume
+ location.SetVolume(vid, volume)
return nil
} else {
return err
@@ -180,6 +180,7 @@ func (s *Store) addVolume(vid VolumeId, collection string, needleMapKind NeedleM
func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo
for _, location := range s.Locations {
+ location.RLock()
for k, v := range location.volumes {
s := &VolumeInfo{
Id: VolumeId(k),
@@ -194,6 +195,7 @@ func (s *Store) Status() []*VolumeInfo {
Ttl: v.Ttl}
stats = append(stats, s)
}
+ location.RUnlock()
}
sortVolumeInfos(stats)
return stats
@@ -219,6 +221,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
var maxFileKey uint64
for _, location := range s.Locations {
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
+ location.Lock()
for k, v := range location.volumes {
if maxFileKey < v.nm.MaxFileKey() {
maxFileKey = v.nm.MaxFileKey()
@@ -246,6 +249,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
}
}
}
+ location.Unlock()
}
joinMessage := &operation.JoinMessage{
@@ -290,9 +294,7 @@ func (s *Store) SendHeartbeatToMaster() (masterNode string, secretKey security.S
}
func (s *Store) Close() {
for _, location := range s.Locations {
- for _, v := range location.volumes {
- v.Close()
- }
+ location.Close()
}
}
func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {