aboutsummaryrefslogtreecommitdiff
path: root/weed/topology/volume_layout.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/topology/volume_layout.go')
-rw-r--r--weed/topology/volume_layout.go14
1 files changed, 11 insertions, 3 deletions
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 908bbb9e9..9e84fd2da 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -22,6 +22,7 @@ type VolumeLayout struct {
readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes
oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes
volumeSizeLimit uint64
+ replicationAsMin bool
accessLock sync.RWMutex
}
@@ -31,7 +32,7 @@ type VolumeLayoutStats struct {
FileCount uint64
}
-func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout {
+func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout {
return &VolumeLayout{
rp: rp,
ttl: ttl,
@@ -40,6 +41,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSi
readonlyVolumes: make(map[needle.VolumeId]bool),
oversizedVolumes: make(map[needle.VolumeId]bool),
volumeSizeLimit: volumeSizeLimit,
+ replicationAsMin: replicationAsMin,
}
}
@@ -107,7 +109,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
}
func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) {
- if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
+ if vl.enoughCopies(v.Id) && vl.isWritable(v) {
if _, ok := vl.oversizedVolumes[v.Id]; !ok {
vl.setVolumeWritable(v.Id)
}
@@ -272,12 +274,18 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, is
return false
}
- if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() {
+ if vl.enoughCopies(vid) {
return vl.setVolumeWritable(vid)
}
return false
}
+func (vl *VolumeLayout) enoughCopies(vid needle.VolumeId) bool {
+ locations := vl.vid2location[vid].Length()
+ desired := vl.rp.GetCopyCount()
+ return locations == desired || (vl.replicationAsMin && locations > desired)
+}
+
func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()