aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2016-06-27 15:28:23 -0700
committerChris Lu <chris.lu@gmail.com>2016-06-27 15:28:23 -0700
commitb617b13c4338d031e1a78006f079fc935b0948b0 (patch)
treefacee2d0068056751f4d1e8518cdc384ee6933e9
parentd0dbf6d2eafc0a8758427870a8536f2fe1b82cc6 (diff)
downloadseaweedfs-b617b13c4338d031e1a78006f079fc935b0948b0.tar.xz
seaweedfs-b617b13c4338d031e1a78006f079fc935b0948b0.zip
remember oversized volumes
fix https://github.com/chrislusf/seaweedfs/issues/331
-rw-r--r--weed/topology/volume_layout.go41
1 files changed, 28 insertions, 13 deletions
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index e500de583..1816577b4 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -12,21 +12,23 @@ import (
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
- rp *storage.ReplicaPlacement
- ttl *storage.TTL
- vid2location map[storage.VolumeId]*VolumeLocationList
- writables []storage.VolumeId // transient array of writable volume id
- volumeSizeLimit uint64
- accessLock sync.RWMutex
+ rp *storage.ReplicaPlacement
+ ttl *storage.TTL
+ vid2location map[storage.VolumeId]*VolumeLocationList
+ writables []storage.VolumeId // transient array of writable volume id
+ oversizedVolumes map[storage.VolumeId]bool // set of oversized volumes
+ volumeSizeLimit uint64
+ accessLock sync.RWMutex
}
func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
return &VolumeLayout{
- rp: rp,
- ttl: ttl,
- vid2location: make(map[storage.VolumeId]*VolumeLocationList),
- writables: *new([]storage.VolumeId),
- volumeSizeLimit: volumeSizeLimit,
+ rp: rp,
+ ttl: ttl,
+ vid2location: make(map[storage.VolumeId]*VolumeLocationList),
+ writables: *new([]storage.VolumeId),
+ oversizedVolumes: make(map[storage.VolumeId]bool),
+ volumeSizeLimit: volumeSizeLimit,
}
}
@@ -44,12 +46,21 @@ func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.vid2location[v.Id].Set(dn)
glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
- vl.addToWritable(v.Id)
+ if _, ok := vl.oversizedVolumes[v.Id]; !ok {
+ vl.addToWritable(v.Id)
+ }
} else {
+ vl.rememberOversizedVolumne(v)
vl.removeFromWritable(v.Id)
}
}
+func (vl *VolumeLayout) rememberOversizedVolumne(v *storage.VolumeInfo) {
+ if vl.isOversized(v) {
+ vl.oversizedVolumes[v.Id] = true
+ }
+}
+
func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
@@ -67,8 +78,12 @@ func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
vl.writables = append(vl.writables, vid)
}
+func (vl *VolumeLayout) isOversized(v *storage.VolumeInfo) bool {
+ return uint64(v.Size) < vl.volumeSizeLimit
+}
+
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
- return uint64(v.Size) < vl.volumeSizeLimit &&
+ return vl.isOversized(v) &&
v.Version == storage.CurrentVersion &&
!v.ReadOnly
}