diff options
Diffstat (limited to 'weed/topology/volume_layout.go')
| -rw-r--r-- | weed/topology/volume_layout.go | 14 |
1 files changed, 11 insertions, 3 deletions
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index 908bbb9e9..9e84fd2da 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -22,6 +22,7 @@ type VolumeLayout struct { readonlyVolumes map[needle.VolumeId]bool // transient set of readonly volumes oversizedVolumes map[needle.VolumeId]bool // set of oversized volumes volumeSizeLimit uint64 + replicationAsMin bool accessLock sync.RWMutex } @@ -31,7 +32,7 @@ type VolumeLayoutStats struct { FileCount uint64 } -func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64) *VolumeLayout { +func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSizeLimit uint64, replicationAsMin bool) *VolumeLayout { return &VolumeLayout{ rp: rp, ttl: ttl, @@ -40,6 +41,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, volumeSi readonlyVolumes: make(map[needle.VolumeId]bool), oversizedVolumes: make(map[needle.VolumeId]bool), volumeSizeLimit: volumeSizeLimit, + replicationAsMin: replicationAsMin, } } @@ -107,7 +109,7 @@ func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) { } func (vl *VolumeLayout) ensureCorrectWritables(v *storage.VolumeInfo) { - if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) { + if vl.enoughCopies(v.Id) && vl.isWritable(v) { if _, ok := vl.oversizedVolumes[v.Id]; !ok { vl.setVolumeWritable(v.Id) } @@ -272,12 +274,18 @@ func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid needle.VolumeId, is return false } - if vl.vid2location[vid].Length() == vl.rp.GetCopyCount() { + if vl.enoughCopies(vid) { return vl.setVolumeWritable(vid) } return false } +func (vl *VolumeLayout) enoughCopies(vid needle.VolumeId) bool { + locations := vl.vid2location[vid].Length() + desired := vl.rp.GetCopyCount() + return locations == desired || (vl.replicationAsMin && locations > desired) +} + func (vl *VolumeLayout) SetVolumeCapacityFull(vid needle.VolumeId) bool { vl.accessLock.Lock() defer vl.accessLock.Unlock() |
