aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-04-21 13:32:36 -0700
committerChris Lu <chris.lu@gmail.com>2019-04-21 13:32:36 -0700
commitebe94be1a17986a3ffa4c78e48964dd8d6a731b3 (patch)
tree2b2d4212cc0e77dac5a4c319add07f0820eb0be9
parent0302b9496cedd0b830733a738ff973a9355774a2 (diff)
downloadseaweedfs-ebe94be1a17986a3ffa4c78e48964dd8d6a731b3.tar.xz
seaweedfs-ebe94be1a17986a3ffa4c78e48964dd8d6a731b3.zip
maintain layout correctness when changing volumes
-rw-r--r--weed/topology/volume_layout.go46
1 files changed, 32 insertions, 14 deletions
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 6ab90e9e2..799cbca62 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -50,7 +50,7 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
- if _, ok := vl.vid2location[v.Id]; !ok || vl.vid2location[v.Id] == nil {
+ if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList()
}
vl.vid2location[v.Id].Set(dn)
@@ -72,17 +72,13 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
return
}
}
- if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
- if _, ok := vl.oversizedVolumes[v.Id]; !ok {
- vl.addToWritable(v.Id)
- }
- } else {
- vl.rememberOversizedVolumne(v)
- vl.removeFromWritable(v.Id)
- }
+
+ vl.rememberOversizedVolume(v)
+ vl.ensureCorrectWritables(v)
+
}
-func (vl *VolumeLayout) rememberOversizedVolumne(v *storage.VolumeInfo) {
+func (vl *VolumeLayout) rememberOversizedVolume(v *storage.VolumeInfo) {
if vl.isOversized(v) {
vl.oversizedVolumes[v.Id] = true
}
@@ -92,9 +88,31 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
- vl.removeFromWritable(v.Id)
- delete(vl.vid2location, v.Id) // somehow this line does not work as expected
- // vl.vid2location[v.Id] = nil
+ // remove from vid2location map
+ location, ok := vl.vid2location[v.Id]
+ if !ok {
+ return
+ }
+
+ if location.Remove(dn) {
+
+ vl.ensureCorrectWritables(v)
+
+ if location.Length() == 0 {
+ delete(vl.vid2location, v.Id)
+ }
+
+ }
+}
+
+func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
+ if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
+ if _, ok := vl.oversizedVolumes[v.Id]; !ok {
+ vl.addToWritable(v.Id)
+ }
+ } else {
+ vl.removeFromWritable(v.Id)
+ }
}
func (vl *VolumeLayout) addToWritable(vid needle.VolumeId) {
@@ -252,7 +270,7 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId) bo
defer vl.accessLock.Unlock()
vl.vid2location[vid].Set(dn)
- if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
+ if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() {
return vl.setVolumeWritable(vid)
}
return false